Class: Protocol::ZMTP::Connection

Inherits:
Object
Defined in:
lib/protocol/zmtp/connection.rb

Overview

Manages one ZMTP peer connection over any transport IO.

Delegates the security handshake to a Mechanism object (Null, Curve, etc.), then provides message send/receive and command send/receive on top of the framing codec.

Heartbeat timing is tracked but not driven — the caller (e.g. an engine) is responsible for periodically sending PINGs and checking expiry.

Constructor Details

#initialize(io, socket_type:, identity: "", as_server: false, mechanism: nil, max_message_size: nil, qos: 0, qos_hash: "") ⇒ Connection

Returns a new instance of Connection.

Parameters:

  • io (#read_exactly, #write, #flush, #close)

    transport IO

  • socket_type (String)

    our socket type name (e.g. “REQ”)

  • identity (String) (defaults to: "")

    our identity

  • as_server (Boolean) (defaults to: false)

    whether we are the server side

  • mechanism (Mechanism::Null, Mechanism::Curve) (defaults to: nil)

    security mechanism; Mechanism::Null is used when nil

  • max_message_size (Integer, nil) (defaults to: nil)

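    maximum size in bytes of a frame accepted when reading; nil means unlimited

  • qos (Integer) (defaults to: 0)

    our QoS level, passed to the mechanism during the handshake

  • qos_hash (String) (defaults to: "")

    our supported hash algorithms in preference order, passed to the mechanism during the handshake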



# File 'lib/protocol/zmtp/connection.rb', line 43

def initialize(io, socket_type:, identity: "", as_server: false,
               mechanism: nil, max_message_size: nil, qos: 0, qos_hash: "")
  @io               = io
  @socket_type      = socket_type
  @identity         = identity
  @as_server        = as_server
  @mechanism        = mechanism || Mechanism::Null.new
  @peer_socket_type = nil
  @peer_identity    = nil
  @peer_qos         = nil
  @peer_qos_hash    = nil
  @peer_properties  = nil
  @qos              = qos
  @qos_hash         = qos_hash
  @mutex            = Mutex.new
  @max_message_size = max_message_size
  @last_received_at = nil

  # Reusable scratch buffer for frame headers. Array#pack(buffer:)
  # writes in place so the per-message 2-or-9 byte String allocation
  # in write_frames disappears on the hot send path.
  @header_buf = String.new(capacity: 9, encoding: Encoding::BINARY)
end
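
The scratch-buffer comment above relies on Array#pack(buffer:) appending packed bytes to an existing String instead of allocating a new one. The following is a minimal sketch of that technique, assuming the standard ZMTP 3.x frame header layout (a flags byte, then a 1-byte size for short frames or an 8-byte big-endian size for long frames). The helper name pack_header is hypothetical and is not part of this library; the private write_frames method is not shown on this page and may differ.

```ruby
# Hypothetical helper: packs a ZMTP frame header into +buf+, reusing the String.
# Flag bits per ZMTP 3.x: 0x01 = MORE, 0x02 = LONG (8-byte size field).
def pack_header(buf, size, more: false)
  buf.clear # pack(buffer:) appends, so reset the scratch buffer first
  flags = more ? 0x01 : 0x00
  if size > 255
    [flags | 0x02, size].pack("CQ>", buffer: buf) # 9-byte long header
  else
    [flags, size].pack("CC", buffer: buf)         # 2-byte short header
  end
end

buf   = String.new(capacity: 9, encoding: Encoding::BINARY)
short = pack_header(buf, 5).bytes
long  = pack_header(buf, 300, more: true).bytes
```

Because pack returns the buffer it was given, every call hands back the same String object, which is what lets a send path avoid a fresh header String per frame.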

Instance Attribute Details

#io ⇒ Object (readonly)

Returns transport IO (#read_exactly, #write, #flush, #close).

Returns:

  • (Object)

    transport IO (#read_exactly, #write, #flush, #close)



# File 'lib/protocol/zmtp/connection.rb', line 32

def io
  @io
end
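
The transport only needs to respond to the four methods listed above. A minimal in-memory sketch of such a duck-typed object follows; it can be handy in tests. MemoryTransport is a hypothetical name, and the behavior of read_exactly(n) (return exactly n bytes or raise EOFError) is an assumption inferred from the method name and from the EOFError documented for #read_frame, not something this page confirms.

```ruby
# Hypothetical in-memory transport implementing the duck type
# (#read_exactly, #write, #flush, #close).
class MemoryTransport
  attr_reader :written, :flushes

  def initialize(incoming = "")
    @incoming = incoming.b # bytes "received" from the peer
    @pending  = "".b       # written but not yet flushed
    @written  = "".b       # bytes that reached the "wire"
    @flushes  = 0
    @closed   = false
  end

  # Assumed contract: exactly n bytes, or EOFError if fewer are available.
  def read_exactly(n)
    raise IOError, "closed stream" if @closed
    raise EOFError, "connection closed" if @incoming.bytesize < n
    @incoming.slice!(0, n)
  end

  def write(bytes)
    raise IOError, "closed stream" if @closed
    @pending << bytes.b
    bytes.bytesize
  end

  def flush
    @written << @pending
    @pending.clear
    @flushes += 1
    self
  end

  def close
    @closed = true
  end
end

io     = MemoryTransport.new("\x00\x02hi") # one short frame: flags, size, body
header = io.read_exactly(2)
body   = io.read_exactly(header.getbyte(1))
io.write("abc")
io.write("def")
io.flush
```

Separating pending from flushed bytes mirrors the write-then-flush split that #write_message and #flush expose.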

#last_received_at ⇒ Float? (readonly)

Returns monotonic timestamp of last received frame.

Returns:

  • (Float, nil)

    monotonic timestamp of last received frame



# File 'lib/protocol/zmtp/connection.rb', line 35

def last_received_at
  @last_received_at
end

#peer_identity ⇒ String (readonly)

Returns peer’s identity (from READY handshake).

Returns:

  • (String)

    peer’s identity (from READY handshake)



# File 'lib/protocol/zmtp/connection.rb', line 19

def peer_identity
  @peer_identity
end

#peer_properties ⇒ Hash{String => String}? (readonly)

Returns full peer READY property hash (set after a successful handshake; nil before).

Returns:

  • (Hash{String => String}, nil)

    full peer READY property hash (set after a successful handshake; nil before)



# File 'lib/protocol/zmtp/connection.rb', line 29

def peer_properties
  @peer_properties
end

#peer_qos ⇒ Integer (readonly)

Returns peer’s QoS level (from READY handshake, 0 if absent).

Returns:

  • (Integer)

    peer’s QoS level (from READY handshake, 0 if absent)



# File 'lib/protocol/zmtp/connection.rb', line 22

def peer_qos
  @peer_qos
end

#peer_qos_hash ⇒ String (readonly)

Returns peer’s supported hash algorithms in preference order.

Returns:

  • (String)

    peer’s supported hash algorithms in preference order



# File 'lib/protocol/zmtp/connection.rb', line 25

def peer_qos_hash
  @peer_qos_hash
end

#peer_socket_type ⇒ String (readonly)

Returns peer’s socket type (from READY handshake).

Returns:

  • (String)

    peer’s socket type (from READY handshake)



# File 'lib/protocol/zmtp/connection.rb', line 16

def peer_socket_type
  @peer_socket_type
end

Instance Method Details

#close ⇒ void

This method returns an undefined value.

Closes the connection. An IOError from an already-closed IO is ignored.



# File 'lib/protocol/zmtp/connection.rb', line 277

def close
  @io.close
rescue IOError
  # already closed
end

#encrypted? ⇒ Boolean

Returns true if the ZMTP mechanism encrypts at the frame level (e.g. CURVE, BLAKE3ZMQ).

Returns:

  • (Boolean)


# File 'lib/protocol/zmtp/connection.rb', line 168

def encrypted?
  @mechanism.encrypted?
end

#flush ⇒ void

This method returns an undefined value.

Flushes the write buffer to the underlying IO.



# File 'lib/protocol/zmtp/connection.rb', line 176

def flush
  @mutex.synchronize do
    @io.flush
  end
end

#handshake! ⇒ void

This method returns an undefined value.

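Performs the full ZMTP handshake via the configured mechanism, stores the peer's socket type, identity, QoS values and READY properties, then checks that the peer's socket type is compatible with ours.

Raises:

  • (Error)

    on handshake failure, if the peer's READY has no Socket-Type, or if the socket types are incompatible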



# File 'lib/protocol/zmtp/connection.rb', line 72

def handshake!
  result = @mechanism.handshake!(
    @io,
    as_server:   @as_server,
    socket_type: @socket_type,
    identity:    @identity,
    qos:         @qos,
    qos_hash:    @qos_hash,
  )

  @peer_socket_type = result[:peer_socket_type]
  @peer_identity    = result[:peer_identity]
  @peer_qos         = result[:peer_qos] || 0
  @peer_qos_hash    = result[:peer_qos_hash] || ""
  @peer_properties  = result[:peer_properties]

  unless @peer_socket_type
    raise Error, "peer READY missing Socket-Type"
  end

  unless VALID_PEERS[@socket_type.to_sym]&.include?(@peer_socket_type.to_sym)
    raise Error,
          "incompatible socket types: #{@socket_type} cannot connect to #{@peer_socket_type}"
  end
end
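
The compatibility check at the end of the handshake is a symbol-keyed table lookup. A minimal sketch of that lookup is shown here, assuming a table shaped like the one the code indexes. EXAMPLE_VALID_PEERS and compatible? are hypothetical names, and the table is only an illustrative subset of the socket pairings in the ZMTP specification; the library's actual VALID_PEERS constant is not shown on this page.

```ruby
# Illustrative subset only; not the library's VALID_PEERS table.
EXAMPLE_VALID_PEERS = {
  REQ:  %i[REP ROUTER],
  REP:  %i[REQ DEALER],
  PUB:  %i[SUB XSUB],
  SUB:  %i[PUB XPUB],
  PUSH: %i[PULL],
  PULL: %i[PUSH],
}.freeze

# Hypothetical helper mirroring the lookup in #handshake!.
# The safe-navigation operator makes an unknown socket type count as
# incompatible instead of raising NoMethodError on nil.
def compatible?(ours, theirs)
  EXAMPLE_VALID_PEERS[ours.to_sym]&.include?(theirs.to_sym) ? true : false
end

req_rep = compatible?("REQ", "REP")
req_pub = compatible?("REQ", "PUB")
unknown = compatible?("BOGUS", "REP")
```

Note that #handshake! first rejects a missing peer socket type explicitly, so to_sym is never called on nil in the second check.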

#heartbeat_expired?(timeout) ⇒ Boolean

Returns true if no frame has been received within timeout seconds. Always returns false if no frame has been received yet.

Parameters:

  • timeout (Numeric)

    seconds

Returns:

  • (Boolean)


# File 'lib/protocol/zmtp/connection.rb', line 268

def heartbeat_expired?(timeout)
  return false unless @last_received_at
  (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @last_received_at) > timeout
end
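
Since the connection only tracks timing, the caller has to poll for expiry. The sketch below reproduces the two heartbeat methods in a stand-in class so the behavior can be seen in isolation; HeartbeatClock and its method names are hypothetical, and a real caller would use #touch_heartbeat and #heartbeat_expired? on the connection instead.

```ruby
# Hypothetical stand-in with the same logic as #touch_heartbeat and
# #heartbeat_expired?, using the monotonic clock.
class HeartbeatClock
  def initialize
    @last_received_at = nil
  end

  def touch
    @last_received_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  def expired?(timeout)
    return false unless @last_received_at # nothing received yet: never expired
    (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @last_received_at) > timeout
  end
end

clock = HeartbeatClock.new
before_first_frame = clock.expired?(0.01) # no timestamp recorded yet

clock.touch
sleep 0.05
after_silence  = clock.expired?(0.01) # 50 ms of silence against a 10 ms timeout
within_timeout = clock.expired?(60)   # same silence against a 60 s timeout
```

One consequence of the nil guard: a peer that never sends anything is never reported as expired, so a caller that needs to detect that case has to track the connect time separately.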

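#read_frame ⇒ Codec::Frame

Reads one frame from the wire. PING and PONG commands are handled automatically: a PING is answered with a PONG carrying the same context, and a PONG is discarded; neither is returned. Any other frame, including other command frames, is returned to the caller. When using an encrypted mechanism, all frames are decrypted transparently (supports both CURVE MESSAGE wrapping and inline encryption like BLAKE3ZMQ). If reading the frame raises Error, the connection is closed before the error is re-raised.

Returns:

  • (Codec::Frame)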

Raises:

  • (EOFError)

    if connection is closed



# File 'lib/protocol/zmtp/connection.rb', line 228

def read_frame
  loop do
    begin
      frame = Codec::Frame.read_from(@io, max_message_size: @max_message_size)
    rescue Error
      close
      raise
    end
    touch_heartbeat

    frame = @mechanism.decrypt(frame) if @mechanism.encrypted?

    if frame.command?
      cmd = Codec::Command.from_body(frame.body)
      case cmd.name
      when "PING"
        _, context = cmd.ping_ttl_and_context
        send_command(Codec::Command.pong(context: context))
        next
      when "PONG"
        next
      end
    end
    return frame
  end
end

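#receive_message ⇒ Array<String>

Receives a multi-frame message. PING/PONG commands are handled automatically by #read_frame. Any other command frame is yielded to the block, if one is given, and is otherwise skipped; it never becomes part of the returned message. The returned array and each frame body are frozen.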

Returns:

  • (Array<String>)

    message frames

Raises:

  • (EOFError)

    if connection is closed



# File 'lib/protocol/zmtp/connection.rb', line 188

def receive_message
  frames = []
  loop do
    frame = read_frame
    if frame.command?
      yield frame if block_given?
      next
    end
    frames << frame.body.freeze
    break unless frame.more?
  end
  frames.freeze
end
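
The assembly loop above can be exercised without a socket. The sketch below reuses the same control flow against an in-memory queue of stand-in frames. FakeFrame and assemble are hypothetical names; the real method reads Codec::Frame objects through #read_frame.

```ruby
# Hypothetical stand-in for Codec::Frame with the three members the loop uses.
FakeFrame = Struct.new(:body, :more, :command) do
  def more?
    more
  end

  def command?
    command
  end
end

# Same control flow as #receive_message, reading from an Array queue.
def assemble(queue)
  parts = []
  loop do
    frame = queue.shift
    if frame.command?
      yield frame if block_given? # hand non-message frames to the caller
      next
    end
    parts << frame.body.freeze
    break unless frame.more?      # last frame of the message
  end
  parts.freeze
end

queue = [
  FakeFrame.new("SUBSCRIBE", false, true), # command frame interleaved first
  FakeFrame.new("part-1", true, false),    # MORE flag set
  FakeFrame.new("part-2", false, false),   # final frame
]

commands = []
message  = assemble(queue) { |cmd| commands << cmd.body }
```

The block is the only way a caller sees command frames that arrive while a message is being received, so callers that care about such commands should always pass one.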

#send_command(command) ⇒ void

This method returns an undefined value.

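Sends a command (write + flush). With an encrypted mechanism, the command body is encrypted before it is written.

Parameters:

  • command (Codec::Command)

    command to send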



# File 'lib/protocol/zmtp/connection.rb', line 207

def send_command(command)
  with_deferred_cancel do
    @mutex.synchronize do
      if @mechanism.encrypted?
        @io.write(@mechanism.encrypt(command.to_body, command: true))
      else
        @io.write(command.to_frame.to_wire)
      end
      @io.flush
    end
  end
end

#send_message(parts) ⇒ void

This method returns an undefined value.

Sends a multi-frame message (write + flush).

Parameters:

  • parts (Array<String>)

    message frames



# File 'lib/protocol/zmtp/connection.rb', line 103

def send_message(parts)
  with_deferred_cancel do
    @mutex.synchronize do
      write_frames(parts)
      @io.flush
    end
  end
end

#touch_heartbeat ⇒ void

This method returns an undefined value.

Records that a frame was received (for heartbeat expiry tracking).



# File 'lib/protocol/zmtp/connection.rb', line 259

def touch_heartbeat
  @last_received_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

#write_message(parts) ⇒ void

This method returns an undefined value.

Writes a multi-frame message to the buffer without flushing. Call #flush after batching writes.

Parameters:

  • parts (Array<String>)

    message frames



# File 'lib/protocol/zmtp/connection.rb', line 118

def write_message(parts)
  with_deferred_cancel do
    @mutex.synchronize do
      write_frames(parts)
    end
  end
end

#write_messages(messages) ⇒ void

This method returns an undefined value.

Writes a batch of multi-frame messages to the buffer under a single mutex acquisition, without flushing. Used by work-stealing send pumps that dequeue up to N messages at once; this avoids the N lock/unlock pairs per batch that a plain batch.each { |message| write_message(message) } would incur. Call #flush after the batch.

Parameters:

  • messages (Array<Array<String>>)

    each element is one multi-frame message



# File 'lib/protocol/zmtp/connection.rb', line 136

def write_messages(messages)
  with_deferred_cancel do
    @mutex.synchronize do
      i = 0
      n = messages.size
      while i < n
        write_frames(messages[i])
        i += 1
      end
    end
  end
end
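
The difference between per-message and per-batch locking can be made visible with a counting wrapper around Mutex. This is only an illustration of the locking pattern; CountingLock, write_each and write_batch are hypothetical names, and the arrays stand in for the connection's write buffer.

```ruby
# Hypothetical wrapper that counts how often the lock is taken.
class CountingLock
  attr_reader :acquisitions

  def initialize
    @mutex        = Mutex.new
    @acquisitions = 0
  end

  def synchronize(&block)
    @acquisitions += 1
    @mutex.synchronize(&block)
  end
end

# One lock/unlock pair per message (the pattern write_messages avoids).
def write_each(lock, out, batch)
  batch.each { |message| lock.synchronize { out << message } }
end

# One lock/unlock pair for the whole batch.
def write_batch(lock, out, batch)
  lock.synchronize { batch.each { |message| out << message } }
end

batch = [["a", "b"], ["c"], ["d", "e", "f"]]

per_message_lock = CountingLock.new
per_message_out  = []
write_each(per_message_lock, per_message_out, batch)

batched_lock = CountingLock.new
batched_out  = []
write_batch(batched_lock, batched_out, batch)
```

Both variants produce the same output order; only the number of acquisitions differs, which matters when several threads contend for the same connection.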

#write_wire(wire_bytes) ⇒ void

This method returns an undefined value.

Writes pre-encoded wire bytes to the buffer without flushing. Used for fan-out: encode once, write to many connections.

Parameters:

  • wire_bytes (String)

    ZMTP wire-format bytes



# File 'lib/protocol/zmtp/connection.rb', line 155

def write_wire(wire_bytes)
  with_deferred_cancel do
    @mutex.synchronize do
      @io.write(wire_bytes)
    end
  end
end
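
A minimal sketch of the fan-out pattern follows: a message is encoded to wire bytes once and the same String is written to every sink. encode_message is a hypothetical helper that handles short frames only (bodies of at most 255 bytes, using the ZMTP 3.x flags byte plus 1-byte size), and StringIO objects stand in for connections; with real connections the loop body would call write_wire(wire) and later flush.

```ruby
require "stringio"

# Hypothetical encoder for short ZMTP frames: flags byte, size byte, body.
def encode_message(parts)
  wire = "".b
  parts.each_with_index do |part, index|
    raise ArgumentError, "sketch handles short frames only" if part.bytesize > 255
    more = index < parts.size - 1 ? 0x01 : 0x00 # MORE flag on all but the last
    wire << [more, part.bytesize].pack("CC") << part.b
  end
  wire.freeze
end

subscribers = Array.new(3) { StringIO.new("".b) } # stand-ins for connections

wire = encode_message(["topic", "payload"]) # encode once
subscribers.each { |io| io.write(wire) }    # write the same bytes to each sink
```

Freezing the encoded bytes guards against one writer mutating a buffer that is shared across all recipients.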