Class: Protocol::ZMTP::Connection

Inherits:
Object
Defined in:
lib/protocol/zmtp/connection.rb

Overview

Manages one ZMTP peer connection over any transport IO.

Delegates the security handshake to a Mechanism object (Null, Curve, etc.), then provides message send/receive and command send/receive on top of the framing codec.

Heartbeat timing is tracked but not driven — the caller (e.g. an engine) is responsible for periodically sending PINGs and checking expiry.

Constructor Details

#initialize(io, socket_type:, identity: "", as_server: false, mechanism: nil, max_message_size: nil, qos: 0, qos_hash: "") ⇒ Connection

Returns a new instance of Connection.

Parameters:

  • io (#read_exactly, #write, #flush, #close)

    transport IO

  • socket_type (String)

    our socket type name (e.g. “REQ”)

  • identity (String) (defaults to: "")

    our identity

  • as_server (Boolean) (defaults to: false)

    whether we are the server side

  • mechanism (Mechanism::Null, Mechanism::Curve) (defaults to: nil)

    security mechanism; when nil, a new Mechanism::Null is used

  • max_message_size (Integer, nil) (defaults to: nil)

    max frame size in bytes, nil = unlimited

  • qos (Integer) (defaults to: 0)

    our QoS level

  • qos_hash (String) (defaults to: "")

    our supported hash algorithms in preference order



# File 'lib/protocol/zmtp/connection.rb', line 59

def initialize(io, socket_type:, identity: "", as_server: false,
               mechanism: nil, max_message_size: nil, qos: 0, qos_hash: "")
  @io               = io
  @socket_type      = socket_type
  @identity         = identity
  @as_server        = as_server
  @mechanism        = mechanism || Mechanism::Null.new
  @peer_socket_type = nil
  @peer_identity    = nil
  @peer_qos         = nil
  @peer_qos_hash    = nil
  @peer_properties  = nil
  @peer_major       = nil
  @peer_minor       = nil
  @qos              = qos
  @qos_hash         = qos_hash
  @mutex            = Mutex.new
  @max_message_size = max_message_size
  @last_received_at = nil

  # Reusable scratch buffer for frame headers. Array#pack(buffer:)
  # writes in place so the per-message 2-or-9 byte String allocation
  # in write_frames disappears on the hot send path.
  @header_buf = String.new(capacity: 9, encoding: Encoding::BINARY)
end
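
The scratch-buffer comment in the constructor can be illustrated with a small standalone sketch. It assumes the standard ZMTP 3.x frame header layout (one flags byte followed by a 1-byte size for bodies up to 255 bytes, or an 8-byte big-endian size with the LONG flag set otherwise). The pack_header helper and the flag constants are hypothetical names for illustration; they are not part of this class, and the library's own write_frames is not shown here.

```ruby
# Hypothetical flag constants following the ZMTP 3.x frame layout.
FLAG_MORE = 0x01
FLAG_LONG = 0x02

# Packs a frame header into buf in place and returns it.
# Array#pack(buffer:) appends to the given String, so the buffer is
# cleared first; no fresh header String is allocated per call.
def pack_header(buf, flags, size)
  buf.clear
  if size <= 255
    [flags, size].pack("CC", buffer: buf)
  else
    [flags | FLAG_LONG, size].pack("CQ>", buffer: buf)
  end
end

header_buf = String.new(capacity: 9, encoding: Encoding::BINARY)
short = pack_header(header_buf, FLAG_MORE, 5).bytes   # 2-byte header
long  = pack_header(header_buf, 0, 300).bytes         # 9-byte header
```

Because the same String is reused for every header, a caller must write or copy its contents before packing the next one.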

Instance Attribute Details

#io ⇒ Object (readonly)

Returns transport IO (#read_exactly, #write, #flush, #close).

Returns:

  • (Object)

    transport IO (#read_exactly, #write, #flush, #close)



# File 'lib/protocol/zmtp/connection.rb', line 46

def io
  @io
end

#last_received_at ⇒ Float? (readonly)

Returns monotonic timestamp of last received frame.

Returns:

  • (Float, nil)

    monotonic timestamp of last received frame



# File 'lib/protocol/zmtp/connection.rb', line 50

def last_received_at
  @last_received_at
end

#peer_identity ⇒ String (readonly)

Returns peer’s identity (from READY handshake).

Returns:

  • (String)

    peer’s identity (from READY handshake)



# File 'lib/protocol/zmtp/connection.rb', line 20

def peer_identity
  @peer_identity
end

#peer_major ⇒ Integer? (readonly)

Returns peer ZMTP major version (from greeting).

Returns:

  • (Integer, nil)

    peer ZMTP major version (from greeting)



# File 'lib/protocol/zmtp/connection.rb', line 37

def peer_major
  @peer_major
end

#peer_minor ⇒ Integer? (readonly)

Returns peer ZMTP minor version (from greeting); 0 for ZMTP 3.0 peers, 1 for ZMTP 3.1+.

Returns:

  • (Integer, nil)

    peer ZMTP minor version (from greeting); 0 for ZMTP 3.0 peers, 1 for ZMTP 3.1+



# File 'lib/protocol/zmtp/connection.rb', line 42

def peer_minor
  @peer_minor
end

#peer_properties ⇒ Hash{String => String}? (readonly)

Returns full peer READY property hash (set after a successful handshake; nil before).

Returns:

  • (Hash{String => String}, nil)

    full peer READY property hash (set after a successful handshake; nil before)



# File 'lib/protocol/zmtp/connection.rb', line 33

def peer_properties
  @peer_properties
end

#peer_qos ⇒ Integer (readonly)

Returns peer’s QoS level (from READY handshake, 0 if absent).

Returns:

  • (Integer)

    peer’s QoS level (from READY handshake, 0 if absent)



# File 'lib/protocol/zmtp/connection.rb', line 24

def peer_qos
  @peer_qos
end

#peer_qos_hash ⇒ String (readonly)

Returns peer’s supported hash algorithms in preference order.

Returns:

  • (String)

    peer’s supported hash algorithms in preference order



# File 'lib/protocol/zmtp/connection.rb', line 28

def peer_qos_hash
  @peer_qos_hash
end

#peer_socket_type ⇒ String (readonly)

Returns peer’s socket type (from READY handshake).

Returns:

  • (String)

    peer’s socket type (from READY handshake)



# File 'lib/protocol/zmtp/connection.rb', line 16

def peer_socket_type
  @peer_socket_type
end

Instance Method Details

#close ⇒ void

This method returns an undefined value.

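Closes the connection by closing the underlying IO. An IOError raised because the IO is already closed is swallowed, so repeated calls are safe.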



# File 'lib/protocol/zmtp/connection.rb', line 301

def close
  @io.close
rescue IOError
  # already closed
end

#encrypted? ⇒ Boolean

Returns true if the ZMTP mechanism encrypts at the frame level (e.g. CURVE, BLAKE3ZMQ).

Returns:

  • (Boolean)


# File 'lib/protocol/zmtp/connection.rb', line 186

def encrypted?
  @mechanism.encrypted?
end

#flush ⇒ void

This method returns an undefined value.

Flushes the write buffer to the underlying IO.



# File 'lib/protocol/zmtp/connection.rb', line 194

def flush
  @mutex.synchronize do
    @io.flush
  end
end

#handshake! ⇒ void

This method returns an undefined value.

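Performs the full ZMTP handshake via the configured mechanism, stores the peer's socket type, identity, QoS settings, properties, and ZMTP version, and then verifies that the peer's socket type is present and compatible with ours.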

Raises:

  • (Error)

    on handshake failure



# File 'lib/protocol/zmtp/connection.rb', line 90

def handshake!
  result = @mechanism.handshake! @io,
    as_server:   @as_server,
    socket_type: @socket_type,
    identity:    @identity,
    qos:         @qos,
    qos_hash:    @qos_hash

  @peer_socket_type = result[:peer_socket_type]
  @peer_identity    = result[:peer_identity]
  @peer_qos         = result[:peer_qos] || 0
  @peer_qos_hash    = result[:peer_qos_hash] || ""
  @peer_properties  = result[:peer_properties]
  @peer_major       = result[:peer_major]
  @peer_minor       = result[:peer_minor]

  unless @peer_socket_type
    raise Error, "peer READY missing Socket-Type"
  end

  unless VALID_PEERS[@socket_type.to_sym]&.include?(@peer_socket_type.to_sym)
    raise Error,
          "incompatible socket types: #{@socket_type} cannot connect to #{@peer_socket_type}"
  end
end
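
The compatibility check at the end of #handshake! can be sketched in isolation. The table below is a hypothetical, abbreviated stand-in built from the well-known ZeroMQ socket pairings; the library's own VALID_PEERS constant is not shown in this section and may differ. The helper name compatible? is likewise an assumption.

```ruby
# Hypothetical, abbreviated compatibility table (not the library's
# VALID_PEERS). Keys are our socket type, values the peers we accept.
SKETCH_VALID_PEERS = {
  REQ:  %i[REP ROUTER],
  REP:  %i[REQ DEALER],
  PUSH: %i[PULL],
  PULL: %i[PUSH],
  PUB:  %i[SUB XSUB],
  SUB:  %i[PUB XPUB],
  PAIR: %i[PAIR],
}.freeze

# Mirrors the lookup used in #handshake!: an unknown local type makes the
# Hash lookup return nil, and &. turns that into a falsy result.
def compatible?(socket_type, peer_socket_type)
  SKETCH_VALID_PEERS[socket_type.to_sym]&.include?(peer_socket_type.to_sym) || false
end

req_rep   = compatible?("REQ", "REP")     # accepted pairing
req_pub   = compatible?("REQ", "PUB")     # rejected pairing
bogus_rep = compatible?("BOGUS", "REP")   # unknown local type
```

In the real method a falsy result raises Error with both socket type names in the message, rather than returning false.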

#heartbeat_expired?(timeout) ⇒ Boolean

Returns true if more than timeout seconds have elapsed since the last frame was received. Returns false if no frame has been received yet.

Parameters:

  • timeout (Numeric)

    seconds

Returns:

  • (Boolean)


# File 'lib/protocol/zmtp/connection.rb', line 292

def heartbeat_expired?(timeout)
  return false unless @last_received_at
  (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @last_received_at) > timeout
end
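
The expiry logic can be tried without a real connection. The sketch below uses a hypothetical HeartbeatTracker class that copies the two heartbeat methods; it only illustrates the monotonic-clock comparison and is not part of the library.

```ruby
# Hypothetical stand-in mirroring #touch_heartbeat and
# #heartbeat_expired? from the listing in this section.
class HeartbeatTracker
  attr_reader :last_received_at

  def initialize
    @last_received_at = nil
  end

  # Records "a frame just arrived" using the monotonic clock, which is
  # unaffected by wall-clock adjustments.
  def touch
    @last_received_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # False until the first frame arrives; afterwards true once more than
  # timeout seconds have passed since the last touch.
  def expired?(timeout)
    return false unless @last_received_at
    (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @last_received_at) > timeout
  end
end

tracker    = HeartbeatTracker.new
never_seen = tracker.expired?(0)     # nothing received yet
tracker.touch
fresh      = tracker.expired?(60)    # well within the timeout
sleep 0.05
stale      = tracker.expired?(0.01)  # more than 10 ms of silence
```

Since the connection only tracks the timestamp, a caller such as an engine would run a check like this on its own timer and decide when to send PINGs or close the connection.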

#read_frame ⇒ Codec::Frame

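Reads one frame from the wire. PING commands are answered with a PONG carrying the same context, and PONG commands are discarded; neither is returned to the caller. When using an encrypted mechanism, all frames are decrypted transparently (supports both CURVE MESSAGE wrapping and inline encryption like BLAKE3ZMQ). If reading the frame raises Error, the connection is closed before the error is re-raised.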

Returns:

  • (Codec::Frame)

Raises:

  • (EOFError)

    if connection is closed



# File 'lib/protocol/zmtp/connection.rb', line 250

def read_frame
  loop do
    begin
      frame = Codec::Frame.read_from(@io, max_message_size: @max_message_size)
    rescue Error
      close
      raise
    end

    touch_heartbeat

    frame = @mechanism.decrypt(frame) if @mechanism.encrypted?

    if frame.command?
      cmd = Codec::Command.from_body(frame.body)
      case cmd.name
      when "PING"
        _, context = cmd.ping_ttl_and_context
        send_command(Codec::Command.pong(context: context))
        next
      when "PONG"
        next
      end
    end

    return frame
  end
end
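
The PING/PONG filtering loop can be sketched with plain Arrays standing in for the wire. StubFrame and next_visible_frame are hypothetical names; real frames come from Codec::Frame and replies go through #send_command, neither of which is reproduced here.

```ruby
# Hypothetical stand-in for a decoded frame. kind is :data, :ping,
# :pong, or any other command name.
StubFrame = Struct.new(:kind, :payload)

# Returns the next frame the caller should see. PINGs are answered with
# a PONG carrying the same context, PONGs are dropped, and everything
# else is returned. incoming and outgoing are Arrays standing in for IO.
def next_visible_frame(incoming, outgoing)
  loop do
    frame = incoming.shift
    raise EOFError, "connection closed" if frame.nil?

    case frame.kind
    when :ping
      outgoing << StubFrame.new(:pong, frame.payload)
      next
    when :pong
      next
    end

    return frame
  end
end

incoming = [
  StubFrame.new(:ping, "ctx"),
  StubFrame.new(:pong, ""),
  StubFrame.new(:data, "hello"),
]
outgoing = []
visible  = next_visible_frame(incoming, outgoing)
```

The caller only ever observes the data frame, while the PONG reply lands in outgoing as a side effect.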

#receive_message ⇒ Array<String>

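Receives a multi-frame message. PING/PONG commands are handled automatically by #read_frame. Any other command frame is yielded to the block, if one is given, and is otherwise skipped; command frames never appear in the returned message. The returned array and each frame body are frozen.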

Returns:

  • (Array<String>)

    message frames

Raises:

  • (EOFError)

    if connection is closed



# File 'lib/protocol/zmtp/connection.rb', line 206

def receive_message
  frames = []

  loop do
    frame = read_frame

    if frame.command?
      yield frame if block_given?
      next
    end

    frames << frame.body.freeze
    break unless frame.more?
  end

  frames.freeze
end
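
Message assembly from the MORE flag can be sketched the same way. StubPart and assemble_message are hypothetical stand-ins for Codec::Frame and #receive_message, reading from an Array instead of the connection.

```ruby
# Hypothetical stand-in for a frame with a body, a MORE flag, and a
# command flag.
StubPart = Struct.new(:body, :more, :command) do
  def more?
    more
  end

  def command?
    command
  end
end

# Collects bodies until a frame without the MORE flag arrives. Command
# frames go to the optional block and never become message parts.
def assemble_message(frames)
  parts = []
  frames.each do |frame|
    if frame.command?
      yield frame if block_given?
      next
    end
    parts << frame.body.freeze
    break unless frame.more?
  end
  parts.freeze
end

seen_commands = []
wire = [
  StubPart.new("header", true, false),
  StubPart.new("SUBSCRIBE", false, true),
  StubPart.new("payload", false, false),
  StubPart.new("next message", false, false),
]
message = assemble_message(wire) { |cmd| seen_commands << cmd.body }
```

Assembly stops at the first data frame whose MORE flag is clear, so the trailing frame is left for the next call.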

#send_command(command) ⇒ void

This method returns an undefined value.

Sends a command.

Parameters:

  • command (Codec::Command)

    the command to send



# File 'lib/protocol/zmtp/connection.rb', line 229

def send_command(command)
  with_deferred_cancel do
    @mutex.synchronize do
      if @mechanism.encrypted?
        @io.write(@mechanism.encrypt(command.to_body, command: true))
      else
        @io.write(command.to_frame.to_wire)
      end
      @io.flush
    end
  end
end

#send_message(parts) ⇒ void

This method returns an undefined value.

Sends a multi-frame message (write + flush).

Parameters:

  • parts (Array<String>)

    message frames



# File 'lib/protocol/zmtp/connection.rb', line 121

def send_message(parts)
  with_deferred_cancel do
    @mutex.synchronize do
      write_frames(parts)
      @io.flush
    end
  end
end

#touch_heartbeat ⇒ void

This method returns an undefined value.

Records that a frame was received (for heartbeat expiry tracking).



# File 'lib/protocol/zmtp/connection.rb', line 283

def touch_heartbeat
  @last_received_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

#write_message(parts) ⇒ void

This method returns an undefined value.

Writes a multi-frame message to the buffer without flushing. Call #flush after batching writes.

Parameters:

  • parts (Array<String>)

    message frames



# File 'lib/protocol/zmtp/connection.rb', line 136

def write_message(parts)
  with_deferred_cancel do
    @mutex.synchronize do
      write_frames(parts)
    end
  end
end

#write_messages(messages) ⇒ void

This method returns an undefined value.

Writes a batch of multi-frame messages to the buffer under a single mutex acquisition, without flushing. Used by work-stealing send pumps that dequeue up to N messages at once; this avoids the N lock/unlock pairs per batch that a plain `batch.each { write_message }` would incur. Call #flush afterwards to push the batch to the underlying IO.

Parameters:

  • messages (Array<Array<String>>)

    each element is one multi-frame message



# File 'lib/protocol/zmtp/connection.rb', line 154

def write_messages(messages)
  with_deferred_cancel do
    @mutex.synchronize do
      i = 0
      n = messages.size
      while i < n
        write_frames(messages[i])
        i += 1
      end
    end
  end
end
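
The locking difference between per-message and batched writes can be made visible with a counter. BatchWriter below is a hypothetical stand-in, not the library's class: it writes raw parts to a StringIO and omits ZMTP framing and the with_deferred_cancel wrapper entirely.

```ruby
require "stringio"

# Hypothetical stand-in for a connection's buffered write path that
# counts how often the mutex is taken.
class BatchWriter
  attr_reader :lock_count

  def initialize(io)
    @io         = io
    @mutex      = Mutex.new
    @lock_count = 0
  end

  # One lock acquisition per message.
  def write_message(parts)
    locked { parts.each { |part| @io.write(part) } }
  end

  # One lock acquisition for the whole batch.
  def write_messages(messages)
    locked do
      messages.each { |parts| parts.each { |part| @io.write(part) } }
    end
  end

  def flush
    locked { @io.flush }
  end

  private

  def locked
    @mutex.synchronize do
      @lock_count += 1
      yield
    end
  end
end

batch = [["a", "b"], ["c"], ["d", "e"]]

one_by_one = BatchWriter.new(StringIO.new)
batch.each { |parts| one_by_one.write_message(parts) }

batched_io = StringIO.new
batched    = BatchWriter.new(batched_io)
batched.write_messages(batch)
batched.flush
```

For a batch of three messages the per-message path takes the lock three times, while the batched path takes it once for the writes and once more for the explicit flush.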

#write_wire(wire_bytes) ⇒ void

This method returns an undefined value.

Writes pre-encoded wire bytes to the buffer without flushing. Used for fan-out: encode once, write to many connections.

Parameters:

  • wire_bytes (String)

    ZMTP wire-format bytes



# File 'lib/protocol/zmtp/connection.rb', line 173

def write_wire(wire_bytes)
  with_deferred_cancel do
    @mutex.synchronize do
      @io.write(wire_bytes)
    end
  end
end
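
The fan-out pattern (encode once, write to many) can be sketched with StringIO objects standing in for connections. encode_short_frames is a hypothetical encoder that assumes the ZMTP short-frame layout (flags byte, 1-byte size, body); it is not the library's codec and handles only bodies up to 255 bytes.

```ruby
require "stringio"

# Hypothetical encoder for short ZMTP frames. The MORE flag (0x01) is
# set on every frame except the last one of the message.
def encode_short_frames(parts)
  wire = String.new(encoding: Encoding::BINARY)
  parts.each_with_index do |part, i|
    raise ArgumentError, "sketch handles bodies up to 255 bytes" if part.bytesize > 255
    flags = i < parts.size - 1 ? 0x01 : 0x00
    wire << [flags, part.bytesize].pack("CC") << part.b
  end
  wire
end

subscribers = Array.new(3) { StringIO.new }

# Encode once, then hand the same frozen bytes to every subscriber.
wire_bytes = encode_short_frames(["topic", "payload"]).freeze
subscribers.each { |io| io.write(wire_bytes) }
```

With real connections the loop body would call #write_wire on each one, followed by #flush once the batch is complete, since #write_wire itself does not flush.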