Class: Protocol::SP::Connection

Inherits:
Object
Defined in:
lib/protocol/sp/connection.rb

Overview

Manages one SP peer connection over any transport IO.

The SP wire protocol has no commands, no security mechanisms, and no multipart messages: `#handshake!` is just an exchange of two 8-byte greetings, and `#send_message` / `#receive_message` work on single binary bodies framed by an 8-byte big-endian length.
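
As a rough illustration of the framing described above, the sketch below encodes and decodes one length-prefixed body in plain Ruby. `encode_frame` and `decode_frame` are hypothetical helpers written for this example and are not part of Protocol::SP; the library's own codec is `Codec::Frame`.

```ruby
require "stringio"

# Hypothetical helper: 8-byte big-endian length followed by the raw body.
def encode_frame(body)
  [body.bytesize].pack("Q>") + body.b
end

# Hypothetical helper: read the length, then exactly that many body bytes.
def decode_frame(io)
  size = io.read(8).unpack1("Q>")
  size.zero? ? "".b : io.read(size)
end

wire = encode_frame("hello")
wire.bytesize                    # 8-byte length + 5-byte body
decode_frame(StringIO.new(wire)) # the original body
```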

Constant Summary

IPC_MSG_TYPE =

SP/IPC data messages are prefixed with a 1-byte message type: 0x01 marks a user message. 0x00 is reserved in nng for control frames (keepalive). We never emit those, but we still accept and skip them on read for forward compatibility with nng peers.

0x01
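
To make the IPC header layout concrete, here is a small sketch that builds the 9-byte header (1-byte type plus 8-byte big-endian length) with `Array#pack`. The constant names and `encode_ipc_frame` are hypothetical and exist only for this example.

```ruby
IPC_USER    = 0x01 # user message, same value as IPC_MSG_TYPE
IPC_CONTROL = 0x00 # reserved control frame (hypothetical constant name)

# Hypothetical helper: type byte, 8-byte big-endian length, then the body.
def encode_ipc_frame(body, type: IPC_USER)
  [type, body.bytesize].pack("CQ>") + body.b
end

frame      = encode_ipc_frame("hi")
type, size = frame.unpack("CQ>") # header fields read back from the first 9 bytes
```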

Constructor Details

#initialize(io, protocol:, max_message_size: nil, framing: :tcp) ⇒ Connection

Returns a new instance of Connection.

Parameters:

  • io (#read_exactly, #write, #flush, #close)

    transport IO

  • protocol (Integer)

    our protocol id (e.g. Protocols::PUSH_V0)

  • max_message_size (Integer, nil) (defaults to: nil)

    max body size, nil = unlimited

  • framing (Symbol) (defaults to: :tcp)

    :tcp (default) uses 8-byte length headers; :ipc prepends a 1-byte message-type marker to each frame (nng’s SP/IPC wire format)



# File 'lib/protocol/sp/connection.rb', line 42

def initialize(io, protocol:, max_message_size: nil, framing: :tcp)
  @io               = io
  @protocol         = protocol
  @peer_protocol    = nil
  @max_message_size = max_message_size
  @framing          = framing
  @mutex            = Mutex.new
  @last_received_at = nil

  # Reusable scratch buffer for frame headers — written into by
  # Array#pack(buffer:), then flushed to @io. Capacity 9 covers
  # both :tcp (8B) and :ipc (1+8B) framings.
  @header_buf = String.new(capacity: 9, encoding: Encoding::BINARY)
end
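
The scratch-buffer comment above relies on the `buffer:` keyword of `Array#pack`, which appends the packed bytes to an existing String instead of allocating a new one. The following is a minimal sketch of that reuse pattern; `fill_header` is a hypothetical helper, not the library's `write_header_nolock`.

```ruby
# Hypothetical helper: refill one reusable buffer with a frame header.
def fill_header(buf, size, ipc: false)
  buf.clear # keep the String object, drop the previous header bytes
  if ipc
    [0x01, size].pack("CQ>", buffer: buf) # 1-byte type + 8-byte length
  else
    [size].pack("Q>", buffer: buf)        # 8-byte length only
  end
  buf
end

buf = String.new(capacity: 9, encoding: Encoding::BINARY)
fill_header(buf, 5).bytesize            # 8 bytes for :tcp-style framing
fill_header(buf, 5, ipc: true).bytesize # 9 bytes for :ipc-style framing
```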

Instance Attribute Details

#framing ⇒ Symbol (readonly)

Returns :tcp or :ipc.

Returns:

  • (Symbol)

    :tcp or :ipc



# File 'lib/protocol/sp/connection.rb', line 33

def framing
  @framing
end

#io ⇒ Object (readonly)

Returns transport IO (#read_exactly, #write, #flush, #close).

Returns:

  • (Object)

    transport IO (#read_exactly, #write, #flush, #close)



# File 'lib/protocol/sp/connection.rb', line 25

def io
  @io
end

#last_received_at ⇒ Float? (readonly)

Returns monotonic timestamp of last received frame.

Returns:

  • (Float, nil)

    monotonic timestamp of last received frame



# File 'lib/protocol/sp/connection.rb', line 29

def last_received_at
  @last_received_at
end

#peer_protocol ⇒ Integer? (readonly)

Returns peer’s protocol id (nil until #handshake! has read the peer’s greeting).

Returns:

  • (Integer, nil)

    peer’s protocol id (nil until #handshake! has read the peer’s greeting)



# File 'lib/protocol/sp/connection.rb', line 21

def peer_protocol
  @peer_protocol
end

Instance Method Details

#close ⇒ void

This method returns an undefined value.

Closes the connection.



# File 'lib/protocol/sp/connection.rb', line 251

def close
  @io.close
rescue IOError
  # already closed
end

#flush ⇒ void

This method returns an undefined value.

Flushes the write buffer to the underlying IO.



# File 'lib/protocol/sp/connection.rb', line 187

def flush
  @mutex.synchronize do
    @io.flush
  end
end

#handshake! ⇒ void

This method returns an undefined value.

Performs the SP/TCP greeting exchange.

Raises:

  • (Error)

    on greeting mismatch or peer-incompatibility



# File 'lib/protocol/sp/connection.rb', line 62

def handshake!
  @io.write(Codec::Greeting.encode(protocol: @protocol))
  @io.flush

  peer           = Codec::Greeting.decode(@io.read_exactly(Codec::Greeting::SIZE))
  @peer_protocol = peer
  valid          = Protocols::VALID_PEERS[@protocol]

  unless valid&.include?(peer)
    raise Error, "incompatible SP protocols: 0x#{@protocol.to_s(16)} cannot speak to 0x#{peer.to_s(16)}"
  end
end
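
For orientation, the sketch below shows what such a greeting exchange could look like on the wire. It assumes the 8-byte greeting layout from nanomsg's SP-over-TCP mapping (0x00, "S", "P", 0x00, a 16-bit big-endian protocol id, and two reserved zero bytes) and a two-entry compatibility table; `Codec::Greeting` and `Protocols::VALID_PEERS` may differ in detail, and every name defined here is hypothetical.

```ruby
PUSH_V0 = 0x50 # assumed protocol ids, as used by nanomsg/nng
PULL_V0 = 0x51

# Hypothetical subset of a compatibility table: who may talk to whom.
VALID_PEERS = { PUSH_V0 => [PULL_V0], PULL_V0 => [PUSH_V0] }.freeze

# Assumed layout: 0x00 'S' 'P' 0x00, protocol id (uint16 BE), reserved (uint16).
def encode_greeting(protocol)
  [0x00, 0x53, 0x50, 0x00, protocol, 0x0000].pack("C4n2")
end

def decode_greeting(bytes)
  z0, s, p, z1, protocol, reserved = bytes.unpack("C4n2")
  unless [z0, s, p, z1, reserved] == [0x00, 0x53, 0x50, 0x00, 0x0000]
    raise ArgumentError, "not an SP greeting"
  end
  protocol
end

def compatible?(ours, theirs)
  VALID_PEERS[ours]&.include?(theirs) || false
end

peer = decode_greeting(encode_greeting(PULL_V0))
compatible?(PUSH_V0, peer) # a PUSH socket accepts a PULL peer
```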

#heartbeat_expired?(timeout) ⇒ Boolean

Returns true if no frame has been received within timeout seconds.

Parameters:

  • timeout (Numeric)

    seconds

Returns:

  • (Boolean)


# File 'lib/protocol/sp/connection.rb', line 241

def heartbeat_expired?(timeout)
  return false unless @last_received_at

  (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @last_received_at) > timeout
end
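
The inactivity check above can be reproduced in isolation. This sketch uses a hypothetical `IdleTracker` class with an injectable clock so the behaviour is easy to exercise; by default it reads the same monotonic clock as the method above.

```ruby
# Hypothetical stand-in for the connection's heartbeat bookkeeping.
class IdleTracker
  def initialize(clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @clock            = clock
    @last_received_at = nil
  end

  # Record that a frame just arrived.
  def touch
    @last_received_at = @clock.call
  end

  # False until the first frame arrives, then true once the gap exceeds timeout.
  def expired?(timeout)
    return false unless @last_received_at

    (@clock.call - @last_received_at) > timeout
  end
end

tracker = IdleTracker.new
tracker.expired?(5) # false: nothing has been received yet
tracker.touch
tracker.expired?(5) # false: a frame was just recorded
```

A monotonic clock is used rather than `Time.now` so that wall-clock adjustments cannot make a live peer look idle.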

#receive_message ⇒ String

Receives one message body.

Returns:

  • (String)

    binary body (NOT frozen; callers can freeze it if they want, since the freeze cost shows up in hot loops)

Raises:

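  • (EOFError)

    if connection is closed

  • (Error)

    if a frame exceeds max_message_size (the connection is closed before the error is re-raised)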



# File 'lib/protocol/sp/connection.rb', line 199

def receive_message
  if @framing == :ipc
    loop do
      # One read_exactly(9) is cheaper than separate 1+8 reads:
      # halves the io-stream dispatch overhead per message.
      header     = @io.read_exactly(9)
      type, size = header.unpack("CQ>")

      if @max_message_size && size > @max_message_size
        raise Error, "frame size #{size} exceeds max_message_size #{@max_message_size}"
      end

      body = size > 0 ? @io.read_exactly(size) : Codec::Frame::EMPTY_BODY
      touch_heartbeat

      # Skip nng IPC control frames (0x00 — keepalive/etc.); only
      # deliver user messages (0x01) to the caller.
      return body if type == IPC_MSG_TYPE
    end
  else
    frame = Codec::Frame.read_from(@io, max_message_size: @max_message_size)
    touch_heartbeat
    frame.body
  end
rescue Error
  close
  raise
end
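
The control-frame skipping in the :ipc branch can be tried against an in-memory stream. The sketch below is a simplified stand-in, not the library code: `read_exactly` and `receive_ipc` are hypothetical top-level helpers, and `ArgumentError` stands in for the library's `Error`.

```ruby
require "stringio"

# Hypothetical stand-in for the transport's #read_exactly.
def read_exactly(io, n)
  data = io.read(n)
  raise EOFError, "connection closed" if data.nil? || data.bytesize < n
  data
end

# Read IPC frames until a user message (type 0x01) arrives.
def receive_ipc(io, max_message_size: nil)
  loop do
    type, size = read_exactly(io, 9).unpack("CQ>")
    if max_message_size && size > max_message_size
      raise ArgumentError, "frame size #{size} exceeds max_message_size #{max_message_size}"
    end

    body = size > 0 ? read_exactly(io, size) : "".b
    return body if type == 0x01 # anything else is a control frame: skip it
  end
end

keepalive = [0x00, 0].pack("CQ>")
user      = [0x01, 4].pack("CQ>") + "ping".b
receive_ipc(StringIO.new(keepalive + user)) # skips the keepalive, returns the user body
```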

#send_message(body, header: nil) ⇒ void

This method returns an undefined value.

Sends one message (write + flush).

The optional header is a binary prefix written between the SP length prefix and body on the wire, framed as a single message of size `header.bytesize + body.bytesize`. It’s treated as an opaque slice so callers can pass a frozen shared buffer (e.g. a cached backtrace) without allocating a concatenated String. Receivers frame normally and get the header+body glued back together; this is purely a send-side allocation optimization.

Parameters:

  • body (String)

    message body (single frame)

  • header (String, nil) (defaults to: nil)

    optional binary prefix



# File 'lib/protocol/sp/connection.rb', line 89

def send_message(body, header: nil)
  with_deferred_cancel do
    @mutex.synchronize do
      total = body.bytesize + (header ? header.bytesize : 0)
      write_header_nolock(total)
      @io.write(header) if header
      @io.write(body)
      @io.flush
    end
  end
end
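
The effect of the header prefix on the wire can be seen with a plain StringIO. In this sketch `send_with_prefix` is a hypothetical helper that mirrors the three writes above for :tcp-style framing, without the mutex or flush.

```ruby
require "stringio"

# Hypothetical helper: one length prefix covering header + body, no concatenation.
def send_with_prefix(io, body, header: nil)
  total = body.bytesize + (header ? header.bytesize : 0)
  io.write([total].pack("Q>"))
  io.write(header) if header
  io.write(body)
end

prefix = "ERR:".b.freeze # shared, frozen, never copied into a new String
io     = StringIO.new("".b)
send_with_prefix(io, "boom", header: prefix)

io.rewind
size = io.read(8).unpack1("Q>") # 8: header and body counted together
io.read(size)                   # the receiver sees one body: prefix followed by "boom"
```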

#touch_heartbeat ⇒ void

This method returns an undefined value.

Records that a frame was received (for inactivity tracking).



# File 'lib/protocol/sp/connection.rb', line 232

def touch_heartbeat
  @last_received_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

#write_message(body, header: nil) ⇒ void

This method returns an undefined value.

Writes one message to the buffer without flushing. Call #flush after batching writes.

Two writes, length header then body, go into the buffered IO; this avoids the per-message intermediate String allocation that Protocol::SP::Codec::Frame.encode would otherwise produce. When the header argument is supplied, this becomes three buffered writes coalesced by `IO::Stream::Buffered` into a single `writev`.

Parameters:

  • body (String)
  • header (String, nil) (defaults to: nil)

    optional binary prefix



# File 'lib/protocol/sp/connection.rb', line 114

def write_message(body, header: nil)
  with_deferred_cancel do
    @mutex.synchronize do
      total = body.bytesize + (header ? header.bytesize : 0)
      write_header_nolock(total)
      @io.write(header) if header
      @io.write(body)
    end
  end
end

#write_messages(bodies) ⇒ void

This method returns an undefined value.

Writes a batch of messages to the buffer under a single mutex acquisition. Used by work-stealing send pumps that dequeue up to N messages at once, avoiding N lock/unlock pairs per batch. Call #flush afterwards to push the buffer to the socket.

Parameters:

  • bodies (Array<String>)


# File 'lib/protocol/sp/connection.rb', line 133

def write_messages(bodies)
  with_deferred_cancel do
    @mutex.synchronize do
      i = 0
      n = bodies.size

      while i < n
        body = bodies[i]
        write_header_nolock(body.bytesize)
        @io.write(body)
        i += 1
      end
    end
  end
end
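
The write-then-flush pattern can be sketched with a small stand-in class. `BatchWriter` below is hypothetical and only mimics the shape of `#write_messages` and `#flush` for :tcp-style framing; it counts flushes to show that a whole batch costs one lock acquisition for the writes and one flush.

```ruby
require "stringio"

# Hypothetical stand-in: many framed writes, one lock, one explicit flush.
class BatchWriter
  attr_reader :flushes

  def initialize(io)
    @io      = io
    @mutex   = Mutex.new
    @flushes = 0
  end

  def write_messages(bodies)
    @mutex.synchronize do
      bodies.each do |body|
        @io.write([body.bytesize].pack("Q>")) # 8-byte length header
        @io.write(body)
      end
    end
  end

  def flush
    @mutex.synchronize do
      @io.flush
      @flushes += 1
    end
  end
end

writer = BatchWriter.new(StringIO.new("".b))
writer.write_messages(%w[a bb ccc]) # three frames buffered, nothing flushed yet
writer.flush                        # one flush for the whole batch
```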

#write_wire(wire_bytes) ⇒ void

This method returns an undefined value.

Writes pre-encoded wire bytes without flushing. Used for fan-out: encode once with `Codec::Frame.encode`, write to many connections.

Parameters:

  • wire_bytes (String)


# File 'lib/protocol/sp/connection.rb', line 175

def write_wire(wire_bytes)
  with_deferred_cancel do
    @mutex.synchronize do
      @io.write(wire_bytes)
    end
  end
end
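
A fan-out loop built on this idea might look like the sketch below. It uses StringIO objects in place of connections and a hand-rolled length prefix in place of `Codec::Frame.encode`, so `encode_once` and `fan_out` are hypothetical names for illustration only.

```ruby
require "stringio"

# Stand-in for Codec::Frame.encode: 8-byte big-endian length + body.
def encode_once(body)
  [body.bytesize].pack("Q>") + body.b
end

# Write the same pre-encoded bytes to every sink, then flush each one.
def fan_out(wire, sinks)
  sinks.each { |io| io.write(wire) }
  sinks.each(&:flush)
end

wire        = encode_once("tick") # encoded a single time
subscribers = Array.new(3) { StringIO.new("".b) }
fan_out(wire, subscribers)        # every subscriber receives identical bytes
```

Encoding once keeps the cost of framing independent of the number of subscribers; only the writes scale with the fan-out width.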