Class: OMQ::Compression::Zstd::Connection
- Inherits: SimpleDelegator
  - Object
  - SimpleDelegator
  - OMQ::Compression::Zstd::Connection
- Includes: TransparentDelegator
- Defined in: lib/omq/compression/zstd/connection.rb
Overview
Wraps a Protocol::ZMTP::Connection to transparently apply the ZMTP-Zstd sender/receiver rules (RFC sections 6.4 and 6.5) at the frame-body level.
Each outgoing message part is run through OMQ::Compression::Zstd::Codec.encode_part; each incoming part through OMQ::Compression::Zstd::Codec.decode_part. The wrapper is installed via Engine#connection_wrapper once the handshake has matched a profile – if the peer didn’t advertise a compatible X-Compression value, no wrapper is installed and the connection stays on the raw path.
Fan-out byte-sharing is disabled on wrapped connections by hiding #write_wire. Compression must run per-recipient under the current API (each connection could in principle use a different profile or dictionary), so the fan-out optimization in OMQ::Routing::FanOut falls back to per-connection write_message.
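The wrapping pattern described above can be sketched with Ruby's standard library alone. This is a minimal illustration, not the library's implementation: `FakeConnection` and `CompressingWrapper` are hypothetical names, and `Zlib::Deflate` / `Zlib::Inflate` stand in for the Zstd codec so the example runs without extra gems.

```ruby
require "delegate"
require "zlib"

# Hypothetical stand-in for the underlying connection: it queues each
# message (an array of frame bodies) and hands them back in order.
class FakeConnection
  attr_reader :wire

  def initialize
    @wire = []
  end

  def write_message(parts)
    @wire << parts
  end

  def receive_message
    @wire.shift
  end
end

# Hypothetical wrapper: compresses every part on the way out and
# decompresses every part on the way in. Zlib is an assumption made
# for this sketch; the real wrapper uses the ZMTP-Zstd codec.
class CompressingWrapper < SimpleDelegator
  attr_reader :last_wire_size_out

  def write_message(parts)
    encoded = parts.map { |part| Zlib::Deflate.deflate(part) }
    @last_wire_size_out = encoded.sum(&:bytesize) # sum over parts
    super(encoded) # no superclass method, so this delegates to the wrapped object
  end

  def receive_message
    super.map { |part| Zlib::Inflate.inflate(part) }
  end
end

conn = CompressingWrapper.new(FakeConnection.new)
conn.write_message(["a" * 1000, "b" * 1000])
wire_size  = conn.last_wire_size_out
round_trip = conn.receive_message
```

Callers see plaintext parts on both sides, while the wrapped connection only ever handles encoded bodies.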
Defined Under Namespace
Modules: TransparentDelegator
Instance Attribute Summary
- #last_wire_size_in ⇒ Object (readonly)
  Compressed byte size of the most recent incoming message body.
- #last_wire_size_out ⇒ Object (readonly)
  Compressed byte size of the most recent outgoing message body (sum over parts).
Instance Method Summary
- #initialize(conn, send_compression:, recv_compression:, max_message_size: nil, engine: nil) ⇒ Connection (constructor)
  A new instance of Connection.
- #receive_message ⇒ Object
  Reads one message via the underlying connection, intercepts ZMTP-Zstd command frames (e.g. ZDICT) via the frame block, and returns the decoded plaintext parts.
- #respond_to?(name, include_private = false) ⇒ Boolean
  Hides #write_wire from callers so OMQ::Routing::FanOut falls back to per-connection write_message – see class docs for the rationale.
- #send_initial_dict! ⇒ Object
  Sends the ZDICT command frame if the send-side compression has an inline dictionary to ship.
- #send_message(parts) ⇒ Object
  Compresses parts and forwards the encoded payload through the underlying connection’s synchronous send path (acquires the write mutex and flushes).
- #write_message(parts) ⇒ Object
  Compresses parts and forwards to the underlying #write_message (writes to the buffer without flushing – the send pump batches and flushes at the end of the cycle).
- #write_messages(messages) ⇒ Object
  Compresses each message in messages and forwards the batch to the underlying #write_messages.
Constructor Details
#initialize(conn, send_compression:, recv_compression:, max_message_size: nil, engine: nil) ⇒ Connection
Returns a new instance of Connection.
    # File 'lib/omq/compression/zstd/connection.rb', line 51
    def initialize(conn, send_compression:, recv_compression:, max_message_size: nil, engine: nil)
      super(conn)
      @send_compression = send_compression
      @recv_compression = recv_compression
      @max_message_size = max_message_size
      @engine = engine
      @dict_sent = false
      @last_wire_size_out = nil
      @last_wire_size_in = nil
      # Cached once: is the send side an auto-training compression
      # that still needs samples? Flipped false the moment training
      # completes, so #encode_parts drops the per-message branch.
      @auto_sampling = send_compression.is_a?(Compressor) && send_compression.auto?
    end
Instance Attribute Details
#last_wire_size_in ⇒ Object (readonly)
Compressed byte size of the most recent incoming message body.
    # File 'lib/omq/compression/zstd/connection.rb', line 75
    def last_wire_size_in
      @last_wire_size_in
    end
#last_wire_size_out ⇒ Object (readonly)
Compressed byte size of the most recent outgoing message body (sum over parts). Read by the engine’s verbose monitor to annotate :message_sent traces with wire=NB.
    # File 'lib/omq/compression/zstd/connection.rb', line 71
    def last_wire_size_out
      @last_wire_size_out
    end
Instance Method Details
#receive_message ⇒ Object
Reads one message via the underlying connection, intercepts ZMTP-Zstd command frames (e.g. ZDICT) via the frame block, and returns the decoded plaintext parts.
    # File 'lib/omq/compression/zstd/connection.rb', line 120
    def receive_message
      parts = super do |frame|
        handle_command_frame(frame)
      end
      decode_parts(parts)
    end
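The block-based interception used by #receive_message can be illustrated in isolation. The sketch below is hypothetical: `Frame`, `FakeConnection`, and the rule that the underlying connection yields command frames to the caller's block are assumptions made for the example, since this page does not show how Protocol::ZMTP::Connection invokes the block.

```ruby
# Hypothetical frame type: command frames carry a name, data frames do not.
Frame = Struct.new(:body, :command_name) do
  def command?
    !command_name.nil?
  end
end

# Hypothetical stand-in for the underlying connection. It yields each
# command frame to the caller's block and returns only the data bodies.
class FakeConnection
  def initialize(frames)
    @frames = frames
  end

  def receive_message
    parts = []
    @frames.each do |frame|
      if frame.command?
        yield frame if block_given?
      else
        parts << frame.body
      end
    end
    parts
  end
end

seen_commands = []
conn = FakeConnection.new([
  Frame.new("dictionary bytes", "ZDICT"),
  Frame.new("part 1", nil),
  Frame.new("part 2", nil),
])

# The block sees the command frame; the return value holds data parts only.
parts = conn.receive_message { |frame| seen_commands << frame.command_name }
```

In this shape, a dictionary delivered mid-stream can be handled as a side effect of the block, and the caller never has to filter command frames out of the returned message.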
#respond_to?(name, include_private = false) ⇒ Boolean
Hides #write_wire from callers so OMQ::Routing::FanOut falls back to per-connection write_message – see class docs for the rationale.
    # File 'lib/omq/compression/zstd/connection.rb', line 81
    def respond_to?(name, include_private = false)
      return false if name == :write_wire
      super
    end
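A small, self-contained sketch shows how hiding a method from respond_to? steers a capability check. `RawConnection`, `HidingWrapper`, and `deliver` are hypothetical names; in particular, `deliver` only approximates the kind of check OMQ::Routing::FanOut might perform and is not taken from its source.

```ruby
require "delegate"

# Hypothetical raw connection that supports both write paths.
class RawConnection
  def write_wire(_bytes)
    :shared_bytes
  end

  def write_message(_parts)
    :per_connection
  end
end

# Hypothetical wrapper that hides #write_wire from capability checks.
class HidingWrapper < SimpleDelegator
  def respond_to?(name, include_private = false)
    return false if name == :write_wire
    super
  end
end

# Hypothetical fan-out helper: it takes the byte-sharing path only when
# the connection advertises #write_wire.
def deliver(conn, parts)
  if conn.respond_to?(:write_wire)
    conn.write_wire(parts.join)
  else
    conn.write_message(parts)
  end
end

raw_result     = deliver(RawConnection.new, ["a", "b"])
wrapped_result = deliver(HidingWrapper.new(RawConnection.new), ["a", "b"])
```

Note that in this sketch the override only affects respond_to?. A direct call to `write_wire` on the wrapper would still be delegated by SimpleDelegator, so the approach relies on callers checking before they call.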
#send_initial_dict! ⇒ Object
Sends the ZDICT command frame if the send-side compression has an inline dictionary to ship. Called by EngineExt right after the wrapper is constructed, before the recv pump starts.
    # File 'lib/omq/compression/zstd/connection.rb', line 131
    def send_initial_dict!
      return if @dict_sent
      return unless @send_compression

      # RFC Sec. 6.4: a passive sender MUST NOT emit a ZDICT frame.
      if @send_compression.respond_to?(:passive?) && @send_compression.passive?
        return
      end

      bytes = @send_compression.send_dict_bytes
      return unless bytes

      if bytes.bytesize > DICT_FRAME_MAX_SIZE
        raise Error, "ZMTP-Zstd: dictionary exceeds DICT_FRAME_MAX_SIZE (#{bytes.bytesize} > #{DICT_FRAME_MAX_SIZE})"
      end

      __getobj__.send_command(Protocol::ZMTP::Codec::Command.new("ZDICT", bytes))
      @dict_sent = true
      @engine&.emit_verbose_monitor_event(:zdict_sent, size: bytes.bytesize)
    end
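The guard order in #send_initial_dict! (already sent, passive sender, no dictionary, oversized dictionary) can be exercised with a stand-alone sketch. `DictSender` is a hypothetical class, the limit below is an arbitrary value picked for the example (the real DICT_FRAME_MAX_SIZE is not stated on this page), and ArgumentError replaces the library's own Error class.

```ruby
# Hypothetical limit, chosen only for this sketch.
DICT_FRAME_MAX_SIZE = 64 * 1024

# Hypothetical sender that mirrors the guard order of #send_initial_dict!.
class DictSender
  attr_reader :sent_frames

  def initialize(dict_bytes, passive: false)
    @dict_bytes  = dict_bytes
    @passive     = passive
    @dict_sent   = false
    @sent_frames = []
  end

  def send_initial_dict!
    return if @dict_sent      # ship the dictionary at most once
    return if @passive        # a passive sender never emits ZDICT
    return unless @dict_bytes # nothing to ship

    if @dict_bytes.bytesize > DICT_FRAME_MAX_SIZE
      raise ArgumentError,
            "dictionary too large (#{@dict_bytes.bytesize} > #{DICT_FRAME_MAX_SIZE})"
    end

    @sent_frames << ["ZDICT", @dict_bytes]
    @dict_sent = true
  end
end

active = DictSender.new("x" * 100)
2.times { active.send_initial_dict! } # the second call is a no-op

passive = DictSender.new("x" * 100, passive: true)
passive.send_initial_dict!

oversized = DictSender.new("x" * (DICT_FRAME_MAX_SIZE + 1))
rejected =
  begin
    oversized.send_initial_dict!
    false
  rescue ArgumentError
    true
  end
```

Setting the sent flag only after the frame has been handed off means a failed attempt can be retried, which matches the ordering in the method above.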
#send_message(parts) ⇒ Object
Compresses parts and forwards the encoded payload through the underlying connection’s synchronous send path (acquires the write mutex and flushes).
    # File 'lib/omq/compression/zstd/connection.rb', line 90
    def send_message(parts)
      encoded = encode_parts(parts)
      ship_auto_dict_if_ready
      super(encoded)
    end
#write_message(parts) ⇒ Object
Compresses parts and forwards to the underlying #write_message (writes to the buffer without flushing – the send pump batches and flushes at the end of the cycle).
    # File 'lib/omq/compression/zstd/connection.rb', line 100
    def write_message(parts)
      encoded = encode_parts(parts)
      ship_auto_dict_if_ready
      super(encoded)
    end
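The difference between the flushing #send_message path and the buffering #write_message path can be shown with a hypothetical connection. `BufferedConnection` is an assumption for illustration only; it omits ZMTP framing entirely and models nothing but when bytes reach the IO.

```ruby
require "stringio"

# Hypothetical connection with a write buffer in front of an IO.
class BufferedConnection
  def initialize(io)
    @io     = io
    @buffer = +""
    @mutex  = Mutex.new
  end

  # Buffers only; the caller is responsible for flushing later.
  def write_message(parts)
    parts.each { |part| @buffer << part }
  end

  # Buffers and flushes in one step while holding the write mutex.
  def send_message(parts)
    @mutex.synchronize do
      write_message(parts)
      flush
    end
  end

  def flush
    @io.write(@buffer)
    @buffer.clear
  end
end

io   = StringIO.new
conn = BufferedConnection.new(io)

conn.write_message(["a", "b"])
after_write = io.string.dup # nothing has reached the IO yet

conn.write_message(["c"])
conn.flush                  # one flush covers both buffered messages
after_flush = io.string.dup

conn.send_message(["d"])    # buffered and flushed immediately
after_send = io.string.dup
```

Batching several buffered writes behind one flush is what lets a send pump amortize the cost of the underlying write across a whole cycle.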
#write_messages(messages) ⇒ Object
Compresses each message in messages and forwards the batch to the underlying #write_messages. Used by the send pump’s batched-flush path.
    # File 'lib/omq/compression/zstd/connection.rb', line 110
    def write_messages(messages)
      encoded = messages.map { |parts| encode_parts(parts) }
      ship_auto_dict_if_ready
      super(encoded)
    end