Class: Protocol::ZMTP::Connection

Inherits:
Object
Defined in:
lib/protocol/zmtp/connection.rb

Overview

Manages one ZMTP peer connection over any transport IO.

Delegates the security handshake to a Mechanism object (Null, Curve, etc.), then provides message send/receive and command send/receive on top of the framing codec.

Heartbeat timing is tracked but not driven — the caller (e.g. an engine) is responsible for periodically sending PINGs and checking expiry.


Constructor Details

#initialize(io, socket_type:, identity: "", as_server: false, mechanism: nil, max_message_size: nil, qos: 0, qos_hash: "") ⇒ Connection

Returns a new instance of Connection.

Parameters:

  • io (#read_exactly, #write, #flush, #close)

    transport IO

  • socket_type (String)

    our socket type name (e.g. “REQ”)

  • identity (String) (defaults to: "")

    our identity

  • as_server (Boolean) (defaults to: false)

    whether we are the server side

  • mechanism (Mechanism::Null, Mechanism::Curve) (defaults to: nil)

    security mechanism; nil selects Mechanism::Null

  • max_message_size (Integer, nil) (defaults to: nil)

    max frame size in bytes, nil = unlimited

  • qos (Integer) (defaults to: 0)

    our QoS level, passed to the mechanism during the handshake

  • qos_hash (String) (defaults to: "")

    our supported hash algorithms in preference order, passed to the mechanism during the handshake



# File 'lib/protocol/zmtp/connection.rb', line 50

def initialize(io, socket_type:, identity: "", as_server: false,
               mechanism: nil, max_message_size: nil, qos: 0, qos_hash: "")
  @io               = io
  @socket_type      = socket_type
  @identity         = identity
  @as_server        = as_server
  @mechanism        = mechanism || Mechanism::Null.new
  @peer_socket_type = nil
  @peer_identity    = nil
  @peer_qos         = nil
  @peer_qos_hash    = nil
  @peer_properties  = nil
  @peer_major       = nil
  @peer_minor       = nil
  @qos              = qos
  @qos_hash         = qos_hash
  @mutex            = Mutex.new
  @max_message_size = max_message_size
  @last_received_at = nil

  # Reusable scratch buffer for frame headers. Array#pack(buffer:)
  # writes in place so the per-message 2-or-9 byte String allocation
  # in write_frames disappears on the hot send path.
  @header_buf = String.new(capacity: 9, encoding: Encoding::BINARY)
end
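
The comment on @header_buf refers to Array#pack with the buffer: keyword. The following is a minimal sketch of that technique, assuming standard ZMTP 3.x framing (a flags octet followed by a 1-octet size, or an 8-octet big-endian size for long frames). The pack_header helper and the FLAG_* constants are hypothetical names used for illustration; they are not taken from the library's write_frames.

```ruby
# ZMTP 3.x frame flag bits.
FLAG_MORE    = 0x01
FLAG_LONG    = 0x02
FLAG_COMMAND = 0x04

# Hypothetical helper: packs a frame header into +buf+ and returns +buf+,
# so no new header String is allocated per frame.
def pack_header(buf, size, more: false, command: false)
  flags = 0
  flags |= FLAG_MORE if more
  flags |= FLAG_COMMAND if command

  buf.clear # pack(buffer:) appends, so start from an empty buffer
  if size > 255
    # Long frame: flags octet + 8-octet big-endian size (9 bytes total).
    [flags | FLAG_LONG, size].pack("CQ>", buffer: buf)
  else
    # Short frame: flags octet + 1-octet size (2 bytes total).
    [flags, size].pack("CC", buffer: buf)
  end
end

header_buf = String.new(capacity: 9, encoding: Encoding::BINARY)
pack_header(header_buf, 5)                # 2-byte header
pack_header(header_buf, 300, more: true)  # 9-byte header, same String object
```

Because the same String is overwritten on every call, the header must be written to the IO before the next frame is packed.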

Instance Attribute Details

#io ⇒ Object (readonly)

Returns transport IO (#read_exactly, #write, #flush, #close).

Returns:

  • (Object)

    transport IO (#read_exactly, #write, #flush, #close)



# File 'lib/protocol/zmtp/connection.rb', line 39

def io
  @io
end
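
The transport IO is duck-typed: it only needs #read_exactly, #write, #flush, and #close. As a rough sketch, a plain Ruby IO (here a StringIO) could be adapted as shown below. ExactIO is a hypothetical class, and the contract assumed for #read_exactly (return exactly the requested number of bytes or raise EOFError) is an assumption, not something this page states.

```ruby
require "stringio"

# Hypothetical adapter exposing the four methods the connection expects.
class ExactIO
  def initialize(io)
    @io = io
  end

  # Assumed contract: return exactly +size+ bytes, or raise EOFError
  # if the stream ends first.
  def read_exactly(size)
    data = String.new(encoding: Encoding::BINARY)
    while data.bytesize < size
      chunk = @io.read(size - data.bytesize)
      raise EOFError, "connection closed" if chunk.nil? || chunk.empty?
      data << chunk
    end
    data
  end

  def write(bytes)
    @io.write(bytes)
  end

  def flush
    @io.flush
  end

  def close
    @io.close
  end
end

io = ExactIO.new(StringIO.new("hello".b))
io.read_exactly(3) # reads "hel", leaving "lo" in the stream
```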

#last_received_at ⇒ Float? (readonly)

Returns monotonic timestamp of last received frame.

Returns:

  • (Float, nil)

    monotonic timestamp of last received frame



# File 'lib/protocol/zmtp/connection.rb', line 42

def last_received_at
  @last_received_at
end

#peer_identity ⇒ String (readonly)

Returns peer’s identity (from READY handshake).

Returns:

  • (String)

    peer’s identity (from READY handshake)



# File 'lib/protocol/zmtp/connection.rb', line 19

def peer_identity
  @peer_identity
end

#peer_major ⇒ Integer? (readonly)

Returns peer ZMTP major version (from greeting).

Returns:

  • (Integer, nil)

    peer ZMTP major version (from greeting)



# File 'lib/protocol/zmtp/connection.rb', line 32

def peer_major
  @peer_major
end

#peer_minor ⇒ Integer? (readonly)

Returns peer ZMTP minor version (from greeting); 0 for ZMTP 3.0 peers, 1 for ZMTP 3.1+.

Returns:

  • (Integer, nil)

    peer ZMTP minor version (from greeting); 0 for ZMTP 3.0 peers, 1 for ZMTP 3.1+



# File 'lib/protocol/zmtp/connection.rb', line 36

def peer_minor
  @peer_minor
end

#peer_properties ⇒ Hash{String => String}? (readonly)

Returns full peer READY property hash (set after a successful handshake; nil before).

Returns:

  • (Hash{String => String}, nil)

    full peer READY property hash (set after a successful handshake; nil before)



# File 'lib/protocol/zmtp/connection.rb', line 29

def peer_properties
  @peer_properties
end

#peer_qos ⇒ Integer (readonly)

Returns peer’s QoS level (from READY handshake, 0 if absent).

Returns:

  • (Integer)

    peer’s QoS level (from READY handshake, 0 if absent)



# File 'lib/protocol/zmtp/connection.rb', line 22

def peer_qos
  @peer_qos
end

#peer_qos_hash ⇒ String (readonly)

Returns peer’s supported hash algorithms in preference order.

Returns:

  • (String)

    peer’s supported hash algorithms in preference order



# File 'lib/protocol/zmtp/connection.rb', line 25

def peer_qos_hash
  @peer_qos_hash
end

#peer_socket_type ⇒ String (readonly)

Returns peer’s socket type (from READY handshake).

Returns:

  • (String)

    peer’s socket type (from READY handshake)



# File 'lib/protocol/zmtp/connection.rb', line 16

def peer_socket_type
  @peer_socket_type
end

Instance Method Details

#close ⇒ void

This method returns an undefined value.

Closes the connection.



# File 'lib/protocol/zmtp/connection.rb', line 288

def close
  @io.close
rescue IOError
  # already closed
end

#encrypted? ⇒ Boolean

Returns true if the ZMTP mechanism encrypts at the frame level (e.g. CURVE, BLAKE3ZMQ).

Returns:

  • (Boolean)


# File 'lib/protocol/zmtp/connection.rb', line 179

def encrypted?
  @mechanism.encrypted?
end

#flush ⇒ void

This method returns an undefined value.

Flushes the write buffer to the underlying IO.



# File 'lib/protocol/zmtp/connection.rb', line 187

def flush
  @mutex.synchronize do
    @io.flush
  end
end

#handshake! ⇒ void

This method returns an undefined value.

Performs the full ZMTP handshake via the configured mechanism.

Raises:

  • (Error)

    on handshake failure



# File 'lib/protocol/zmtp/connection.rb', line 81

def handshake!
  result = @mechanism.handshake!(
    @io,
    as_server:   @as_server,
    socket_type: @socket_type,
    identity:    @identity,
    qos:         @qos,
    qos_hash:    @qos_hash,
  )

  @peer_socket_type = result[:peer_socket_type]
  @peer_identity    = result[:peer_identity]
  @peer_qos         = result[:peer_qos] || 0
  @peer_qos_hash    = result[:peer_qos_hash] || ""
  @peer_properties  = result[:peer_properties]
  @peer_major       = result[:peer_major]
  @peer_minor       = result[:peer_minor]

  unless @peer_socket_type
    raise Error, "peer READY missing Socket-Type"
  end

  unless VALID_PEERS[@socket_type.to_sym]&.include?(@peer_socket_type.to_sym)
    raise Error,
          "incompatible socket types: #{@socket_type} cannot connect to #{@peer_socket_type}"
  end
end
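
The final check in #handshake! looks up the local socket type in VALID_PEERS and verifies that the peer's type is allowed. The contents of VALID_PEERS are not shown on this page, so the sketch below assumes a hash from socket type to allowed peer types, filled with some of the standard ZeroMQ pairings. VALID_PEERS_SKETCH and compatible? are hypothetical names.

```ruby
# Assumed shape of the compatibility table: socket type => allowed peer types.
# The entries follow the usual ZeroMQ socket pairings; the library's actual
# table may contain more types.
VALID_PEERS_SKETCH = {
  REQ:    %i[REP ROUTER],
  REP:    %i[REQ DEALER],
  DEALER: %i[REP DEALER ROUTER],
  ROUTER: %i[REQ DEALER ROUTER],
  PUB:    %i[SUB XSUB],
  SUB:    %i[PUB XPUB],
  PUSH:   %i[PULL],
  PULL:   %i[PUSH],
  PAIR:   %i[PAIR],
}.freeze

# Mirrors the lookup in #handshake!: an unknown local type yields nil from
# the hash, and the safe-navigation call then makes the check fail.
def compatible?(socket_type, peer_socket_type)
  VALID_PEERS_SKETCH[socket_type.to_sym]&.include?(peer_socket_type.to_sym) || false
end

compatible?("REQ", "REP") # allowed pairing
compatible?("REQ", "PUB") # rejected pairing
```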

#heartbeat_expired?(timeout) ⇒ Boolean

Returns true if the last frame was received more than timeout seconds ago. Returns false if no frame has been received yet.

Parameters:

  • timeout (Numeric)

    seconds

Returns:

  • (Boolean)


# File 'lib/protocol/zmtp/connection.rb', line 279

def heartbeat_expired?(timeout)
  return false unless @last_received_at
  (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @last_received_at) > timeout
end
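
The expiry logic can be tried in isolation. The sketch below reproduces the same monotonic-clock comparison in a stand-alone class; HeartbeatTracker and its injectable clock: argument are hypothetical and exist only so the behavior can be exercised without a live connection.

```ruby
# Hypothetical stand-alone version of the heartbeat bookkeeping.
class HeartbeatTracker
  attr_reader :last_received_at

  # +clock+ is any callable returning a monotonic timestamp in seconds.
  def initialize(clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @clock = clock
    @last_received_at = nil
  end

  # Record that a frame arrived (the role of #touch_heartbeat).
  def touch
    @last_received_at = @clock.call
  end

  # Same rule as #heartbeat_expired?: never expired before the first frame.
  def expired?(timeout)
    return false unless @last_received_at
    (@clock.call - @last_received_at) > timeout
  end
end

tracker = HeartbeatTracker.new
tracker.expired?(30) # false: nothing received yet
tracker.touch
tracker.expired?(30) # false: a frame was just recorded
```

In this model the caller decides when to check: an engine would typically call expired? from its own timer and close the connection when it returns true.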

#read_frame ⇒ Codec::Frame

Reads one frame from the wire. Handles PING/PONG automatically. When using an encrypted mechanism, all frames are decrypted transparently (supports both CURVE MESSAGE wrapping and inline encryption like BLAKE3ZMQ).

Returns:

  • (Codec::Frame)

    the next frame that is not a PING or PONG command

Raises:

  • (EOFError)

    if connection is closed



# File 'lib/protocol/zmtp/connection.rb', line 239

def read_frame
  loop do
    begin
      frame = Codec::Frame.read_from(@io, max_message_size: @max_message_size)
    rescue Error
      close
      raise
    end
    touch_heartbeat

    frame = @mechanism.decrypt(frame) if @mechanism.encrypted?

    if frame.command?
      cmd = Codec::Command.from_body(frame.body)
      case cmd.name
      when "PING"
        _, context = cmd.ping_ttl_and_context
        send_command(Codec::Command.pong(context: context))
        next
      when "PONG"
        next
      end
    end
    return frame
  end
end
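
The PING branch extracts the context from the incoming command and echoes it back in a PONG. As an illustration, the sketch below parses a command body by hand, assuming the ZMTP 3.1 layout: a 1-octet name length, the name, then for PING a 2-octet big-endian TTL followed by the context. The three helper functions are hypothetical and are not the Codec::Command API.

```ruby
# Splits a command body into [name, data].
# Assumed layout: 1-octet name length, name bytes, then command data.
def parse_command_body(body)
  name_len = body.getbyte(0)
  name = body.byteslice(1, name_len)
  data = body.byteslice(1 + name_len, body.bytesize - 1 - name_len)
  [name, data]
end

# Splits PING data into [ttl, context].
# Assumed layout: 2-octet big-endian TTL, then the context bytes.
# Data shorter than 2 bytes is not handled in this sketch.
def ping_ttl_and_context(data)
  ttl = data.byteslice(0, 2).unpack1("n")
  context = data.byteslice(2, data.bytesize - 2)
  [ttl, context]
end

# Builds a PONG command body that echoes the PING context.
def pong_body(context)
  "\x04PONG".b + context
end

ping = "\x04PING".b + [300].pack("n") + "abc".b
name, data = parse_command_body(ping)
_ttl, context = ping_ttl_and_context(data)
reply = pong_body(context) if name == "PING"
```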

#receive_message ⇒ Array<String>

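Receives a multi-frame message. PING/PONG commands are handled automatically by #read_frame. Any other command frame is yielded to the block, if one is given, and is not included in the result.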

Returns:

  • (Array<String>)

    message frames

Raises:

  • (EOFError)

    if connection is closed



# File 'lib/protocol/zmtp/connection.rb', line 199

def receive_message
  frames = []
  loop do
    frame = read_frame
    if frame.command?
      yield frame if block_given?
      next
    end
    frames << frame.body.freeze
    break unless frame.more?
  end
  frames.freeze
end
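
The assembly loop can be modeled without a socket. In the sketch below, SketchFrame is a hypothetical stand-in for Codec::Frame that carries only a body, a MORE flag, and a command flag, and assemble_message walks an in-memory list instead of calling #read_frame.

```ruby
# Hypothetical stand-in for Codec::Frame.
SketchFrame = Struct.new(:body, :more, :command) do
  def more?
    more
  end

  def command?
    command
  end
end

# Collects data frame bodies until a frame without the MORE flag is seen.
# Command frames are yielded to the block (if given) and skipped.
def assemble_message(frames)
  parts = []
  frames.each do |frame|
    if frame.command?
      yield frame if block_given?
      next
    end
    parts << frame.body.dup.freeze
    break unless frame.more?
  end
  parts.freeze
end

frames = [
  SketchFrame.new("SUBSCRIBE", false, true), # command frame, skipped
  SketchFrame.new("topic", true, false),     # first part, MORE set
  SketchFrame.new("payload", false, false),  # last part
]
message = assemble_message(frames) { |cmd| warn "command: #{cmd.body}" }
```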

#send_command(command) ⇒ void

This method returns an undefined value.

Sends a command.

Parameters:

  • command (Codec::Command)

    the command to send



# File 'lib/protocol/zmtp/connection.rb', line 218

def send_command(command)
  with_deferred_cancel do
    @mutex.synchronize do
      if @mechanism.encrypted?
        @io.write(@mechanism.encrypt(command.to_body, command: true))
      else
        @io.write(command.to_frame.to_wire)
      end
      @io.flush
    end
  end
end

#send_message(parts) ⇒ void

This method returns an undefined value.

Sends a multi-frame message (write + flush).

Parameters:

  • parts (Array<String>)

    message frames



# File 'lib/protocol/zmtp/connection.rb', line 114

def send_message(parts)
  with_deferred_cancel do
    @mutex.synchronize do
      write_frames(parts)
      @io.flush
    end
  end
end

#touch_heartbeat ⇒ void

This method returns an undefined value.

Records that a frame was received (for heartbeat expiry tracking).



# File 'lib/protocol/zmtp/connection.rb', line 270

def touch_heartbeat
  @last_received_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

#write_message(parts) ⇒ void

This method returns an undefined value.

Writes a multi-frame message to the buffer without flushing. Call #flush after batching writes.

Parameters:

  • parts (Array<String>)

    message frames



# File 'lib/protocol/zmtp/connection.rb', line 129

def write_message(parts)
  with_deferred_cancel do
    @mutex.synchronize do
      write_frames(parts)
    end
  end
end

#write_messages(messages) ⇒ void

This method returns an undefined value.

Writes a batch of multi-frame messages to the buffer under a single mutex acquisition. Used by work-stealing send pumps that dequeue up to N messages at once; this avoids the N lock/unlock pairs per batch that a plain `batch.each { write_message }` would incur.

Parameters:

  • messages (Array<Array<String>>)

    each element is one multi-frame message



# File 'lib/protocol/zmtp/connection.rb', line 147

def write_messages(messages)
  with_deferred_cancel do
    @mutex.synchronize do
      i = 0
      n = messages.size
      while i < n
        write_frames(messages[i])
        i += 1
      end
    end
  end
end
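
The locking difference between per-message and batched writes can be made visible with counters. Everything in the sketch below (CountingIO, CountingMutex, BatchWriterSketch) is a hypothetical stand-in; framing and cancellation handling are left out so that only the lock and flush counts remain.

```ruby
# Hypothetical IO that only counts calls.
class CountingIO
  attr_reader :writes, :flushes

  def initialize
    @writes = 0
    @flushes = 0
  end

  def write(_bytes)
    @writes += 1
  end

  def flush
    @flushes += 1
  end
end

# Hypothetical mutex wrapper that counts acquisitions.
class CountingMutex
  attr_reader :acquisitions

  def initialize
    @mutex = Mutex.new
    @acquisitions = 0
  end

  def synchronize(&block)
    @acquisitions += 1
    @mutex.synchronize(&block)
  end
end

# Simplified writer: one lock per call, whatever the batch size.
class BatchWriterSketch
  def initialize(io, mutex)
    @io = io
    @mutex = mutex
  end

  def write_message(parts)
    @mutex.synchronize { parts.each { |part| @io.write(part) } }
  end

  def write_messages(messages)
    @mutex.synchronize do
      messages.each { |parts| parts.each { |part| @io.write(part) } }
    end
  end

  def flush
    @mutex.synchronize { @io.flush }
  end
end

batch = [["a", "b"], ["c"], ["d"]]
mutex = CountingMutex.new
writer = BatchWriterSketch.new(CountingIO.new, mutex)
writer.write_messages(batch) # one acquisition for the whole batch
writer.flush                 # one more acquisition, one flush
```

Calling write_message once per element of the same batch would acquire the lock three times before the flush instead of once.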

#write_wire(wire_bytes) ⇒ void

This method returns an undefined value.

Writes pre-encoded wire bytes to the buffer without flushing. Used for fan-out: encode once, write to many connections.

Parameters:

  • wire_bytes (String)

    ZMTP wire-format bytes



# File 'lib/protocol/zmtp/connection.rb', line 166

def write_wire(wire_bytes)
  with_deferred_cancel do
    @mutex.synchronize do
      @io.write(wire_bytes)
    end
  end
end
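
As a rough illustration of the fan-out pattern, the sketch below encodes a message into wire bytes once and writes the same bytes to several IOs. It assumes plain (unencrypted) ZMTP 3.x framing; with an encrypting mechanism the bytes would presumably differ per connection, which is an assumption rather than something this page states. encode_message and fan_out are hypothetical helpers, and StringIO stands in for real connections.

```ruby
require "stringio"

# Encodes message parts as unencrypted ZMTP frames:
# flags octet (0x01 = MORE, 0x02 = LONG), size, then the body.
def encode_message(parts)
  wire = String.new(encoding: Encoding::BINARY)
  parts.each_with_index do |part, i|
    flags = i < parts.size - 1 ? 0x01 : 0x00
    if part.bytesize > 255
      wire << [flags | 0x02, part.bytesize].pack("CQ>")
    else
      wire << [flags, part.bytesize].pack("CC")
    end
    wire << part.b
  end
  wire
end

# Encode once, then write the identical bytes to every target.
def fan_out(parts, ios)
  wire = encode_message(parts)
  ios.each { |io| io.write(wire) }
  wire
end

subscribers = Array.new(3) { StringIO.new("".b) }
fan_out(["topic", "payload"], subscribers)
```

With real connections, each target would receive the bytes through #write_wire and be flushed separately.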