Class: OMQ::RFC::Zstd::CompressionConnection
- Inherits:
- SimpleDelegator
  - Object
  - SimpleDelegator
  - OMQ::RFC::Zstd::CompressionConnection
- Includes:
- TransparentDelegator
- Defined in:
- lib/omq/rfc/zstd/connection.rb
Overview
Wraps a Protocol::ZMTP::Connection to transparently apply the ZMTP-Zstd sender/receiver rules (RFC sections 6.4 and 6.5) at the frame-body level.
Each outgoing message part is run through OMQ::RFC::Zstd::Codec.encode_part; each incoming part through OMQ::RFC::Zstd::Codec.decode_part. The wrapper is installed via Engine#connection_wrapper once the handshake has matched a profile – if the peer didn’t advertise a compatible X-Compression value, no wrapper is installed and the connection stays on the raw path.
Fan-out byte-sharing is disabled on wrapped connections by hiding #write_wire. Compression must run per-recipient under the current API (each connection could in principle use a different profile or dictionary), so the fan-out optimization in OMQ::Routing::FanOut falls back to per-connection write_message.
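The fallback described above can be sketched with plain SimpleDelegator objects. This is a minimal illustration, not the library's code: `RawConnection`, `HidingWrapper`, and `fan_out` are hypothetical stand-ins, and the real OMQ::Routing::FanOut may decide differently in detail.

```ruby
require "delegate"

# Hypothetical stand-in for a raw connection (not Protocol::ZMTP::Connection).
class RawConnection
  attr_reader :log

  def initialize
    @log = []
  end

  # Accepts pre-encoded wire bytes that may be shared across recipients.
  def write_wire(bytes)
    @log << [:write_wire, bytes]
  end

  # Accepts message parts and encodes them for this connection only.
  def write_message(parts)
    @log << [:write_message, parts]
  end
end

# Wrapper that reports #write_wire as unsupported, mirroring #respond_to? here.
class HidingWrapper < SimpleDelegator
  def respond_to?(name, include_private = false)
    return false if name == :write_wire
    super
  end
end

# Hypothetical fan-out: share bytes where the connection advertises
# #write_wire, otherwise fall back to a per-connection #write_message.
def fan_out(connections, parts)
  wire = parts.join
  connections.each do |conn|
    if conn.respond_to?(:write_wire)
      conn.write_wire(wire)
    else
      conn.write_message(parts)
    end
  end
end
```

A raw connection takes the shared-bytes path, while a wrapped one is routed through `write_message`, which is where a compressing wrapper gets the chance to encode per recipient.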
Defined Under Namespace
Modules: TransparentDelegator
Instance Attribute Summary
-
#last_wire_size_in ⇒ Object
readonly
Compressed byte size of the most recent incoming message body.
-
#last_wire_size_out ⇒ Object
readonly
Compressed byte size of the most recent outgoing message body (sum over parts).
Instance Method Summary
-
#initialize(conn, send_compression:, recv_compression:, max_message_size: nil, engine: nil) ⇒ CompressionConnection
constructor
A new instance of CompressionConnection.
-
#receive_message ⇒ Object
Reads one message via the underlying connection, intercepts ZMTP-Zstd command frames (e.g. ZDICT) via the frame block, and returns the decoded plaintext parts.
-
#respond_to?(name, include_private = false) ⇒ Boolean
Hides #write_wire from callers so OMQ::Routing::FanOut falls back to per-connection write_message – see class docs for the rationale.
-
#send_initial_dict! ⇒ Object
Sends the ZDICT command frame if the send-side compression has an inline dictionary to ship.
-
#send_message(parts) ⇒ Object
Compresses parts and forwards the encoded payload through the underlying connection’s synchronous send path (acquires the write mutex and flushes).
-
#write_message(parts) ⇒ Object
Compresses parts and forwards to the underlying #write_message (writes to the buffer without flushing – the send pump batches and flushes at the end of the cycle).
-
#write_messages(messages) ⇒ Object
Compresses each message in messages and forwards the batch to the underlying #write_messages.
Methods included from TransparentDelegator
Constructor Details
#initialize(conn, send_compression:, recv_compression:, max_message_size: nil, engine: nil) ⇒ CompressionConnection
Returns a new instance of CompressionConnection.
    # File 'lib/omq/rfc/zstd/connection.rb', line 46
    def initialize(conn, send_compression:, recv_compression:, max_message_size: nil, engine: nil)
      super(conn)
      @send_compression   = send_compression
      @recv_compression   = recv_compression
      @max_message_size   = max_message_size
      @engine             = engine
      @dict_sent          = false
      @last_wire_size_out = nil
      @last_wire_size_in  = nil
      # Cached once: is the send side an auto-training compression
      # that still needs samples? Flipped false the moment training
      # completes, so #encode_parts drops the per-message branch.
      @auto_sampling = send_compression.is_a?(Compression) &&
                       send_compression.auto?
    end
Instance Attribute Details
#last_wire_size_in ⇒ Object (readonly)
Compressed byte size of the most recent incoming message body.
    # File 'lib/omq/rfc/zstd/connection.rb', line 70
    def last_wire_size_in
      @last_wire_size_in
    end
#last_wire_size_out ⇒ Object (readonly)
Compressed byte size of the most recent outgoing message body (sum over parts). Read by the engine’s verbose monitor to annotate :message_sent traces with wire=NB.
    # File 'lib/omq/rfc/zstd/connection.rb', line 66
    def last_wire_size_out
      @last_wire_size_out
    end
Instance Method Details
#receive_message ⇒ Object
Reads one message via the underlying connection, intercepts ZMTP-Zstd command frames (e.g. ZDICT) via the frame block, and returns the decoded plaintext parts.
    # File 'lib/omq/rfc/zstd/connection.rb', line 115
    def receive_message
      parts = super do |frame|
        handle_command_frame(frame)
      end
      decode_parts(parts)
    end
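The block-based interception used by #receive_message can be illustrated in isolation. The sketch below assumes an underlying connection that yields command frames to a block and returns only data-frame bodies; `Frame`, `FakeConnection`, and `InterceptingWrapper` are hypothetical names, and `upcase` merely stands in for the real decode step.

```ruby
require "delegate"

# Hypothetical frame shape: command flag, command name, body.
Frame = Struct.new(:command, :name, :body)

# Hypothetical connection: yields command frames, returns data-frame bodies.
class FakeConnection
  def initialize(frames)
    @frames = frames
  end

  def receive_message
    parts = []
    @frames.each do |frame|
      if frame.command
        yield frame if block_given?
      else
        parts << frame.body
      end
    end
    parts
  end
end

# Wrapper that inspects command frames through the block passed to super,
# then post-processes the returned parts.
class InterceptingWrapper < SimpleDelegator
  attr_reader :dictionary

  def receive_message
    parts = super do |frame|
      # Capture a dictionary shipped in a ZDICT command frame.
      @dictionary = frame.body if frame.name == "ZDICT"
    end
    parts.map(&:upcase) # stand-in for decoding each part
  end
end
```

Because SimpleDelegator forwards unknown methods (including a `super` call with no superclass implementation) to the wrapped object, the block reaches the underlying `receive_message` unchanged.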
#respond_to?(name, include_private = false) ⇒ Boolean
Hides #write_wire from callers so OMQ::Routing::FanOut falls back to per-connection write_message – see class docs for the rationale.
    # File 'lib/omq/rfc/zstd/connection.rb', line 76
    def respond_to?(name, include_private = false)
      return false if name == :write_wire
      super
    end
#send_initial_dict! ⇒ Object
Sends the ZDICT command frame if the send-side compression has an inline dictionary to ship. Called by EngineExt right after the wrapper is constructed, before the recv pump starts.
    # File 'lib/omq/rfc/zstd/connection.rb', line 126
    def send_initial_dict!
      return if @dict_sent
      return unless @send_compression
      # RFC Sec. 6.4: a passive sender MUST NOT emit a ZDICT frame.
      return if @send_compression.respond_to?(:passive?) && @send_compression.passive?
      bytes = @send_compression.send_dict_bytes
      return unless bytes
      if bytes.bytesize > DICT_FRAME_MAX_SIZE
        raise Error, "ZMTP-Zstd: dictionary exceeds DICT_FRAME_MAX_SIZE (#{bytes.bytesize} > #{DICT_FRAME_MAX_SIZE})"
      end
      __getobj__.send_command(Protocol::ZMTP::Codec::Command.new("ZDICT", bytes))
      @dict_sent = true
      @engine&.emit_verbose_monitor_event(:zdict_sent, size: bytes.bytesize)
    end
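The guard sequence in #send_initial_dict! (send at most once, never from a passive sender, reject oversized dictionaries) can be exercised with stubs. This is a simplified sketch under stated assumptions: `SketchCompression` and `SketchSender` are hypothetical, `SKETCH_DICT_MAX` is an invented limit (the real DICT_FRAME_MAX_SIZE value is not shown on this page), and commands are recorded in an array rather than written to a connection.

```ruby
# Hypothetical limit used only for this sketch.
SKETCH_DICT_MAX = 64

# Hypothetical compression object exposing the two methods the guards consult.
SketchCompression = Struct.new(:send_dict_bytes, :passive) do
  def passive?
    passive
  end
end

# Simplified sender that records commands instead of writing to a socket.
class SketchSender
  attr_reader :commands

  def initialize(compression)
    @compression = compression
    @dict_sent   = false
    @commands    = []
  end

  def send_initial_dict!
    return if @dict_sent              # ship the dictionary at most once
    return unless @compression        # nothing to do without send-side compression
    return if @compression.passive?   # a passive sender must not emit ZDICT
    bytes = @compression.send_dict_bytes
    return unless bytes               # no inline dictionary configured
    raise ArgumentError, "dictionary too large" if bytes.bytesize > SKETCH_DICT_MAX
    @commands << ["ZDICT", bytes]
    @dict_sent = true
  end
end
```

Calling `send_initial_dict!` repeatedly on an active sender records a single ZDICT command, which is the property that makes it safe to call from connection setup without extra bookkeeping.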
#send_message(parts) ⇒ Object
Compresses parts and forwards the encoded payload through the underlying connection’s synchronous send path (acquires the write mutex and flushes).
    # File 'lib/omq/rfc/zstd/connection.rb', line 85
    def send_message(parts)
      encoded = encode_parts(parts)
      ship_auto_dict_if_ready
      super(encoded)
    end
#write_message(parts) ⇒ Object
Compresses parts and forwards to the underlying #write_message (writes to the buffer without flushing – the send pump batches and flushes at the end of the cycle).
    # File 'lib/omq/rfc/zstd/connection.rb', line 95
    def write_message(parts)
      encoded = encode_parts(parts)
      ship_auto_dict_if_ready
      super(encoded)
    end
#write_messages(messages) ⇒ Object
Compresses each message in messages and forwards the batch to the underlying #write_messages. Used by the send pump’s batched-flush path.
    # File 'lib/omq/rfc/zstd/connection.rb', line 105
    def write_messages(messages)
      encoded = messages.map { |parts| encode_parts(parts) }
      ship_auto_dict_if_ready
      super(encoded)
    end
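The encode-then-forward shape shared by the write methods, together with the wire-size bookkeeping behind #last_wire_size_out, might look roughly like this. The sketch substitutes Ruby's stdlib Zlib for Zstd (an assumption made only so the example runs without extra gems); `BufferedConnection` and `DeflatingWrapper` are hypothetical, and the auto-dictionary step is omitted.

```ruby
require "delegate"
require "zlib"

# Hypothetical connection with a write buffer and an explicit flush.
class BufferedConnection
  attr_reader :buffer, :flushed

  def initialize
    @buffer  = []
    @flushed = []
  end

  # Buffers one message without flushing.
  def write_message(parts)
    @buffer << parts
  end

  # Buffers a batch of messages without flushing.
  def write_messages(messages)
    @buffer.concat(messages)
  end

  # Moves everything buffered so far to the flushed list.
  def flush
    @flushed.concat(@buffer)
    @buffer.clear
  end
end

# Wrapper that compresses every part before delegating the write.
class DeflatingWrapper < SimpleDelegator
  attr_reader :last_wire_size_out

  def write_message(parts)
    super(encode_parts(parts))
  end

  def write_messages(messages)
    super(messages.map { |parts| encode_parts(parts) })
  end

  private

  # Compresses each part and records the summed compressed size.
  def encode_parts(parts)
    encoded = parts.map { |part| Zlib::Deflate.deflate(part) }
    @last_wire_size_out = encoded.sum(&:bytesize)
    encoded
  end
end
```

In this sketch the wrapper never flushes on its own, so a caller can write a whole batch and flush once, matching the batched-flush behaviour described for the send pump.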