Class: OMQ::Compression::Zstd::Connection

Inherits:
SimpleDelegator
  • Object
Includes:
TransparentDelegator
Defined in:
lib/omq/compression/zstd/connection.rb

Overview

Wraps a Protocol::ZMTP::Connection to transparently apply the ZMTP-Zstd sender/receiver rules (RFC sections 6.4 and 6.5) at the frame-body level.

Each outgoing message part is run through OMQ::Compression::Zstd::Codec.encode_part; each incoming part through OMQ::Compression::Zstd::Codec.decode_part. The wrapper is installed via Engine#connection_wrapper once the handshake has matched a profile – if the peer didn’t advertise a compatible X-Compression value, no wrapper is installed and the connection stays on the raw path.

Fan-out byte-sharing is disabled on wrapped connections by hiding #write_wire. Compression must run per-recipient under the current API (each connection could in principle use a different profile or dictionary), so the fan-out optimization in OMQ::Routing::FanOut falls back to per-connection write_message.
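The fallback described above can be sketched in isolation. This is a minimal illustration, not the library's code: RawConn, HidingWrapper, and fan_out are hypothetical stand-ins, and the assumption is that the fan-out path probes each connection with respond_to?(:write_wire) before sharing pre-encoded bytes.

```ruby
require "delegate"

# Hypothetical stand-in for a raw connection (not Protocol::ZMTP::Connection).
class RawConn
  attr_reader :log

  def initialize
    @log = []
  end

  # Accepts pre-encoded wire bytes that may be shared across recipients.
  def write_wire(bytes)
    @log << [:wire, bytes]
  end

  # Accepts plaintext parts and encodes them for this connection only.
  def write_message(parts)
    @log << [:message, parts]
  end
end

# Hypothetical wrapper that hides #write_wire from duck-typing checks.
class HidingWrapper < SimpleDelegator
  def respond_to?(name, include_private = false)
    return false if name == :write_wire
    super
  end
end

# Hypothetical fan-out: share bytes where allowed, else go per-connection.
def fan_out(conns, parts)
  wire = parts.join
  conns.each do |conn|
    if conn.respond_to?(:write_wire)
      conn.write_wire(wire)
    else
      conn.write_message(parts)
    end
  end
end
```

With one raw and one wrapped connection in the list, the raw connection receives the shared bytes while the wrapped one is handed the original parts, so a wrapper in that position can compress them itself.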

Defined Under Namespace

Modules: TransparentDelegator

Instance Attribute Summary

Instance Method Summary

Methods included from TransparentDelegator

#is_a?

Constructor Details

#initialize(conn, send_compression:, recv_compression:, max_message_size: nil, engine: nil) ⇒ Connection

Returns a new instance of Connection.

Parameters:

  • conn (Protocol::ZMTP::Connection)

    underlying connection

  • send_compression (OMQ::Compression::Zstd::Compressor, nil)

    compression object used on outgoing parts; nil = no-op

  • recv_compression (OMQ::Compression::Zstd::Compressor, nil)

    compression object used on incoming parts; nil = no-op

  • max_message_size (Integer, nil) (defaults to: nil)

  • engine (Object, nil) (defaults to: nil)

    optional engine; when present, it receives verbose monitor events such as :zdict_sent


# File 'lib/omq/compression/zstd/connection.rb', line 51

def initialize(conn, send_compression:, recv_compression:, max_message_size: nil, engine: nil)
  super(conn)
  @send_compression = send_compression
  @recv_compression = recv_compression
  @max_message_size = max_message_size
  @engine           = engine
  @dict_sent        = false
  @last_wire_size_out = nil
  @last_wire_size_in  = nil

  # Cached once: is the send side an auto-training compression
  # that still needs samples? Flipped false the moment training
  # completes, so #encode_parts drops the per-message branch.
  @auto_sampling = send_compression.is_a?(Compressor) && send_compression.auto?
end

Instance Attribute Details

#last_wire_size_in ⇒ Object (readonly)

Compressed byte size of the most recent incoming message body.



# File 'lib/omq/compression/zstd/connection.rb', line 75

def last_wire_size_in
  @last_wire_size_in
end

#last_wire_size_out ⇒ Object (readonly)

Compressed byte size of the most recent outgoing message body (sum over parts). Read by the engine’s verbose monitor to annotate :message_sent traces with wire=NB.



# File 'lib/omq/compression/zstd/connection.rb', line 71

def last_wire_size_out
  @last_wire_size_out
end

Instance Method Details

#receive_message ⇒ Object

Reads one message via the underlying connection, intercepts ZMTP-Zstd command frames (e.g. ZDICT) via the frame block, and returns the decoded plaintext parts.



# File 'lib/omq/compression/zstd/connection.rb', line 120

def receive_message
  parts = super do |frame|
    handle_command_frame(frame)
  end
  decode_parts(parts)
end
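
The block-based interception can be illustrated with a small, self-contained sketch. FakeConn, its Frame struct, and InterceptingWrapper are hypothetical; the assumption is that the underlying connection yields command frames to the block and returns only data parts. The upcase call merely stands in for decode_parts.

```ruby
require "delegate"

# Hypothetical connection: yields command frames, returns data parts.
class FakeConn
  Frame = Struct.new(:kind, :name, :body)

  def initialize(frames)
    @frames = frames
  end

  def receive_message
    parts = []
    @frames.each do |frame|
      if frame.kind == :command
        yield frame if block_given?
      else
        parts << frame.body
      end
    end
    parts
  end
end

# Hypothetical wrapper that captures a ZDICT command and post-processes parts.
class InterceptingWrapper < SimpleDelegator
  attr_reader :dict

  def receive_message
    # super reaches the wrapped object through Delegator#method_missing,
    # which forwards the block along with the call.
    parts = super do |frame|
      @dict = frame.body if frame.name == "ZDICT"
    end
    parts.map(&:upcase) # stand-in for decoding each part
  end
end
```

The caller sees only the processed data parts; the command frame is consumed as a side effect inside the wrapper.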

#respond_to?(name, include_private = false) ⇒ Boolean

Hides #write_wire from callers so OMQ::Routing::FanOut falls back to per-connection write_message – see class docs for the rationale.

Returns:

  • (Boolean)


# File 'lib/omq/compression/zstd/connection.rb', line 81

def respond_to?(name, include_private = false)
  return false if name == :write_wire
  super
end

#send_initial_dict! ⇒ Object

Sends the ZDICT command frame if the send-side compression has an inline dictionary to ship. Called by EngineExt right after the wrapper is constructed, before the recv pump starts.



# File 'lib/omq/compression/zstd/connection.rb', line 131

def send_initial_dict!
  return if @dict_sent
  return unless @send_compression

  # RFC Sec. 6.4: a passive sender MUST NOT emit a ZDICT frame.
  if @send_compression.respond_to?(:passive?) && @send_compression.passive?
    return
  end

  bytes = @send_compression.send_dict_bytes
  return unless bytes

  if bytes.bytesize > DICT_FRAME_MAX_SIZE
    raise Error, "ZMTP-Zstd: dictionary exceeds DICT_FRAME_MAX_SIZE (#{bytes.bytesize} > #{DICT_FRAME_MAX_SIZE})"
  end

  __getobj__.send_command(Protocol::ZMTP::Codec::Command.new("ZDICT", bytes))

  @dict_sent = true
  @engine&.emit_verbose_monitor_event(:zdict_sent, size: bytes.bytesize)
end
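
The guard sequence above (send at most once, never from a passive sender, never above the size limit) can be exercised with a stand-alone sketch. Everything here is hypothetical: SketchCompressor, DictShipper, and the 16-byte SKETCH_DICT_MAX are illustrative only, since the real DICT_FRAME_MAX_SIZE value is not shown on this page, and the sketch records the frame in an array instead of sending it.

```ruby
# Illustrative limit only; not the real DICT_FRAME_MAX_SIZE.
SKETCH_DICT_MAX = 16

# Hypothetical compressor exposing the two calls the guards rely on.
SketchCompressor = Struct.new(:dict, :passive) do
  def passive?
    passive
  end

  def send_dict_bytes
    dict
  end
end

# Hypothetical shipper mirroring the order of the guards.
class DictShipper
  attr_reader :sent

  def initialize(compression)
    @compression = compression
    @dict_sent   = false
    @sent        = []
  end

  def send_initial_dict!
    return if @dict_sent               # ship at most once
    return unless @compression         # nothing configured
    return if @compression.passive?    # passive senders stay silent

    bytes = @compression.send_dict_bytes
    return unless bytes                # no inline dictionary
    raise ArgumentError, "dictionary too large" if bytes.bytesize > SKETCH_DICT_MAX

    @sent << ["ZDICT", bytes]          # stand-in for send_command
    @dict_sent = true
  end
end
```

Calling send_initial_dict! repeatedly on an active sender records a single frame, a passive sender records none, and an oversized dictionary raises before anything is recorded.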

#send_message(parts) ⇒ Object

Compresses parts and forwards the encoded payload through the underlying connection’s synchronous send path (acquires the write mutex and flushes).



# File 'lib/omq/compression/zstd/connection.rb', line 90

def send_message(parts)
  encoded = encode_parts(parts)
  ship_auto_dict_if_ready
  super(encoded)
end

#write_message(parts) ⇒ Object

Compresses parts and forwards to the underlying #write_message (writes to the buffer without flushing – the send pump batches and flushes at the end of the cycle).



# File 'lib/omq/compression/zstd/connection.rb', line 100

def write_message(parts)
  encoded = encode_parts(parts)
  ship_auto_dict_if_ready
  super(encoded)
end

#write_messages(messages) ⇒ Object

Compresses each message in messages and forwards the batch to the underlying #write_messages. Used by the send pump’s batched-flush path.



# File 'lib/omq/compression/zstd/connection.rb', line 110

def write_messages(messages)
  encoded = messages.map { |parts| encode_parts(parts) }
  ship_auto_dict_if_ready
  super(encoded)
end
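
The send-side methods share one shape: encode every part, then hand the result to the same-named method on the wrapped connection. The sketch below shows that shape under loud assumptions: BufferConn and DeflateWrapper are hypothetical, and the standard library's Zlib deflate is swapped in for Zstd purely so the example runs without extra gems. It does not model dictionaries, auto-training, or any ZMTP-Zstd framing rules.

```ruby
require "delegate"
require "zlib"

# Hypothetical buffered connection; collects messages instead of writing.
class BufferConn
  attr_reader :buffer

  def initialize
    @buffer = []
  end

  def write_message(parts)
    @buffer << parts
  end

  def write_messages(messages)
    @buffer.concat(messages)
  end
end

# Hypothetical wrapper: deflate stands in for the Zstd codec.
class DeflateWrapper < SimpleDelegator
  def write_message(parts)
    super(encode_parts(parts))
  end

  def write_messages(messages)
    super(messages.map { |parts| encode_parts(parts) })
  end

  private

  # Compresses each part independently, preserving message structure.
  def encode_parts(parts)
    parts.map { |part| ::Zlib::Deflate.deflate(part) }
  end
end
```

Because each part is compressed on its own, the receiver can inflate part by part and recover the original message boundaries.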