Class: Protocol::ZMTP::Connection
- Inherits:
-
Object
- Object
- Protocol::ZMTP::Connection
- Defined in:
- lib/protocol/zmtp/connection.rb
Overview
Manages one ZMTP peer connection over any transport IO.
Delegates the security handshake to a Mechanism object (Null, Curve, etc.), then provides message send/receive and command send/receive on top of the framing codec.
Heartbeat timing is tracked but not driven — the caller (e.g. an engine) is responsible for periodically sending PINGs and checking expiry.
Instance Attribute Summary collapse
-
#io ⇒ Object
readonly
Transport IO (#peek, #read_exactly, #write, #flush, #close).
-
#last_received_at ⇒ Float?
readonly
Monotonic timestamp of last received frame.
-
#peer_identity ⇒ String
readonly
Peer’s identity (from READY handshake).
-
#peer_major ⇒ Integer?
readonly
Peer ZMTP major version (from greeting).
-
#peer_minor ⇒ Integer?
readonly
Peer ZMTP minor version (from greeting); 0 for ZMTP 3.0 peers, 1 for ZMTP 3.1+.
-
#peer_properties ⇒ Hash{String => String}?
readonly
Full peer READY property hash (set after a successful handshake; nil before).
-
#peer_public_key ⇒ Object?
readonly
Peer’s CURVE long-term public key post-handshake (
crypto::PublicKeywhen the mechanism is CURVE;nilfor NULL). -
#peer_socket_type ⇒ String
readonly
Peer’s socket type (from READY handshake).
Instance Method Summary collapse
-
#close ⇒ void
Closes the connection.
-
#encrypted? ⇒ Boolean
Returns true if the ZMTP mechanism encrypts at the frame level (e.g. CURVE, BLAKE3ZMQ).
-
#flush ⇒ void
Flushes the write buffer to the underlying IO.
-
#handshake! ⇒ void
Performs the full ZMTP handshake via the configured mechanism.
-
#heartbeat_expired?(timeout) ⇒ Boolean
Returns true if no frame has been received within
timeoutseconds. -
#initialize(io, socket_type:, identity: "", as_server: false, mechanism: nil, max_message_size: nil, **opts) ⇒ Connection
constructor
A new instance of Connection.
-
#peer_info ⇒ PeerInfo?
Returns a PeerInfo value bundling the peer’s CURVE public key and identity for use as a stable per-peer key (frozen, hash-usable).
-
#read_frame ⇒ Codec::Frame
Reads one frame from the wire.
-
#receive_message ⇒ Array<String>
Receives a multi-frame message.
-
#send_command(command) ⇒ void
Sends a command.
-
#send_message(parts) ⇒ void
Sends a multi-frame message (write + flush).
-
#touch_heartbeat ⇒ void
Records that a frame was received (for heartbeat expiry tracking).
-
#write_message(parts) ⇒ void
Writes a multi-frame message to the buffer without flushing.
-
#write_messages(messages) ⇒ void
Writes a batch of multi-frame messages to the buffer under a single mutex acquisition.
-
#write_wire(wire_bytes) ⇒ void
Writes pre-encoded wire bytes to the buffer without flushing.
-
#write_wire_batch(wire_strings) ⇒ void
Writes multiple pre-encoded wire byte strings under a single mutex acquisition.
Constructor Details
#initialize(io, socket_type:, identity: "", as_server: false, mechanism: nil, max_message_size: nil, **opts) ⇒ Connection
Returns a new instance of Connection.
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/protocol/zmtp/connection.rb', line 61 def initialize(io, socket_type:, identity: "", as_server: false, mechanism: nil, max_message_size: nil, **opts) @io = io @socket_type = socket_type @identity = identity @as_server = as_server @mechanism = mechanism || Mechanism::Null.new @peer_socket_type = nil @peer_identity = nil @peer_public_key = nil @peer_properties = nil @peer_major = nil @peer_minor = nil @metadata = opts.empty? ? nil : opts.transform_keys(&:to_s) @mutex = Mutex.new @max_message_size = @last_received_at = nil # Reusable scratch buffer for frame headers. Array#pack(buffer:) # writes in place so the per-message 2-or-9 byte String allocation # in write_frames disappears on the hot send path. @header_buf = String.new(capacity: 9, encoding: Encoding::BINARY) @frame_buf = String.new(capacity: 257, encoding: Encoding::BINARY) end |
Instance Attribute Details
#io ⇒ Object (readonly)
Returns transport IO (#peek, #read_exactly, #write, #flush, #close).
44 45 46 |
# File 'lib/protocol/zmtp/connection.rb', line 44 def io @io end |
#last_received_at ⇒ Float? (readonly)
Returns monotonic timestamp of last received frame.
48 49 50 |
# File 'lib/protocol/zmtp/connection.rb', line 48 def last_received_at @last_received_at end |
#peer_identity ⇒ String (readonly)
Returns peer’s identity (from READY handshake).
20 21 22 |
# File 'lib/protocol/zmtp/connection.rb', line 20 def peer_identity @peer_identity end |
#peer_major ⇒ Integer? (readonly)
Returns peer ZMTP major version (from greeting).
35 36 37 |
# File 'lib/protocol/zmtp/connection.rb', line 35 def peer_major @peer_major end |
#peer_minor ⇒ Integer? (readonly)
Returns peer ZMTP minor version (from greeting); 0 for ZMTP 3.0 peers, 1 for ZMTP 3.1+.
40 41 42 |
# File 'lib/protocol/zmtp/connection.rb', line 40 def peer_minor @peer_minor end |
#peer_properties ⇒ Hash{String => String}? (readonly)
Returns full peer READY property hash (set after a successful handshake; nil before). Upper layers extract their own X-* properties from here.
31 32 33 |
# File 'lib/protocol/zmtp/connection.rb', line 31 def peer_properties @peer_properties end |
#peer_public_key ⇒ Object? (readonly)
Returns peer’s CURVE long-term public key post-handshake (crypto::PublicKey when the mechanism is CURVE; nil for NULL).
25 26 27 |
# File 'lib/protocol/zmtp/connection.rb', line 25 def peer_public_key @peer_public_key end |
#peer_socket_type ⇒ String (readonly)
Returns peer’s socket type (from READY handshake).
16 17 18 |
# File 'lib/protocol/zmtp/connection.rb', line 16 def peer_socket_type @peer_socket_type end |
Instance Method Details
#close ⇒ void
This method returns an undefined value.
Closes the connection.
330 331 332 333 334 |
# File 'lib/protocol/zmtp/connection.rb', line 330 def close @io.close rescue IOError # already closed end |
#encrypted? ⇒ Boolean
Returns true if the ZMTP mechanism encrypts at the frame level (e.g. CURVE, BLAKE3ZMQ).
215 216 217 |
# File 'lib/protocol/zmtp/connection.rb', line 215 def encrypted? @mechanism.encrypted? end |
#flush ⇒ void
This method returns an undefined value.
Flushes the write buffer to the underlying IO.
223 224 225 226 227 |
# File 'lib/protocol/zmtp/connection.rb', line 223 def flush @mutex.synchronize do @io.flush end end |
#handshake! ⇒ void
This method returns an undefined value.
Performs the full ZMTP handshake via the configured mechanism.
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/protocol/zmtp/connection.rb', line 91 def handshake! result = @mechanism.handshake! @io, as_server: @as_server, socket_type: @socket_type, identity: @identity, metadata: @metadata @peer_socket_type = result[:peer_socket_type] @peer_identity = result[:peer_identity] @peer_public_key = result[:peer_public_key] @peer_properties = result[:peer_properties] @peer_major = result[:peer_major] @peer_minor = result[:peer_minor] unless @peer_socket_type raise Error, "peer READY missing Socket-Type" end unless VALID_PEERS[@socket_type.to_sym]&.include?(@peer_socket_type.to_sym) raise Error, "incompatible socket types: #{@socket_type} cannot connect to #{@peer_socket_type}" end end |
#heartbeat_expired?(timeout) ⇒ Boolean
Returns true if no frame has been received within timeout seconds.
321 322 323 324 |
# File 'lib/protocol/zmtp/connection.rb', line 321 def heartbeat_expired?(timeout) return false unless @last_received_at (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @last_received_at) > timeout end |
#peer_info ⇒ PeerInfo?
Returns a PeerInfo value bundling the peer’s CURVE public key and identity for use as a stable per-peer key (frozen, hash-usable). Nil before the handshake has completed.
121 122 123 124 |
# File 'lib/protocol/zmtp/connection.rb', line 121 def peer_info return nil unless @peer_socket_type PeerInfo.new(public_key: @peer_public_key, identity: @peer_identity) end |
#read_frame ⇒ Codec::Frame
Reads one frame from the wire. Handles PING/PONG automatically. When using an encrypted mechanism, all frames are decrypted transparently (supports both CURVE MESSAGE wrapping and inline encryption like BLAKE3ZMQ).
279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 |
# File 'lib/protocol/zmtp/connection.rb', line 279 def read_frame loop do begin frame = Codec::Frame.read_from(@io, max_message_size: @max_message_size) rescue Error close raise end touch_heartbeat frame = @mechanism.decrypt(frame) if @mechanism.encrypted? if frame.command? cmd = Codec::Command.from_body(frame.body) case cmd.name when "PING" _, context = cmd.ping_ttl_and_context send_command(Codec::Command.pong(context: context)) next when "PONG" next end end return frame end end |
#receive_message ⇒ Array<String>
Receives a multi-frame message. PING/PONG commands are handled automatically by #read_frame.
235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 |
# File 'lib/protocol/zmtp/connection.rb', line 235 def frames = [] loop do frame = read_frame if frame.command? yield frame if block_given? next end frames << frame.body break unless frame.more? end frames end |
#send_command(command) ⇒ void
This method returns an undefined value.
Sends a command.
258 259 260 261 262 263 264 265 266 267 268 269 |
# File 'lib/protocol/zmtp/connection.rb', line 258 def send_command(command) with_deferred_cancel do @mutex.synchronize do if @mechanism.encrypted? @io.write(@mechanism.encrypt(command.to_body, command: true)) else @io.write(command.to_frame.to_wire) end @io.flush end end end |
#send_message(parts) ⇒ void
This method returns an undefined value.
Sends a multi-frame message (write + flush).
131 132 133 134 135 136 137 138 |
# File 'lib/protocol/zmtp/connection.rb', line 131 def (parts) with_deferred_cancel do @mutex.synchronize do write_frames(parts) @io.flush end end end |
#touch_heartbeat ⇒ void
This method returns an undefined value.
Records that a frame was received (for heartbeat expiry tracking).
312 313 314 |
# File 'lib/protocol/zmtp/connection.rb', line 312 def touch_heartbeat @last_received_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) end |
#write_message(parts) ⇒ void
This method returns an undefined value.
Writes a multi-frame message to the buffer without flushing. Call #flush after batching writes.
146 147 148 149 150 151 152 |
# File 'lib/protocol/zmtp/connection.rb', line 146 def (parts) with_deferred_cancel do @mutex.synchronize do write_frames(parts) end end end |
#write_messages(messages) ⇒ void
This method returns an undefined value.
Writes a batch of multi-frame messages to the buffer under a single mutex acquisition. Used by work-stealing send pumps that dequeue up to N messages at once — avoids the N lock/unlock pairs per batch that a plain ‘batch.each { write_message }` would incur.
164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/protocol/zmtp/connection.rb', line 164 def () with_deferred_cancel do @mutex.synchronize do i = 0 n = .size while i < n write_frames([i]) i += 1 end end end end |
#write_wire(wire_bytes) ⇒ void
This method returns an undefined value.
Writes pre-encoded wire bytes to the buffer without flushing. Used for fan-out: encode once, write to many connections.
183 184 185 186 187 188 189 |
# File 'lib/protocol/zmtp/connection.rb', line 183 def write_wire(wire_bytes) with_deferred_cancel do @mutex.synchronize do @io.write(wire_bytes) end end end |
#write_wire_batch(wire_strings) ⇒ void
This method returns an undefined value.
Writes multiple pre-encoded wire byte strings under a single mutex acquisition.
197 198 199 200 201 202 203 204 205 206 207 208 |
# File 'lib/protocol/zmtp/connection.rb', line 197 def write_wire_batch(wire_strings) with_deferred_cancel do @mutex.synchronize do i = 0 n = wire_strings.size while i < n @io.write(wire_strings[i]) i += 1 end end end end |