Class: Protocol::SP::Connection

Inherits:
Object
Defined in:
lib/protocol/sp/connection.rb

Overview

Manages one SP peer connection over any transport IO.

The SP wire protocol has no commands, no security mechanisms, and no multipart messages: `#handshake!` is just an exchange of two 8-byte greetings, and `#send_message` / `#receive_message` work on single binary bodies framed by an 8-byte big-endian length.
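As a rough illustration of the length framing described above, the sketch below frames and unframes one body. The helpers `frame_body` and `read_frame` are hypothetical names, not part of this library, and a `StringIO` stands in for the transport IO.

```ruby
require "stringio"

# Hypothetical helper: prefix a binary body with its 8-byte big-endian length.
def frame_body(body)
  [body.bytesize].pack("Q>") + body.b
end

# Hypothetical helper: read one length-prefixed body from an IO-like object.
# No EOF or size-limit handling; this is only a sketch.
def read_frame(io)
  size = io.read(8).unpack1("Q>")
  size.zero? ? "".b : io.read(size)
end

wire = frame_body("hello")            # 8 header bytes + 5 body bytes
body = read_frame(StringIO.new(wire)) # round-trips the original body
```

The real encoding lives in `Codec::Frame`, whose implementation is not shown on this page.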

Constant Summary

IPC_MSG_TYPE = 0x01

SP/IPC data messages are prefixed with a 1-byte message type. 0x01 = user message. 0x00 is reserved in nng for control frames (keepalive) that we don’t emit, but we still accept and skip on read for forward-compatibility with nng peers.

Constructor Details

#initialize(io, protocol:, max_message_size: nil, framing: :tcp) ⇒ Connection

Returns a new instance of Connection.

Parameters:

  • io (#read_exactly, #write, #flush, #close)

    transport IO

  • protocol (Integer)

    our protocol id (e.g. Protocols::PUSH_V0)

  • max_message_size (Integer, nil) (defaults to: nil)

    max body size, nil = unlimited

  • framing (Symbol) (defaults to: :tcp)

    :tcp (default) uses 8-byte length headers; :ipc prepends a 1-byte message-type marker to each frame (nng’s SP/IPC wire format)



# File 'lib/protocol/sp/connection.rb', line 42

def initialize(io, protocol:, max_message_size: nil, framing: :tcp)
  @io               = io
  @protocol         = protocol
  @peer_protocol    = nil
  @max_message_size = max_message_size
  @framing          = framing
  @mutex            = Mutex.new
  @last_received_at = nil

  # Reusable scratch buffer for frame headers — written into by
  # Array#pack(buffer:), then flushed to @io. Capacity 9 covers
  # both :tcp (8B) and :ipc (1+8B) framings.
  @header_buf = String.new(capacity: 9, encoding: Encoding::BINARY)
end
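The comment in the constructor mentions `Array#pack(buffer:)` writing into a reusable scratch buffer. The body of `write_header_nolock` is not shown on this page, so the following is only a sketch of that pattern, assuming the buffer is cleared before each header is packed.

```ruby
# One scratch String reused for every header (same shape as @header_buf).
header_buf = String.new(capacity: 9, encoding: Encoding::BINARY)

# :tcp-style header: 8-byte big-endian length only.
header_buf.clear
[5].pack("Q>", buffer: header_buf)   # pack appends into the given buffer
tcp_header = header_buf.dup

# :ipc-style header: 1-byte message type, then the 8-byte length.
header_buf.clear
[0x01, 5].pack("CQ>", buffer: header_buf)
ipc_header = header_buf.dup
```

The `dup` calls exist only so both results can be inspected; a writer would hand `header_buf` straight to the IO.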

Instance Attribute Details

#framing ⇒ Symbol (readonly)

Returns :tcp or :ipc.

Returns:

  • (Symbol)

    :tcp or :ipc



# File 'lib/protocol/sp/connection.rb', line 33

def framing
  @framing
end

#io ⇒ Object (readonly)

Returns transport IO (#read_exactly, #write, #flush, #close).

Returns:

  • (Object)

    transport IO (#read_exactly, #write, #flush, #close)



# File 'lib/protocol/sp/connection.rb', line 25

def io
  @io
end

#last_received_at ⇒ Float? (readonly)

Returns monotonic timestamp of last received frame.

Returns:

  • (Float, nil)

    monotonic timestamp of last received frame



# File 'lib/protocol/sp/connection.rb', line 29

def last_received_at
  @last_received_at
end

#peer_protocol ⇒ Integer? (readonly)

Returns peer’s protocol id (nil until #handshake! has read the peer’s greeting).

Returns:

  • (Integer, nil)

    peer’s protocol id (nil until #handshake! has read the peer’s greeting)



# File 'lib/protocol/sp/connection.rb', line 21

def peer_protocol
  @peer_protocol
end

Instance Method Details

#close ⇒ void

This method returns an undefined value.

Closes the connection.



# File 'lib/protocol/sp/connection.rb', line 235

def close
  @io.close
rescue IOError
  # already closed
end

#flush ⇒ void

This method returns an undefined value.

Flushes the write buffer to the underlying IO.



# File 'lib/protocol/sp/connection.rb', line 171

def flush
  @mutex.synchronize do
    @io.flush
  end
end

#handshake! ⇒ void

This method returns an undefined value.

Performs the SP/TCP greeting exchange.

Raises:

  • (Error)

    on greeting mismatch or peer-incompatibility



# File 'lib/protocol/sp/connection.rb', line 62

def handshake!
  @io.write(Codec::Greeting.encode(protocol: @protocol))
  @io.flush

  peer           = Codec::Greeting.decode(@io.read_exactly(Codec::Greeting::SIZE))
  @peer_protocol = peer
  valid          = Protocols::VALID_PEERS[@protocol]

  unless valid&.include?(peer)
    raise Error, "incompatible SP protocols: 0x#{@protocol.to_s(16)} cannot speak to 0x#{peer.to_s(16)}"
  end
end
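The compatibility check at the end of `#handshake!` can be sketched in isolation. The contents of `Protocols::VALID_PEERS` are not shown on this page, so the table below is a hypothetical stand-in, and the ids 0x50 (PUSH) and 0x51 (PULL) are assumed from nng’s protocol numbering.

```ruby
# Assumed ids for illustration only; the library's own constants may differ.
PUSH_ID_SKETCH = 0x50
PULL_ID_SKETCH = 0x51

# Hypothetical stand-in for Protocols::VALID_PEERS: our id => acceptable peer ids.
VALID_PEERS_SKETCH = {
  PUSH_ID_SKETCH => [PULL_ID_SKETCH],
  PULL_ID_SKETCH => [PUSH_ID_SKETCH],
}.freeze

# Mirrors `valid&.include?(peer)`: an unknown local id has no valid peers.
def compatible?(ours, theirs)
  valid = VALID_PEERS_SKETCH[ours]
  !!valid&.include?(theirs)
end

ok  = compatible?(PUSH_ID_SKETCH, PULL_ID_SKETCH) # PUSH may talk to PULL
bad = compatible?(PUSH_ID_SKETCH, PUSH_ID_SKETCH) # PUSH may not talk to PUSH
```

The safe-navigation operator matters here: a protocol id missing from the table yields nil rather than raising NoMethodError, so the handshake fails with the library’s own Error.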

#heartbeat_expired?(timeout) ⇒ Boolean

Returns true if no frame has been received within timeout seconds.

Parameters:

  • timeout (Numeric)

    seconds

Returns:

  • (Boolean)


# File 'lib/protocol/sp/connection.rb', line 225

def heartbeat_expired?(timeout)
  return false unless @last_received_at

  (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @last_received_at) > timeout
end
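The expiry test above can be exercised without a live connection. In this sketch, `expired?` is a hypothetical free function with the same logic, and the `now:` keyword is an addition that makes the clock injectable for testing.

```ruby
# Hypothetical free-standing version of the check in #heartbeat_expired?.
# `now:` defaults to the monotonic clock, which is unaffected by wall-clock changes.
def expired?(last_received_at, timeout, now: Process.clock_gettime(Process::CLOCK_MONOTONIC))
  return false unless last_received_at # nothing received yet: never expired

  (now - last_received_at) > timeout
end

never_seen = expired?(nil, 5.0)              # false: no frame recorded yet
stale      = expired?(10.0, 5.0, now: 16.0)  # true: 6 s of silence > 5 s
boundary   = expired?(10.0, 5.0, now: 15.0)  # false: comparison is strict
```

Note that a connection which has never received a frame is reported as not expired, so a caller that wants to time out a silent peer presumably needs a separate connect or handshake deadline.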

#receive_message ⇒ String

Receives one message body.

Returns:

  • (String)

    binary body (not frozen; callers can freeze it themselves if they want, since the freeze cost shows up in hot loops)

Raises:

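  • (EOFError)

    if connection is closed

  • (Error)

    if a frame exceeds max_message_size; the connection is closed before the error is re-raised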



# File 'lib/protocol/sp/connection.rb', line 183

def receive_message
  if @framing == :ipc
    loop do
      # One read_exactly(9) is cheaper than separate 1+8 reads:
      # halves the io-stream dispatch overhead per message.
      header     = @io.read_exactly(9)
      type, size = header.unpack("CQ>")

      if @max_message_size && size > @max_message_size
        raise Error, "frame size #{size} exceeds max_message_size #{@max_message_size}"
      end

      body = size > 0 ? @io.read_exactly(size) : Codec::Frame::EMPTY_BODY
      touch_heartbeat

      # Skip nng IPC control frames (0x00 — keepalive/etc.); only
      # deliver user messages (0x01) to the caller.
      return body if type == IPC_MSG_TYPE
    end
  else
    frame = Codec::Frame.read_from(@io, max_message_size: @max_message_size)
    touch_heartbeat
    frame.body
  end
rescue Error
  close
  raise
end
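The :ipc branch above skips control frames and returns only user messages. The sketch below reproduces that loop against a `StringIO`, using `#read` in place of the transport’s `#read_exactly`; `read_user_message` is a hypothetical name, and size limits and EOF handling are left out.

```ruby
require "stringio"

# Hypothetical reader: returns the body of the first frame whose type is 0x01.
def read_user_message(io)
  loop do
    type, size = io.read(9).unpack("CQ>") # 1-byte type + 8-byte big-endian length
    body = size > 0 ? io.read(size) : "".b
    return body if type == 0x01           # 0x00 control frames are dropped
  end
end

wire = [0x00, 0].pack("CQ>")                     # empty control frame (keepalive)
wire << [0x01, 4].pack("CQ>") << "ping".b        # user message
msg = read_user_message(StringIO.new(wire))
```

Even a skipped control frame passes through `touch_heartbeat` in the real method, so keepalives from an nng peer still count as activity.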

#send_message(body) ⇒ void

This method returns an undefined value.

Sends one message (write + flush).

Parameters:

  • body (String)

    message body (single frame)



# File 'lib/protocol/sp/connection.rb', line 80

def send_message(body)
  with_deferred_cancel do
    @mutex.synchronize do
      write_header_nolock(body.bytesize)
      @io.write(body)
      @io.flush
    end
  end
end

#touch_heartbeat ⇒ void

This method returns an undefined value.

Records that a frame was received (for inactivity tracking).



# File 'lib/protocol/sp/connection.rb', line 216

def touch_heartbeat
  @last_received_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

#write_message(body) ⇒ void

This method returns an undefined value.

Writes one message to the buffer without flushing. Call #flush after batching writes.

Performs two writes into the buffered IO, header then body, which avoids the per-message intermediate String allocation that Protocol::SP::Codec::Frame.encode would otherwise produce.

Parameters:

  • body (String)


# File 'lib/protocol/sp/connection.rb', line 100

def write_message(body)
  with_deferred_cancel do
    @mutex.synchronize do
      write_header_nolock(body.bytesize)
      @io.write(body)
    end
  end
end

#write_messages(bodies) ⇒ void

This method returns an undefined value.

Writes a batch of messages to the buffer under a single mutex acquisition. Used by work-stealing send pumps that dequeue up to N messages at once, which avoids N lock/unlock pairs per batch. Call #flush afterwards to push the buffer to the socket.

Parameters:

  • bodies (Array<String>)


# File 'lib/protocol/sp/connection.rb', line 117

def write_messages(bodies)
  with_deferred_cancel do
    @mutex.synchronize do
      i = 0
      n = bodies.size

      while i < n
        body = bodies[i]
        write_header_nolock(body.bytesize)
        @io.write(body)
        i += 1
      end
    end
  end
end
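The write-then-flush batching pattern can be sketched with a small stand-in class. `BatchWriterSketch` is hypothetical: it uses :tcp-style headers only and omits `with_deferred_cancel`, whose definition is not shown on this page.

```ruby
require "stringio"

# Hypothetical stand-in for the batching part of Connection.
class BatchWriterSketch
  def initialize(io)
    @io    = io
    @mutex = Mutex.new
  end

  # One lock acquisition for the whole batch; nothing is flushed here.
  def write_messages(bodies)
    @mutex.synchronize do
      bodies.each do |body|
        @io.write([body.bytesize].pack("Q>")) # 8-byte big-endian length
        @io.write(body)
      end
    end
  end

  def flush
    @mutex.synchronize { @io.flush }
  end
end

io     = StringIO.new("".b)
writer = BatchWriterSketch.new(io)
writer.write_messages(["a", "bc"])
writer.flush # a single flush covers both messages
```

With a real buffered socket, deferring the flush is what lets several small messages leave in one system call.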

#write_wire(wire_bytes) ⇒ void

This method returns an undefined value.

Writes pre-encoded wire bytes without flushing. Used for fan-out: encode once with `Codec::Frame.encode`, write to many connections.

Parameters:

  • wire_bytes (String)


# File 'lib/protocol/sp/connection.rb', line 159

def write_wire(wire_bytes)
  with_deferred_cancel do
    @mutex.synchronize do
      @io.write(wire_bytes)
    end
  end
end
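The fan-out use case can be sketched as follows. `encode_frame` is a hypothetical stand-in for `Codec::Frame.encode`, assuming :tcp framing, and `StringIO` objects stand in for the connections.

```ruby
require "stringio"

# Hypothetical stand-in for Codec::Frame.encode (assumes :tcp framing).
def encode_frame(body)
  [body.bytesize].pack("Q>") + body.b
end

sinks = Array.new(3) { StringIO.new("".b) } # three pretend connections
wire  = encode_frame("tick").freeze         # encode once

# Each sink receives the same pre-encoded bytes; no per-connection encoding.
sinks.each { |io| io.write(wire) }
```

Because `#write_wire` does not flush, a caller fanning out this way would still call `#flush` on each connection afterwards.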