Class: Protocol::ZMTP::Connection
- Inherits:
-
Object
- Object
- Protocol::ZMTP::Connection
- Defined in:
- lib/protocol/zmtp/connection.rb
Overview
Manages one ZMTP peer connection over any transport IO.
Delegates the security handshake to a Mechanism object (Null, Curve, etc.), then provides message send/receive and command send/receive on top of the framing codec.
Heartbeat timing is tracked but not driven — the caller (e.g. an engine) is responsible for periodically sending PINGs and checking expiry.
Instance Attribute Summary collapse
-
#io ⇒ Object
readonly
Transport IO (#read_exactly, #write, #flush, #close).
-
#last_received_at ⇒ Float?
readonly
Monotonic timestamp of last received frame.
-
#peer_identity ⇒ String
readonly
Peer’s identity (from READY handshake).
-
#peer_major ⇒ Integer?
readonly
Peer ZMTP major version (from greeting).
-
#peer_minor ⇒ Integer?
readonly
Peer ZMTP minor version (from greeting); 0 for ZMTP 3.0 peers, 1 for ZMTP 3.1+.
-
#peer_properties ⇒ Hash{String => String}?
readonly
Full peer READY property hash (set after a successful handshake; nil before).
-
#peer_qos ⇒ Integer
readonly
Peer’s QoS level (from READY handshake, 0 if absent).
-
#peer_qos_hash ⇒ String
readonly
Peer’s supported hash algorithms in preference order.
-
#peer_socket_type ⇒ String
readonly
Peer’s socket type (from READY handshake).
Instance Method Summary collapse
-
#close ⇒ void
Closes the connection.
-
#encrypted? ⇒ Boolean
Returns true if the ZMTP mechanism encrypts at the frame level (e.g. CURVE, BLAKE3ZMQ).
-
#flush ⇒ void
Flushes the write buffer to the underlying IO.
-
#handshake! ⇒ void
Performs the full ZMTP handshake via the configured mechanism.
-
#heartbeat_expired?(timeout) ⇒ Boolean
Returns true if no frame has been received within
timeoutseconds. -
#initialize(io, socket_type:, identity: "", as_server: false, mechanism: nil, max_message_size: nil, qos: 0, qos_hash: "") ⇒ Connection
constructor
A new instance of Connection.
-
#read_frame ⇒ Codec::Frame
Reads one frame from the wire.
-
#receive_message ⇒ Array<String>
Receives a multi-frame message.
-
#send_command(command) ⇒ void
Sends a command.
-
#send_message(parts) ⇒ void
Sends a multi-frame message (write + flush).
-
#touch_heartbeat ⇒ void
Records that a frame was received (for heartbeat expiry tracking).
-
#write_message(parts) ⇒ void
Writes a multi-frame message to the buffer without flushing.
-
#write_messages(messages) ⇒ void
Writes a batch of multi-frame messages to the buffer under a single mutex acquisition.
-
#write_wire(wire_bytes) ⇒ void
Writes pre-encoded wire bytes to the buffer without flushing.
Constructor Details
#initialize(io, socket_type:, identity: "", as_server: false, mechanism: nil, max_message_size: nil, qos: 0, qos_hash: "") ⇒ Connection
Returns a new instance of Connection.
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/protocol/zmtp/connection.rb', line 50 def initialize(io, socket_type:, identity: "", as_server: false, mechanism: nil, max_message_size: nil, qos: 0, qos_hash: "") @io = io @socket_type = socket_type @identity = identity @as_server = as_server @mechanism = mechanism || Mechanism::Null.new @peer_socket_type = nil @peer_identity = nil @peer_qos = nil @peer_qos_hash = nil @peer_properties = nil @peer_major = nil @peer_minor = nil @qos = qos @qos_hash = qos_hash @mutex = Mutex.new @max_message_size = @last_received_at = nil # Reusable scratch buffer for frame headers. Array#pack(buffer:) # writes in place so the per-message 2-or-9 byte String allocation # in write_frames disappears on the hot send path. @header_buf = String.new(capacity: 9, encoding: Encoding::BINARY) end |
Instance Attribute Details
#io ⇒ Object (readonly)
Returns transport IO (#read_exactly, #write, #flush, #close).
39 40 41 |
# File 'lib/protocol/zmtp/connection.rb', line 39 def io @io end |
#last_received_at ⇒ Float? (readonly)
Returns monotonic timestamp of last received frame.
42 43 44 |
# File 'lib/protocol/zmtp/connection.rb', line 42 def last_received_at @last_received_at end |
#peer_identity ⇒ String (readonly)
Returns peer’s identity (from READY handshake).
19 20 21 |
# File 'lib/protocol/zmtp/connection.rb', line 19 def peer_identity @peer_identity end |
#peer_major ⇒ Integer? (readonly)
Returns peer ZMTP major version (from greeting).
32 33 34 |
# File 'lib/protocol/zmtp/connection.rb', line 32 def peer_major @peer_major end |
#peer_minor ⇒ Integer? (readonly)
Returns peer ZMTP minor version (from greeting); 0 for ZMTP 3.0 peers, 1 for ZMTP 3.1+.
36 37 38 |
# File 'lib/protocol/zmtp/connection.rb', line 36 def peer_minor @peer_minor end |
#peer_properties ⇒ Hash{String => String}? (readonly)
Returns full peer READY property hash (set after a successful handshake; nil before).
29 30 31 |
# File 'lib/protocol/zmtp/connection.rb', line 29 def peer_properties @peer_properties end |
#peer_qos ⇒ Integer (readonly)
Returns peer’s QoS level (from READY handshake, 0 if absent).
22 23 24 |
# File 'lib/protocol/zmtp/connection.rb', line 22 def peer_qos @peer_qos end |
#peer_qos_hash ⇒ String (readonly)
Returns peer’s supported hash algorithms in preference order.
25 26 27 |
# File 'lib/protocol/zmtp/connection.rb', line 25 def peer_qos_hash @peer_qos_hash end |
#peer_socket_type ⇒ String (readonly)
Returns peer’s socket type (from READY handshake).
16 17 18 |
# File 'lib/protocol/zmtp/connection.rb', line 16 def peer_socket_type @peer_socket_type end |
Instance Method Details
#close ⇒ void
This method returns an undefined value.
Closes the connection.
288 289 290 291 292 |
# File 'lib/protocol/zmtp/connection.rb', line 288 def close @io.close rescue IOError # already closed end |
#encrypted? ⇒ Boolean
Returns true if the ZMTP mechanism encrypts at the frame level (e.g. CURVE, BLAKE3ZMQ).
179 180 181 |
# File 'lib/protocol/zmtp/connection.rb', line 179 def encrypted? @mechanism.encrypted? end |
#flush ⇒ void
This method returns an undefined value.
Flushes the write buffer to the underlying IO.
187 188 189 190 191 |
# File 'lib/protocol/zmtp/connection.rb', line 187 def flush @mutex.synchronize do @io.flush end end |
#handshake! ⇒ void
This method returns an undefined value.
Performs the full ZMTP handshake via the configured mechanism.
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/protocol/zmtp/connection.rb', line 81 def handshake! result = @mechanism.handshake!( @io, as_server: @as_server, socket_type: @socket_type, identity: @identity, qos: @qos, qos_hash: @qos_hash, ) @peer_socket_type = result[:peer_socket_type] @peer_identity = result[:peer_identity] @peer_qos = result[:peer_qos] || 0 @peer_qos_hash = result[:peer_qos_hash] || "" @peer_properties = result[:peer_properties] @peer_major = result[:peer_major] @peer_minor = result[:peer_minor] unless @peer_socket_type raise Error, "peer READY missing Socket-Type" end unless VALID_PEERS[@socket_type.to_sym]&.include?(@peer_socket_type.to_sym) raise Error, "incompatible socket types: #{@socket_type} cannot connect to #{@peer_socket_type}" end end |
#heartbeat_expired?(timeout) ⇒ Boolean
Returns true if no frame has been received within timeout seconds.
279 280 281 282 |
# File 'lib/protocol/zmtp/connection.rb', line 279 def heartbeat_expired?(timeout) return false unless @last_received_at (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @last_received_at) > timeout end |
#read_frame ⇒ Codec::Frame
Reads one frame from the wire. Handles PING/PONG automatically. When using an encrypted mechanism, all frames are decrypted transparently (supports both CURVE MESSAGE wrapping and inline encryption like BLAKE3ZMQ).
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 |
# File 'lib/protocol/zmtp/connection.rb', line 239 def read_frame loop do begin frame = Codec::Frame.read_from(@io, max_message_size: @max_message_size) rescue Error close raise end touch_heartbeat frame = @mechanism.decrypt(frame) if @mechanism.encrypted? if frame.command? cmd = Codec::Command.from_body(frame.body) case cmd.name when "PING" _, context = cmd.ping_ttl_and_context send_command(Codec::Command.pong(context: context)) next when "PONG" next end end return frame end end |
#receive_message ⇒ Array<String>
Receives a multi-frame message. PING/PONG commands are handled automatically by #read_frame.
199 200 201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/protocol/zmtp/connection.rb', line 199 def frames = [] loop do frame = read_frame if frame.command? yield frame if block_given? next end frames << frame.body.freeze break unless frame.more? end frames.freeze end |
#send_command(command) ⇒ void
This method returns an undefined value.
Sends a command.
218 219 220 221 222 223 224 225 226 227 228 229 |
# File 'lib/protocol/zmtp/connection.rb', line 218 def send_command(command) with_deferred_cancel do @mutex.synchronize do if @mechanism.encrypted? @io.write(@mechanism.encrypt(command.to_body, command: true)) else @io.write(command.to_frame.to_wire) end @io.flush end end end |
#send_message(parts) ⇒ void
This method returns an undefined value.
Sends a multi-frame message (write + flush).
114 115 116 117 118 119 120 121 |
# File 'lib/protocol/zmtp/connection.rb', line 114 def (parts) with_deferred_cancel do @mutex.synchronize do write_frames(parts) @io.flush end end end |
#touch_heartbeat ⇒ void
This method returns an undefined value.
Records that a frame was received (for heartbeat expiry tracking).
270 271 272 |
# File 'lib/protocol/zmtp/connection.rb', line 270 def touch_heartbeat @last_received_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) end |
#write_message(parts) ⇒ void
This method returns an undefined value.
Writes a multi-frame message to the buffer without flushing. Call #flush after batching writes.
129 130 131 132 133 134 135 |
# File 'lib/protocol/zmtp/connection.rb', line 129 def (parts) with_deferred_cancel do @mutex.synchronize do write_frames(parts) end end end |
#write_messages(messages) ⇒ void
This method returns an undefined value.
Writes a batch of multi-frame messages to the buffer under a single mutex acquisition. Used by work-stealing send pumps that dequeue up to N messages at once — avoids the N lock/unlock pairs per batch that a plain ‘batch.each { write_message }` would incur.
147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/protocol/zmtp/connection.rb', line 147 def () with_deferred_cancel do @mutex.synchronize do i = 0 n = .size while i < n write_frames([i]) i += 1 end end end end |
#write_wire(wire_bytes) ⇒ void
This method returns an undefined value.
Writes pre-encoded wire bytes to the buffer without flushing. Used for fan-out: encode once, write to many connections.
166 167 168 169 170 171 172 |
# File 'lib/protocol/zmtp/connection.rb', line 166 def write_wire(wire_bytes) with_deferred_cancel do @mutex.synchronize do @io.write(wire_bytes) end end end |