Class: Protocol::ZMTP::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/protocol/zmtp/connection.rb

Overview

Manages one ZMTP peer connection over any transport IO.

Delegates the security handshake to a Mechanism object (Null, Curve, etc.), then provides message send/receive and command send/receive on top of the framing codec.

Heartbeat timing is tracked but not driven — the caller (e.g. an engine) is responsible for periodically sending PINGs and checking expiry.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, socket_type:, identity: "", as_server: false, mechanism: nil, max_message_size: nil, **opts) ⇒ Connection

Returns a new instance of Connection.

Parameters:

  • io (#peek, #read_exactly, #write, #flush, #close)

    transport IO

  • socket_type (String)

    our socket type name (e.g. “REQ”)

  • identity (String) (defaults to: "")

    our identity

  • as_server (Boolean) (defaults to: false)

    whether we are the server side

  • mechanism (Mechanism::Null, Mechanism::Curve) (defaults to: nil)

    security mechanism

  • max_message_size (Integer, nil) (defaults to: nil)

    max frame size in bytes, nil = unlimited

  • opts (Hash{String => String})

    extra READY properties to advertise (e.g. “X-QoS” => “1”). Upper-layer extensions use this to inject their own negotiated properties without the codec needing to know about them.



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/protocol/zmtp/connection.rb', line 61

def initialize(io, socket_type:, identity: "", as_server: false,
               mechanism: nil, max_message_size: nil, **opts)
  @io               = io
  @socket_type      = socket_type
  @identity         = identity
  @as_server        = as_server
  @mechanism        = mechanism || Mechanism::Null.new
  @peer_socket_type = nil
  @peer_identity    = nil
  @peer_public_key  = nil
  @peer_properties  = nil
  @peer_major       = nil
  @peer_minor       = nil
  @metadata         = opts.empty? ? nil : opts.transform_keys(&:to_s)
  @mutex            = Mutex.new
  @max_message_size = max_message_size
  @last_received_at = nil

  # Reusable scratch buffer for frame headers. Array#pack(buffer:)
  # writes in place so the per-message 2-or-9 byte String allocation
  # in write_frames disappears on the hot send path.
  @header_buf = String.new(capacity: 9, encoding: Encoding::BINARY)
end

Instance Attribute Details

#ioObject (readonly)

Returns transport IO (#peek, #read_exactly, #write, #flush, #close).

Returns:

  • (Object)

    transport IO (#peek, #read_exactly, #write, #flush, #close)



44
45
46
# File 'lib/protocol/zmtp/connection.rb', line 44

def io
  @io
end

#last_received_atFloat? (readonly)

Returns monotonic timestamp of last received frame.

Returns:

  • (Float, nil)

    monotonic timestamp of last received frame



48
49
50
# File 'lib/protocol/zmtp/connection.rb', line 48

def last_received_at
  @last_received_at
end

#peer_identityString (readonly)

Returns peer’s identity (from READY handshake).

Returns:

  • (String)

    peer’s identity (from READY handshake)



20
21
22
# File 'lib/protocol/zmtp/connection.rb', line 20

def peer_identity
  @peer_identity
end

#peer_majorInteger? (readonly)

Returns peer ZMTP major version (from greeting).

Returns:

  • (Integer, nil)

    peer ZMTP major version (from greeting)



35
36
37
# File 'lib/protocol/zmtp/connection.rb', line 35

def peer_major
  @peer_major
end

#peer_minorInteger? (readonly)

Returns peer ZMTP minor version (from greeting); 0 for ZMTP 3.0 peers, 1 for ZMTP 3.1+.

Returns:

  • (Integer, nil)

    peer ZMTP minor version (from greeting); 0 for ZMTP 3.0 peers, 1 for ZMTP 3.1+



40
41
42
# File 'lib/protocol/zmtp/connection.rb', line 40

def peer_minor
  @peer_minor
end

#peer_propertiesHash{String => String}? (readonly)

Returns full peer READY property hash (set after a successful handshake; nil before). Upper layers extract their own X-* properties from here.

Returns:

  • (Hash{String => String}, nil)

    full peer READY property hash (set after a successful handshake; nil before). Upper layers extract their own X-* properties from here.



31
32
33
# File 'lib/protocol/zmtp/connection.rb', line 31

def peer_properties
  @peer_properties
end

#peer_public_keyObject? (readonly)

Returns peer’s CURVE long-term public key post-handshake (crypto::PublicKey when the mechanism is CURVE; nil for NULL).

Returns:

  • (Object, nil)

    peer’s CURVE long-term public key post-handshake (crypto::PublicKey when the mechanism is CURVE; nil for NULL).



25
26
27
# File 'lib/protocol/zmtp/connection.rb', line 25

def peer_public_key
  @peer_public_key
end

#peer_socket_typeString (readonly)

Returns peer’s socket type (from READY handshake).

Returns:

  • (String)

    peer’s socket type (from READY handshake)



16
17
18
# File 'lib/protocol/zmtp/connection.rb', line 16

def peer_socket_type
  @peer_socket_type
end

Instance Method Details

#closevoid

This method returns an undefined value.

Closes the connection.



310
311
312
313
314
# File 'lib/protocol/zmtp/connection.rb', line 310

def close
  @io.close
rescue IOError
  # already closed
end

#encrypted?Boolean

Returns true if the ZMTP mechanism encrypts at the frame level (e.g. CURVE, BLAKE3ZMQ).

Returns:

  • (Boolean)


195
196
197
# File 'lib/protocol/zmtp/connection.rb', line 195

def encrypted?
  @mechanism.encrypted?
end

#flushvoid

This method returns an undefined value.

Flushes the write buffer to the underlying IO.



203
204
205
206
207
# File 'lib/protocol/zmtp/connection.rb', line 203

def flush
  @mutex.synchronize do
    @io.flush
  end
end

#handshake!void

This method returns an undefined value.

Performs the full ZMTP handshake via the configured mechanism.

Raises:

  • (Error)

    on handshake failure



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/protocol/zmtp/connection.rb', line 90

def handshake!
  result = @mechanism.handshake! @io,
    as_server:   @as_server,
    socket_type: @socket_type,
    identity:    @identity,
    metadata:    @metadata

  @peer_socket_type = result[:peer_socket_type]
  @peer_identity    = result[:peer_identity]
  @peer_public_key  = result[:peer_public_key]
  @peer_properties  = result[:peer_properties]
  @peer_major       = result[:peer_major]
  @peer_minor       = result[:peer_minor]

  unless @peer_socket_type
    raise Error, "peer READY missing Socket-Type"
  end

  unless VALID_PEERS[@socket_type.to_sym]&.include?(@peer_socket_type.to_sym)
    raise Error,
          "incompatible socket types: #{@socket_type} cannot connect to #{@peer_socket_type}"
  end
end

#heartbeat_expired?(timeout) ⇒ Boolean

Returns true if no frame has been received within timeout seconds.

Parameters:

  • timeout (Numeric)

    seconds

Returns:

  • (Boolean)


301
302
303
304
# File 'lib/protocol/zmtp/connection.rb', line 301

def heartbeat_expired?(timeout)
  return false unless @last_received_at
  (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @last_received_at) > timeout
end

#peer_infoPeerInfo?

Returns a PeerInfo value bundling the peer’s CURVE public key and identity for use as a stable per-peer key (frozen, hash-usable). Nil before the handshake has completed.

Returns:



120
121
122
123
# File 'lib/protocol/zmtp/connection.rb', line 120

def peer_info
  return nil unless @peer_socket_type
  PeerInfo.new(public_key: @peer_public_key, identity: @peer_identity)
end

#read_frameCodec::Frame

Reads one frame from the wire. Handles PING/PONG automatically. When using an encrypted mechanism, all frames are decrypted transparently (supports both CURVE MESSAGE wrapping and inline encryption like BLAKE3ZMQ).

Returns:

Raises:

  • (EOFError)

    if connection is closed



259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# File 'lib/protocol/zmtp/connection.rb', line 259

def read_frame
  loop do
    begin
      frame = Codec::Frame.read_from(@io, max_message_size: @max_message_size)
    rescue Error
      close
      raise
    end

    touch_heartbeat

    frame = @mechanism.decrypt(frame) if @mechanism.encrypted?

    if frame.command?
      cmd = Codec::Command.from_body(frame.body)
      case cmd.name
      when "PING"
        _, context = cmd.ping_ttl_and_context
        send_command(Codec::Command.pong(context: context))
        next
      when "PONG"
        next
      end
    end

    return frame
  end
end

#receive_messageArray<String>

Receives a multi-frame message. PING/PONG commands are handled automatically by #read_frame.

Returns:

  • (Array<String>)

    message frames

Raises:

  • (EOFError)

    if connection is closed



215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/protocol/zmtp/connection.rb', line 215

def receive_message
  frames = []

  loop do
    frame = read_frame

    if frame.command?
      yield frame if block_given?
      next
    end

    frames << frame.body
    break unless frame.more?
  end

  frames
end

#send_command(command) ⇒ void

This method returns an undefined value.

Sends a command.

Parameters:



238
239
240
241
242
243
244
245
246
247
248
249
# File 'lib/protocol/zmtp/connection.rb', line 238

def send_command(command)
  with_deferred_cancel do
    @mutex.synchronize do
      if @mechanism.encrypted?
        @io.write(@mechanism.encrypt(command.to_body, command: true))
      else
        @io.write(command.to_frame.to_wire)
      end
      @io.flush
    end
  end
end

#send_message(parts) ⇒ void

This method returns an undefined value.

Sends a multi-frame message (write + flush).

Parameters:

  • parts (Array<String>)

    message frames



130
131
132
133
134
135
136
137
# File 'lib/protocol/zmtp/connection.rb', line 130

def send_message(parts)
  with_deferred_cancel do
    @mutex.synchronize do
      write_frames(parts)
      @io.flush
    end
  end
end

#touch_heartbeatvoid

This method returns an undefined value.

Records that a frame was received (for heartbeat expiry tracking).



292
293
294
# File 'lib/protocol/zmtp/connection.rb', line 292

def touch_heartbeat
  @last_received_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

#write_message(parts) ⇒ void

This method returns an undefined value.

Writes a multi-frame message to the buffer without flushing. Call #flush after batching writes.

Parameters:

  • parts (Array<String>)

    message frames



145
146
147
148
149
150
151
# File 'lib/protocol/zmtp/connection.rb', line 145

def write_message(parts)
  with_deferred_cancel do
    @mutex.synchronize do
      write_frames(parts)
    end
  end
end

#write_messages(messages) ⇒ void

This method returns an undefined value.

Writes a batch of multi-frame messages to the buffer under a single mutex acquisition. Used by work-stealing send pumps that dequeue up to N messages at once — avoids the N lock/unlock pairs per batch that a plain ‘batch.each { write_message }` would incur.

Parameters:

  • messages (Array<Array<String>>)

    each element is one multi-frame message



163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/protocol/zmtp/connection.rb', line 163

def write_messages(messages)
  with_deferred_cancel do
    @mutex.synchronize do
      i = 0
      n = messages.size
      while i < n
        write_frames(messages[i])
        i += 1
      end
    end
  end
end

#write_wire(wire_bytes) ⇒ void

This method returns an undefined value.

Writes pre-encoded wire bytes to the buffer without flushing. Used for fan-out: encode once, write to many connections.

Parameters:

  • wire_bytes (String)

    ZMTP wire-format bytes



182
183
184
185
186
187
188
# File 'lib/protocol/zmtp/connection.rb', line 182

def write_wire(wire_bytes)
  with_deferred_cancel do
    @mutex.synchronize do
      @io.write(wire_bytes)
    end
  end
end