Class: Protocol::ZMTP::Connection
- Inherits: Object
- Defined in: lib/protocol/zmtp/connection.rb
Overview
Manages one ZMTP peer connection over any transport IO.
Delegates the security handshake to a Mechanism object (Null, Curve, etc.), then provides message send/receive and command send/receive on top of the framing codec.
Heartbeat timing is tracked but not driven — the caller (e.g. an engine) is responsible for periodically sending PINGs and checking expiry.
Instance Attribute Summary
- #io ⇒ Object (readonly)
  Transport IO (#read_exactly, #write, #flush, #close).
- #last_received_at ⇒ Float? (readonly)
  Monotonic timestamp of last received frame.
- #peer_identity ⇒ String (readonly)
  Peer’s identity (from READY handshake).
- #peer_major ⇒ Integer? (readonly)
  Peer ZMTP major version (from greeting).
- #peer_minor ⇒ Integer? (readonly)
  Peer ZMTP minor version (from greeting); 0 for ZMTP 3.0 peers, 1 for ZMTP 3.1+.
- #peer_properties ⇒ Hash{String => String}? (readonly)
  Full peer READY property hash (set after a successful handshake; nil before).
- #peer_qos ⇒ Integer (readonly)
  Peer’s QoS level (from READY handshake, 0 if absent).
- #peer_qos_hash ⇒ String (readonly)
  Peer’s supported hash algorithms in preference order.
- #peer_socket_type ⇒ String (readonly)
  Peer’s socket type (from READY handshake).
Instance Method Summary
- #close ⇒ void
  Closes the connection.
- #encrypted? ⇒ Boolean
  Returns true if the ZMTP mechanism encrypts at the frame level (e.g. CURVE, BLAKE3ZMQ).
- #flush ⇒ void
  Flushes the write buffer to the underlying IO.
- #handshake! ⇒ void
  Performs the full ZMTP handshake via the configured mechanism.
- #heartbeat_expired?(timeout) ⇒ Boolean
  Returns true if no frame has been received within timeout seconds.
- #initialize(io, socket_type:, identity: "", as_server: false, mechanism: nil, max_message_size: nil, qos: 0, qos_hash: "") ⇒ Connection (constructor)
  A new instance of Connection.
- #read_frame ⇒ Codec::Frame
  Reads one frame from the wire.
- #receive_message ⇒ Array<String>
  Receives a multi-frame message.
- #send_command(command) ⇒ void
  Sends a command.
- #send_message(parts) ⇒ void
  Sends a multi-frame message (write + flush).
- #touch_heartbeat ⇒ void
  Records that a frame was received (for heartbeat expiry tracking).
- #write_message(parts) ⇒ void
  Writes a multi-frame message to the buffer without flushing.
- #write_messages(messages) ⇒ void
  Writes a batch of multi-frame messages to the buffer under a single mutex acquisition.
- #write_wire(wire_bytes) ⇒ void
  Writes pre-encoded wire bytes to the buffer without flushing.
Constructor Details
#initialize(io, socket_type:, identity: "", as_server: false, mechanism: nil, max_message_size: nil, qos: 0, qos_hash: "") ⇒ Connection
Returns a new instance of Connection.
    # File 'lib/protocol/zmtp/connection.rb', line 59
    def initialize(io, socket_type:, identity: "", as_server: false,
                   mechanism: nil, max_message_size: nil,
                   qos: 0, qos_hash: "")
      @io               = io
      @socket_type      = socket_type
      @identity         = identity
      @as_server        = as_server
      @mechanism        = mechanism || Mechanism::Null.new
      @peer_socket_type = nil
      @peer_identity    = nil
      @peer_qos         = nil
      @peer_qos_hash    = nil
      @peer_properties  = nil
      @peer_major       = nil
      @peer_minor       = nil
      @qos              = qos
      @qos_hash         = qos_hash
      @mutex            = Mutex.new
      @max_message_size = max_message_size
      @last_received_at = nil

      # Reusable scratch buffer for frame headers. Array#pack(buffer:)
      # writes in place so the per-message 2-or-9 byte String allocation
      # in write_frames disappears on the hot send path.
      @header_buf = String.new(capacity: 9, encoding: Encoding::BINARY)
    end
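The scratch-buffer comment in the constructor can be illustrated in isolation. The following is a minimal sketch, not the library's write_frames: it assumes the usual ZMTP 3.x header layout (one flags byte followed by a 1-byte length, or by an 8-byte big-endian length for bodies over 255 bytes). The names encode_header, FLAG_MORE, FLAG_LONG and HEADER_BUF are hypothetical.

```ruby
# Illustrative only: these names are hypothetical, not part of Protocol::ZMTP.
FLAG_MORE = 0x01 # more frames follow in this message
FLAG_LONG = 0x02 # length field is 8 bytes instead of 1

# One reusable buffer, sized for the largest possible header (9 bytes).
HEADER_BUF = String.new(capacity: 9, encoding: Encoding::BINARY)

# Packs a frame header into buf and returns that same String object.
def encode_header(size, more: false, buf: HEADER_BUF)
  flags = more ? FLAG_MORE : 0
  buf.clear # Array#pack(buffer:) appends, so reset the buffer first
  if size > 255
    [flags | FLAG_LONG, size].pack("CQ>", buffer: buf) # 1 + 8 bytes
  else
    [flags, size].pack("CC", buffer: buf)              # 1 + 1 bytes
  end
  buf
end

# Copy the results because the next call overwrites the shared buffer.
short = encode_header(5, more: true).dup
long  = encode_header(300).dup
```

Because the buffer is shared, a caller in this sketch has to write the header out (or copy it) before encoding the next one.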
Instance Attribute Details
#io ⇒ Object (readonly)
Returns transport IO (#read_exactly, #write, #flush, #close).
    # File 'lib/protocol/zmtp/connection.rb', line 46
    def io
      @io
    end
#last_received_at ⇒ Float? (readonly)
Returns monotonic timestamp of last received frame.
    # File 'lib/protocol/zmtp/connection.rb', line 50
    def last_received_at
      @last_received_at
    end
#peer_identity ⇒ String (readonly)
Returns peer’s identity (from READY handshake).
    # File 'lib/protocol/zmtp/connection.rb', line 20
    def peer_identity
      @peer_identity
    end
#peer_major ⇒ Integer? (readonly)
Returns peer ZMTP major version (from greeting).
    # File 'lib/protocol/zmtp/connection.rb', line 37
    def peer_major
      @peer_major
    end
#peer_minor ⇒ Integer? (readonly)
Returns peer ZMTP minor version (from greeting); 0 for ZMTP 3.0 peers, 1 for ZMTP 3.1+.
    # File 'lib/protocol/zmtp/connection.rb', line 42
    def peer_minor
      @peer_minor
    end
#peer_properties ⇒ Hash{String => String}? (readonly)
Returns full peer READY property hash (set after a successful handshake; nil before).
    # File 'lib/protocol/zmtp/connection.rb', line 33
    def peer_properties
      @peer_properties
    end
#peer_qos ⇒ Integer (readonly)
Returns peer’s QoS level (from READY handshake, 0 if absent).
    # File 'lib/protocol/zmtp/connection.rb', line 24
    def peer_qos
      @peer_qos
    end
#peer_qos_hash ⇒ String (readonly)
Returns peer’s supported hash algorithms in preference order.
    # File 'lib/protocol/zmtp/connection.rb', line 28
    def peer_qos_hash
      @peer_qos_hash
    end
#peer_socket_type ⇒ String (readonly)
Returns peer’s socket type (from READY handshake).
    # File 'lib/protocol/zmtp/connection.rb', line 16
    def peer_socket_type
      @peer_socket_type
    end
Instance Method Details
#close ⇒ void
This method returns an undefined value.
Closes the connection.
    # File 'lib/protocol/zmtp/connection.rb', line 301
    def close
      @io.close
    rescue IOError
      # already closed
    end
#encrypted? ⇒ Boolean
Returns true if the ZMTP mechanism encrypts at the frame level (e.g. CURVE, BLAKE3ZMQ).
    # File 'lib/protocol/zmtp/connection.rb', line 186
    def encrypted?
      @mechanism.encrypted?
    end
#flush ⇒ void
This method returns an undefined value.
Flushes the write buffer to the underlying IO.
    # File 'lib/protocol/zmtp/connection.rb', line 194
    def flush
      @mutex.synchronize do
        @io.flush
      end
    end
#handshake! ⇒ void
This method returns an undefined value.
Performs the full ZMTP handshake via the configured mechanism.
    # File 'lib/protocol/zmtp/connection.rb', line 90
    def handshake!
      result = @mechanism.handshake! @io,
        as_server:   @as_server,
        socket_type: @socket_type,
        identity:    @identity,
        qos:         @qos,
        qos_hash:    @qos_hash

      @peer_socket_type = result[:peer_socket_type]
      @peer_identity    = result[:peer_identity]
      @peer_qos         = result[:peer_qos] || 0
      @peer_qos_hash    = result[:peer_qos_hash] || ""
      @peer_properties  = result[:peer_properties]
      @peer_major       = result[:peer_major]
      @peer_minor       = result[:peer_minor]

      unless @peer_socket_type
        raise Error, "peer READY missing Socket-Type"
      end

      unless VALID_PEERS[@socket_type.to_sym]&.include?(@peer_socket_type.to_sym)
        raise Error, "incompatible socket types: #{@socket_type} cannot connect to #{@peer_socket_type}"
      end
    end
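The socket-type check at the end of handshake! can be sketched on its own. The library's VALID_PEERS table is not reproduced on this page, so the table below is an assumption based on the commonly documented ZeroMQ socket pairings; EXAMPLE_VALID_PEERS and compatible? are hypothetical names.

```ruby
# Hypothetical compatibility table. The real VALID_PEERS is not shown in this
# page; these entries are an assumption based on the usual ZeroMQ pairings.
EXAMPLE_VALID_PEERS = {
  REQ:    %i[REP ROUTER],
  REP:    %i[REQ DEALER],
  DEALER: %i[REP DEALER ROUTER],
  ROUTER: %i[REQ DEALER ROUTER],
  PUB:    %i[SUB XSUB],
  SUB:    %i[PUB XPUB],
  PUSH:   %i[PULL],
  PULL:   %i[PUSH],
  PAIR:   %i[PAIR],
}.freeze

# Returns true when a local socket type may talk to the given peer type.
# A missing peer type (no Socket-Type in READY) is never compatible.
def compatible?(local, peer)
  return false if peer.nil?

  EXAMPLE_VALID_PEERS[local.to_sym]&.include?(peer.to_sym) || false
end
```

The safe-navigation call mirrors the original check: an unknown local socket type yields nil from the lookup and is treated as incompatible rather than raising NoMethodError.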
#heartbeat_expired?(timeout) ⇒ Boolean
Returns true if no frame has been received within timeout seconds. Returns false if no frame has been received yet, that is, before #touch_heartbeat has recorded a first timestamp.
    # File 'lib/protocol/zmtp/connection.rb', line 292
    def heartbeat_expired?(timeout)
      return false unless @last_received_at
      (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @last_received_at) > timeout
    end
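The expiry rule can be exercised without a real connection. The sketch below uses a hypothetical HeartbeatTracker class that mirrors #touch_heartbeat and #heartbeat_expired?; the injectable clock is an assumption added only so the behaviour can be shown deterministically.

```ruby
# Hypothetical stand-in that mirrors the two heartbeat methods of Connection.
class HeartbeatTracker
  attr_reader :last_received_at

  # clock is any callable returning seconds; defaults to the monotonic clock.
  def initialize(clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @clock = clock
    @last_received_at = nil
  end

  # Records that a frame arrived "now".
  def touch
    @last_received_at = @clock.call
  end

  # True only if a frame has been seen and more than timeout seconds passed.
  def expired?(timeout)
    return false unless @last_received_at

    (@clock.call - @last_received_at) > timeout
  end
end

now = 100.0
tracker = HeartbeatTracker.new(clock: -> { now })

before_first_frame = tracker.expired?(5) # nothing received yet
tracker.touch
now = 104.0
within_timeout = tracker.expired?(5)     # 4 seconds of silence
now = 106.0
after_timeout = tracker.expired?(5)      # 6 seconds of silence
```

A monotonic clock is used rather than Time.now so that wall-clock adjustments cannot make a healthy peer look expired.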
#read_frame ⇒ Codec::Frame
Reads one frame from the wire. Handles PING/PONG automatically. When using an encrypted mechanism, all frames are decrypted transparently (supports both CURVE MESSAGE wrapping and inline encryption like BLAKE3ZMQ).
    # File 'lib/protocol/zmtp/connection.rb', line 250
    def read_frame
      loop do
        begin
          frame = Codec::Frame.read_from(@io, max_message_size: @max_message_size)
        rescue Error
          close
          raise
        end

        touch_heartbeat
        frame = @mechanism.decrypt(frame) if @mechanism.encrypted?

        if frame.command?
          cmd = Codec::Command.from_body(frame.body)

          case cmd.name
          when "PING"
            _, context = cmd.ping_ttl_and_context
            send_command(Codec::Command.pong(context: context))
            next
          when "PONG"
            next
          end
        end

        return frame
      end
    end
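The PING/PONG filtering in the loop above can be sketched with plain Ruby objects. PingFrame and next_visible_frame are hypothetical names, and the frames are simple structs rather than real Codec::Frame instances; decryption and heartbeat bookkeeping are left out.

```ruby
# Hypothetical frame type; the real Codec::Frame is not reproduced here.
PingFrame = Struct.new(:kind, :name, :body) do
  def command?
    kind == :command
  end
end

# Returns the first frame that is neither PING nor PONG. Each PING triggers a
# reply through the pong callable; PONGs are silently dropped.
def next_visible_frame(incoming, pong:)
  loop do
    frame = incoming.shift or raise EOFError, "no more frames"

    if frame.command?
      case frame.name
      when "PING"
        pong.call(frame.body) # echo the PING context back
        next
      when "PONG"
        next
      end
    end

    return frame
  end
end

replies = []
queue = [
  PingFrame.new(:command, "PING", "ctx"),
  PingFrame.new(:command, "PONG", "ctx"),
  PingFrame.new(:data, nil, "hello"),
]
frame = next_visible_frame(queue, pong: ->(context) { replies << context })
```

Commands other than PING and PONG fall through the case statement and are returned to the caller, matching the original method.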
#receive_message ⇒ Array<String>
Receives a multi-frame message. PING/PONG commands are handled automatically by #read_frame. Any other command frame is yielded to the block, if one is given, and is not included in the returned message.
    # File 'lib/protocol/zmtp/connection.rb', line 206
    def receive_message
      frames = []

      loop do
        frame = read_frame

        if frame.command?
          yield frame if block_given?
          next
        end

        frames << frame.body.freeze
        break unless frame.more?
      end

      frames.freeze
    end
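Message assembly relies on the MORE flag: parts are collected until a frame arrives without it. A minimal sketch with hypothetical names (Part, assemble_message), using structs in place of real frames and omitting command handling:

```ruby
# Hypothetical frame type carrying a body and a MORE flag.
Part = Struct.new(:body, :more) do
  def more?
    more
  end
end

# Collects frame bodies up to and including the first frame without MORE.
def assemble_message(frames)
  parts = []
  frames.each do |frame|
    parts << frame.body.dup.freeze
    break unless frame.more?
  end
  parts.freeze
end

wire = [
  Part.new("topic", true),    # MORE set: message continues
  Part.new("payload", false), # last part of the first message
  Part.new("next", false),    # belongs to the following message
]
message = assemble_message(wire)
```

As in the original, both the array and each part are frozen, so a received message can be shared between consumers without defensive copies.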
#send_command(command) ⇒ void
This method returns an undefined value.
Sends a command.
    # File 'lib/protocol/zmtp/connection.rb', line 229
    def send_command(command)
      with_deferred_cancel do
        @mutex.synchronize do
          if @mechanism.encrypted?
            @io.write(@mechanism.encrypt(command.to_body, command: true))
          else
            @io.write(command.to_frame.to_wire)
          end
          @io.flush
        end
      end
    end
#send_message(parts) ⇒ void
This method returns an undefined value.
Sends a multi-frame message (write + flush).
    # File 'lib/protocol/zmtp/connection.rb', line 121
    def send_message(parts)
      with_deferred_cancel do
        @mutex.synchronize do
          write_frames(parts)
          @io.flush
        end
      end
    end
#touch_heartbeat ⇒ void
This method returns an undefined value.
Records that a frame was received (for heartbeat expiry tracking).
    # File 'lib/protocol/zmtp/connection.rb', line 283
    def touch_heartbeat
      @last_received_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end
#write_message(parts) ⇒ void
This method returns an undefined value.
Writes a multi-frame message to the buffer without flushing. Call #flush after batching writes.
    # File 'lib/protocol/zmtp/connection.rb', line 136
    def write_message(parts)
      with_deferred_cancel do
        @mutex.synchronize do
          write_frames(parts)
        end
      end
    end
#write_messages(messages) ⇒ void
This method returns an undefined value.
Writes a batch of multi-frame messages to the buffer under a single mutex acquisition. Used by work-stealing send pumps that dequeue up to N messages at once; this avoids the N lock/unlock pairs per batch that a plain `batch.each { write_message }` would incur.
    # File 'lib/protocol/zmtp/connection.rb', line 154
    def write_messages(messages)
      with_deferred_cancel do
        @mutex.synchronize do
          i = 0
          n = messages.size
          while i < n
            write_frames(messages[i])
            i += 1
          end
        end
      end
    end
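The locking difference between the two write paths can be made visible with a counter. CountingWriter below is a hypothetical class that writes raw parts to a StringIO (no ZMTP framing) and only counts how often the mutex is taken.

```ruby
require "stringio"

# Hypothetical writer that counts mutex acquisitions; it does no framing.
class CountingWriter
  attr_reader :lock_count

  def initialize(io)
    @io = io
    @mutex = Mutex.new
    @lock_count = 0
  end

  # One lock per message.
  def write_message(parts)
    locked { parts.each { |part| @io.write(part) } }
  end

  # One lock for the whole batch.
  def write_messages(messages)
    locked do
      messages.each { |parts| parts.each { |part| @io.write(part) } }
    end
  end

  private

  def locked
    @mutex.synchronize do
      @lock_count += 1
      yield
    end
  end
end

batch = [["a", "b"], ["c"], ["d", "e"]]

one_by_one = CountingWriter.new(StringIO.new)
batch.each { |parts| one_by_one.write_message(parts) }

batched_io = StringIO.new
batched = CountingWriter.new(batched_io)
batched.write_messages(batch)
```

Both paths write the same bytes; only the number of lock/unlock pairs differs (one per message versus one per batch).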
#write_wire(wire_bytes) ⇒ void
This method returns an undefined value.
Writes pre-encoded wire bytes to the buffer without flushing. Used for fan-out: encode once, write to many connections.
    # File 'lib/protocol/zmtp/connection.rb', line 173
    def write_wire(wire_bytes)
      with_deferred_cancel do
        @mutex.synchronize do
          @io.write(wire_bytes)
        end
      end
    end
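The fan-out pattern (encode once, write to many) can be sketched as follows. encode_short_frames is a hypothetical helper, not the library's encoder: it handles only short frames (bodies up to 255 bytes) and assumes the ZMTP 3.x layout of a flags byte, a 1-byte length, then the body. StringIO objects stand in for connections.

```ruby
require "stringio"

# Hypothetical encoder for short frames only (body of at most 255 bytes).
# Assumed layout per frame: flags byte (0x01 = MORE), 1-byte length, body.
def encode_short_frames(parts)
  wire = String.new(encoding: Encoding::BINARY)
  parts.each_with_index do |part, index|
    raise ArgumentError, "part too long for a short frame" if part.bytesize > 255

    more = index < parts.size - 1 ? 0x01 : 0x00
    wire << [more, part.bytesize].pack("CC") << part.b
  end
  wire.freeze
end

# Encode the message once, then hand the same frozen bytes to every peer.
subscribers = Array.new(3) { StringIO.new }
wire_bytes = encode_short_frames(["topic", "payload"])
subscribers.each { |io| io.write(wire_bytes) }
```

Freezing the encoded string makes it safe to share across writers, since none of them can mutate it.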