Class: Protocol::ZMTP::Connection
- Inherits: Object
- Defined in: lib/protocol/zmtp/connection.rb
Overview
Manages one ZMTP peer connection over any transport IO.
Delegates the security handshake to a Mechanism object (Null, Curve, etc.), then provides message send/receive and command send/receive on top of the framing codec.
Heartbeat timing is tracked but not driven — the caller (e.g. an engine) is responsible for periodically sending PINGs and checking expiry.
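Because the connection only records timestamps, the caller has to decide when to ping and when to give up. The sketch below shows one possible tick function; `heartbeat_tick` and `FakeConnection` are hypothetical names, and the PING command is passed in as an argument because its construction is not shown on this page.

```ruby
# Stand-in exposing only the three calls the driver needs. A real
# Protocol::ZMTP::Connection would be passed instead.
class FakeConnection
  attr_reader :sent, :closed

  def initialize(expired:)
    @expired = expired
    @sent = []
    @closed = false
  end

  def heartbeat_expired?(_timeout)
    @expired
  end

  def send_command(command)
    @sent << command
  end

  def close
    @closed = true
  end
end

# One heartbeat tick (hypothetical helper): close the connection if the peer
# has been silent for longer than `timeout` seconds, otherwise send a PING.
# `ping` is whatever command object the caller has built.
def heartbeat_tick(conn, ping:, timeout:)
  if conn.heartbeat_expired?(timeout)
    conn.close
    :expired
  else
    conn.send_command(ping)
    :pinged
  end
end
```

An engine would typically call such a helper from a timer, once per heartbeat interval.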
Instance Attribute Summary
-
#io ⇒ Object
readonly
Transport IO (#read_exactly, #write, #flush, #close).
-
#last_received_at ⇒ Float?
readonly
Monotonic timestamp of last received frame.
-
#peer_identity ⇒ String
readonly
Peer’s identity (from READY handshake).
-
#peer_properties ⇒ Hash{String => String}?
readonly
Full peer READY property hash (set after a successful handshake; nil before).
-
#peer_qos ⇒ Integer
readonly
Peer’s QoS level (from READY handshake, 0 if absent).
-
#peer_qos_hash ⇒ String
readonly
Peer’s supported hash algorithms in preference order.
-
#peer_socket_type ⇒ String
readonly
Peer’s socket type (from READY handshake).
Instance Method Summary
-
#close ⇒ void
Closes the connection.
-
#encrypted? ⇒ Boolean
Returns true if the ZMTP mechanism encrypts at the frame level (e.g. CURVE, BLAKE3ZMQ).
-
#flush ⇒ void
Flushes the write buffer to the underlying IO.
-
#handshake! ⇒ void
Performs the full ZMTP handshake via the configured mechanism.
-
#heartbeat_expired?(timeout) ⇒ Boolean
Returns true if no frame has been received within timeout seconds.
-
#initialize(io, socket_type:, identity: "", as_server: false, mechanism: nil, max_message_size: nil, qos: 0, qos_hash: "") ⇒ Connection
constructor
A new instance of Connection.
-
#read_frame ⇒ Codec::Frame
Reads one frame from the wire.
-
#receive_message ⇒ Array<String>
Receives a multi-frame message.
-
#send_command(command) ⇒ void
Sends a command.
-
#send_message(parts) ⇒ void
Sends a multi-frame message (write + flush).
-
#touch_heartbeat ⇒ void
Records that a frame was received (for heartbeat expiry tracking).
-
#write_message(parts) ⇒ void
Writes a multi-frame message to the buffer without flushing.
-
#write_messages(messages) ⇒ void
Writes a batch of multi-frame messages to the buffer under a single mutex acquisition.
-
#write_wire(wire_bytes) ⇒ void
Writes pre-encoded wire bytes to the buffer without flushing.
Constructor Details
#initialize(io, socket_type:, identity: "", as_server: false, mechanism: nil, max_message_size: nil, qos: 0, qos_hash: "") ⇒ Connection
Returns a new instance of Connection.
    # File 'lib/protocol/zmtp/connection.rb', line 43
    def initialize(io, socket_type:, identity: "", as_server: false,
                   mechanism: nil, max_message_size: nil,
                   qos: 0, qos_hash: "")
      @io = io
      @socket_type = socket_type
      @identity = identity
      @as_server = as_server
      @mechanism = mechanism || Mechanism::Null.new
      @peer_socket_type = nil
      @peer_identity = nil
      @peer_qos = nil
      @peer_qos_hash = nil
      @peer_properties = nil
      @qos = qos
      @qos_hash = qos_hash
      @mutex = Mutex.new
      @max_message_size = max_message_size
      @last_received_at = nil

      # Reusable scratch buffer for frame headers. Array#pack(buffer:)
      # writes in place so the per-message 2-or-9 byte String allocation
      # in write_frames disappears on the hot send path.
      @header_buf = String.new(capacity: 9, encoding: Encoding::BINARY)
    end
Instance Attribute Details
#io ⇒ Object (readonly)
Returns transport IO (#read_exactly, #write, #flush, #close).
    # File 'lib/protocol/zmtp/connection.rb', line 32
    def io
      @io
    end
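Any object that responds to the four listed methods can serve as the transport. As an illustration, here is a minimal in-memory adapter that could be handy in tests. `MemoryTransport` and its `written` reader are hypothetical, and the contract assumed for `read_exactly` (return exactly `n` bytes or raise `EOFError`) is an inference from the method name, not something this page states.

```ruby
require "stringio"

# Hypothetical in-memory transport implementing the duck type
# #read_exactly, #write, #flush, #close.
class MemoryTransport
  def initialize(incoming = "")
    @in = StringIO.new(incoming.b)
    @out = StringIO.new(String.new(encoding: Encoding::BINARY))
    @closed = false
  end

  # Assumed contract: return exactly `n` bytes or raise EOFError.
  def read_exactly(n)
    data = @in.read(n)
    raise EOFError, "connection closed" if data.nil? || data.bytesize < n
    data
  end

  def write(bytes)
    @out.write(bytes)
  end

  # Nothing is buffered separately, so flushing is a no-op.
  def flush
    self
  end

  def close
    @closed = true
  end

  def closed?
    @closed
  end

  # Everything written so far, for inspection in tests.
  def written
    @out.string
  end
end
```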
#last_received_at ⇒ Float? (readonly)
Returns monotonic timestamp of last received frame.
    # File 'lib/protocol/zmtp/connection.rb', line 35
    def last_received_at
      @last_received_at
    end
#peer_identity ⇒ String (readonly)
Returns peer’s identity (from READY handshake).
    # File 'lib/protocol/zmtp/connection.rb', line 19
    def peer_identity
      @peer_identity
    end
#peer_properties ⇒ Hash{String => String}? (readonly)
Returns full peer READY property hash (set after a successful handshake; nil before).
    # File 'lib/protocol/zmtp/connection.rb', line 29
    def peer_properties
      @peer_properties
    end
#peer_qos ⇒ Integer (readonly)
Returns peer’s QoS level (from READY handshake, 0 if absent).
    # File 'lib/protocol/zmtp/connection.rb', line 22
    def peer_qos
      @peer_qos
    end
#peer_qos_hash ⇒ String (readonly)
Returns peer’s supported hash algorithms in preference order.
    # File 'lib/protocol/zmtp/connection.rb', line 25
    def peer_qos_hash
      @peer_qos_hash
    end
#peer_socket_type ⇒ String (readonly)
Returns peer’s socket type (from READY handshake).
    # File 'lib/protocol/zmtp/connection.rb', line 16
    def peer_socket_type
      @peer_socket_type
    end
Instance Method Details
#close ⇒ void
This method returns an undefined value.
Closes the connection.
    # File 'lib/protocol/zmtp/connection.rb', line 277
    def close
      @io.close
    rescue IOError
      # already closed
    end
#encrypted? ⇒ Boolean
Returns true if the ZMTP mechanism encrypts at the frame level (e.g. CURVE, BLAKE3ZMQ).
    # File 'lib/protocol/zmtp/connection.rb', line 168
    def encrypted?
      @mechanism.encrypted?
    end
#flush ⇒ void
This method returns an undefined value.
Flushes the write buffer to the underlying IO.
    # File 'lib/protocol/zmtp/connection.rb', line 176
    def flush
      @mutex.synchronize do
        @io.flush
      end
    end
#handshake! ⇒ void
This method returns an undefined value.
Performs the full ZMTP handshake via the configured mechanism.
    # File 'lib/protocol/zmtp/connection.rb', line 72
    def handshake!
      result = @mechanism.handshake!(
        @io,
        as_server: @as_server,
        socket_type: @socket_type,
        identity: @identity,
        qos: @qos,
        qos_hash: @qos_hash,
      )

      @peer_socket_type = result[:peer_socket_type]
      @peer_identity = result[:peer_identity]
      @peer_qos = result[:peer_qos] || 0
      @peer_qos_hash = result[:peer_qos_hash] || ""
      @peer_properties = result[:peer_properties]

      unless @peer_socket_type
        raise Error, "peer READY missing Socket-Type"
      end

      unless VALID_PEERS[@socket_type.to_sym]&.include?(@peer_socket_type.to_sym)
        raise Error, "incompatible socket types: #{@socket_type} cannot connect to #{@peer_socket_type}"
      end
    end
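The final check in the handshake is a table lookup keyed by the local socket type. The sketch below reproduces that lookup in isolation. `SAMPLE_VALID_PEERS` is an illustrative excerpt based on the standard ZeroMQ socket pairings, not the library's actual `VALID_PEERS` constant (which is not shown on this page), and `compatible?` is a hypothetical helper.

```ruby
# Illustrative excerpt only; the real VALID_PEERS table may contain more
# socket types or differ in detail.
SAMPLE_VALID_PEERS = {
  REQ:    %i[REP ROUTER],
  REP:    %i[REQ DEALER],
  DEALER: %i[REP DEALER ROUTER],
  ROUTER: %i[REQ DEALER ROUTER],
  PUB:    %i[SUB XSUB],
  SUB:    %i[PUB XPUB],
  PUSH:   %i[PULL],
  PULL:   %i[PUSH],
  PAIR:   %i[PAIR],
}.freeze

# Same shape as the check in handshake!: an unknown local type yields nil
# from the hash lookup, which the safe-navigation operator turns into false.
def compatible?(local_type, peer_type)
  SAMPLE_VALID_PEERS[local_type.to_sym]&.include?(peer_type.to_sym) || false
end
```

Because both sides are converted with `to_sym`, socket types may be passed as strings or symbols.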
#heartbeat_expired?(timeout) ⇒ Boolean
Returns true if no frame has been received within timeout seconds.
    # File 'lib/protocol/zmtp/connection.rb', line 268
    def heartbeat_expired?(timeout)
      return false unless @last_received_at
      (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @last_received_at) > timeout
    end
#read_frame ⇒ Codec::Frame
Reads one frame from the wire. Handles PING/PONG automatically. When using an encrypted mechanism, all frames are decrypted transparently (supports both CURVE MESSAGE wrapping and inline encryption like BLAKE3ZMQ).
    # File 'lib/protocol/zmtp/connection.rb', line 228
    def read_frame
      loop do
        begin
          frame = Codec::Frame.read_from(@io, max_message_size: @max_message_size)
        rescue Error
          close
          raise
        end
        touch_heartbeat

        frame = @mechanism.decrypt(frame) if @mechanism.encrypted?

        if frame.command?
          cmd = Codec::Command.from_body(frame.body)
          case cmd.name
          when "PING"
            _, context = cmd.ping_ttl_and_context
            send_command(Codec::Command.pong(context: context))
            next
          when "PONG"
            next
          end
        end
        return frame
      end
    end
#receive_message ⇒ Array<String>
Receives a multi-frame message. PING/PONG commands are handled automatically by #read_frame.
    # File 'lib/protocol/zmtp/connection.rb', line 188
    def receive_message
      frames = []
      loop do
        frame = read_frame
        if frame.command?
          yield frame if block_given?
          next
        end
        frames << frame.body.freeze
        break unless frame.more?
      end
      frames.freeze
    end
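The listing shows that command frames other than PING/PONG are yielded to an optional block and never become part of the returned message. A small simulation makes that behaviour concrete. `Frame` and `ScriptedConnection` are hypothetical stand-ins that replay scripted frames and mirror the loop above; they are not part of the library.

```ruby
# Hypothetical frame stand-in with the readers the loop relies on.
Frame = Struct.new(:body, :more, :command) do
  def more?
    more
  end

  def command?
    command
  end
end

# Stand-in connection: read_frame replays a script, receive_message
# mirrors the loop from the listing above.
class ScriptedConnection
  def initialize(frames)
    @frames = frames
  end

  def read_frame
    @frames.shift or raise EOFError, "script exhausted"
  end

  def receive_message
    parts = []
    loop do
      frame = read_frame
      if frame.command?
        yield frame if block_given?
        next
      end
      parts << frame.body.freeze
      break unless frame.more?
    end
    parts.freeze
  end
end

conn = ScriptedConnection.new([
  Frame.new("SUBSCRIBE", false, true), # command frame, yielded to the block
  Frame.new("topic", true, false),     # data frame with MORE set
  Frame.new("payload", false, false),  # final data frame
])

commands = []
message = conn.receive_message { |frame| commands << frame.body }
# message  is ["topic", "payload"] (frozen)
# commands is ["SUBSCRIBE"]
```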
#send_command(command) ⇒ void
This method returns an undefined value.
Sends a command.
    # File 'lib/protocol/zmtp/connection.rb', line 207
    def send_command(command)
      with_deferred_cancel do
        @mutex.synchronize do
          if @mechanism.encrypted?
            @io.write(@mechanism.encrypt(command.to_body, command: true))
          else
            @io.write(command.to_frame.to_wire)
          end
          @io.flush
        end
      end
    end
#send_message(parts) ⇒ void
This method returns an undefined value.
Sends a multi-frame message (write + flush).
    # File 'lib/protocol/zmtp/connection.rb', line 103
    def send_message(parts)
      with_deferred_cancel do
        @mutex.synchronize do
          write_frames(parts)
          @io.flush
        end
      end
    end
#touch_heartbeat ⇒ void
This method returns an undefined value.
Records that a frame was received (for heartbeat expiry tracking).
    # File 'lib/protocol/zmtp/connection.rb', line 259
    def touch_heartbeat
      @last_received_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end
#write_message(parts) ⇒ void
This method returns an undefined value.
Writes a multi-frame message to the buffer without flushing. Call #flush after batching writes.
    # File 'lib/protocol/zmtp/connection.rb', line 118
    def write_message(parts)
      with_deferred_cancel do
        @mutex.synchronize do
          write_frames(parts)
        end
      end
    end
#write_messages(messages) ⇒ void
This method returns an undefined value.
Writes a batch of multi-frame messages to the buffer under a single mutex acquisition. Used by work-stealing send pumps that dequeue up to N messages at once; this avoids the N lock/unlock pairs per batch that a plain `batch.each { write_message }` would incur.
    # File 'lib/protocol/zmtp/connection.rb', line 136
    def write_messages(messages)
      with_deferred_cancel do
        @mutex.synchronize do
          i = 0
          n = messages.size
          while i < n
            write_frames(messages[i])
            i += 1
          end
        end
      end
    end
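A send pump of the kind described might look roughly like this: block for one message, opportunistically drain a few more, then write the whole batch and flush once. `dequeue_batch`, `pump_once`, and `RecordingConnection` are hypothetical names for illustration; only `write_messages` and `flush` come from this class.

```ruby
# Recording stand-in; a real Protocol::ZMTP::Connection would be used instead.
class RecordingConnection
  attr_reader :batches, :flushes

  def initialize
    @batches = []
    @flushes = 0
  end

  def write_messages(messages)
    @batches << messages
  end

  def flush
    @flushes += 1
  end
end

# Hypothetical helper: wait for the first message, then take up to
# `max - 1` more without blocking.
def dequeue_batch(queue, max)
  batch = [queue.pop]
  while batch.size < max
    begin
      batch << queue.pop(true) # non-blocking; raises ThreadError when empty
    rescue ThreadError
      break
    end
  end
  batch
end

# One pump iteration: a single lock acquisition and a single flush per batch.
def pump_once(queue, conn, max: 64)
  batch = dequeue_batch(queue, max)
  conn.write_messages(batch)
  conn.flush
  batch.size
end
```

Flushing once per batch rather than once per message is a design choice of this sketch; a caller with stricter latency requirements might flush more often.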
#write_wire(wire_bytes) ⇒ void
This method returns an undefined value.
Writes pre-encoded wire bytes to the buffer without flushing. Used for fan-out: encode once, write to many connections.
    # File 'lib/protocol/zmtp/connection.rb', line 155
    def write_wire(wire_bytes)
      with_deferred_cancel do
        @mutex.synchronize do
          @io.write(wire_bytes)
        end
      end
    end
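To illustrate the fan-out pattern, the sketch below encodes a message once and hands the same bytes to several connections. `encode_message` is a hypothetical encoder written directly from the ZMTP 3.x framing rules (a flags byte, a 1-byte or 8-byte big-endian size, then the body); the library presumably has its own codec for this. `WireSink` and `fan_out` are likewise stand-ins, and the sketch assumes an unencrypted mechanism so that identical bytes are valid for every peer.

```ruby
FLAG_MORE = 0x01 # more frames follow in this message
FLAG_LONG = 0x02 # size field is 8 bytes instead of 1

# Hypothetical encoder for the data frames of one multi-frame message.
def encode_message(parts)
  wire = String.new(encoding: Encoding::BINARY)
  parts.each_with_index do |part, i|
    body = part.b
    flags = i < parts.size - 1 ? FLAG_MORE : 0
    if body.bytesize > 255
      wire << [flags | FLAG_LONG, body.bytesize].pack("CQ>")
    else
      wire << [flags, body.bytesize].pack("CC")
    end
    wire << body
  end
  wire
end

# Stand-in for a connection; records what write_wire receives.
class WireSink
  attr_reader :bytes

  def initialize
    @bytes = String.new(encoding: Encoding::BINARY)
  end

  def write_wire(wire_bytes)
    @bytes << wire_bytes
  end

  def flush
  end
end

# Encode once, write to many, then flush each connection.
def fan_out(connections, parts)
  wire = encode_message(parts)
  connections.each { |conn| conn.write_wire(wire) }
  connections.each(&:flush)
  wire
end
```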