Class: Protocol::SP::Connection
- Inherits: Object
  - Object
  - Protocol::SP::Connection
- Defined in: lib/protocol/sp/connection.rb
Overview
Manages one SP peer connection over any transport IO.
The SP wire protocol has no commands, no security mechanisms, and no multipart messages. `#handshake!` is just an exchange of two 8-byte greetings, and `#send_message` / `#receive_message` work on single binary bodies framed by an 8-byte big-endian length.
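The length-prefixed framing described above can be sketched in a few lines of plain Ruby. This is only an illustration: `encode_tcp_frame` and `decode_tcp_frame` are hypothetical helpers, not part of Protocol::SP, and `StringIO#read` stands in for the transport's `#read_exactly`.

```ruby
require "stringio"

# Hypothetical helper (not the library's API): prefix the body with its
# size as an 8-byte big-endian unsigned integer.
def encode_tcp_frame(body)
  [body.bytesize].pack("Q>") + body.b
end

# Hypothetical helper: read the 8-byte length, then exactly that many bytes.
# No EOF handling here; a real transport would raise on short reads.
def decode_tcp_frame(io)
  size = io.read(8).unpack1("Q>")
  size.zero? ? "".b : io.read(size)
end

wire = encode_tcp_frame("hello")
decoded = decode_tcp_frame(StringIO.new(wire))
```

Here `wire` is 13 bytes long (8 bytes of length plus the 5-byte body) and `decoded` round-trips to the original body.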
Constant Summary
- IPC_MSG_TYPE = 0x01
  SP/IPC data messages are prefixed with a 1-byte message type. 0x01 = user message. 0x00 is reserved in nng for control frames (keepalive) that we don’t emit, but we still accept and skip them on read for forward-compatibility with nng peers.
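As a rough illustration of the IPC framing, the 1-byte type and the 8-byte big-endian length can be packed together into a 9-byte header. `IPC_USER_MESSAGE` and `encode_ipc_frame` are names made up for this sketch; only the 0x01 value and the header layout come from the description above.

```ruby
# Mirrors the documented IPC_MSG_TYPE value; the constant name is hypothetical.
IPC_USER_MESSAGE = 0x01

# Hypothetical helper: 1-byte type + 8-byte big-endian length + body.
def encode_ipc_frame(body, type: IPC_USER_MESSAGE)
  [type, body.bytesize].pack("CQ>") + body.b
end

frame = encode_ipc_frame("ping")
type, size = frame.unpack("CQ>") # parses only the 9-byte header
```

The same `"CQ>"` directive string is what the `:ipc` branch of `#receive_message` uses to split the header on read.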
Instance Attribute Summary
-
#framing ⇒ Symbol
readonly
:tcp or :ipc.
-
#io ⇒ Object
readonly
Transport IO (#read_exactly, #write, #flush, #close).
-
#last_received_at ⇒ Float?
readonly
Monotonic timestamp of last received frame.
-
#peer_protocol ⇒ Integer
readonly
Peer’s protocol id (set after handshake).
Instance Method Summary
-
#close ⇒ void
Closes the connection.
-
#flush ⇒ void
Flushes the write buffer to the underlying IO.
-
#handshake! ⇒ void
Performs the SP/TCP greeting exchange.
-
#heartbeat_expired?(timeout) ⇒ Boolean
Returns true if no frame has been received within timeout seconds.
-
#initialize(io, protocol:, max_message_size: nil, framing: :tcp) ⇒ Connection
constructor
A new instance of Connection.
-
#receive_message ⇒ String
Receives one message body.
-
#send_message(body) ⇒ void
Sends one message (write + flush).
-
#touch_heartbeat ⇒ void
Records that a frame was received (for inactivity tracking).
-
#write_message(body) ⇒ void
Writes one message to the buffer without flushing.
-
#write_messages(bodies) ⇒ void
Writes a batch of messages to the buffer under a single mutex acquisition.
-
#write_wire(wire_bytes) ⇒ void
Writes pre-encoded wire bytes without flushing.
Constructor Details
#initialize(io, protocol:, max_message_size: nil, framing: :tcp) ⇒ Connection
Returns a new instance of Connection.
    # File 'lib/protocol/sp/connection.rb', line 42
    def initialize(io, protocol:, max_message_size: nil, framing: :tcp)
      @io = io
      @protocol = protocol
      @peer_protocol = nil
      @max_message_size = max_message_size
      @framing = framing
      @mutex = Mutex.new
      @last_received_at = nil
      # Reusable scratch buffer for frame headers: written into by
      # Array#pack(buffer:), then flushed to @io. Capacity 9 covers
      # both :tcp (8B) and :ipc (1+8B) framings.
      @header_buf = String.new(capacity: 9, encoding: Encoding::BINARY)
    end
Instance Attribute Details
#framing ⇒ Symbol (readonly)
Returns :tcp or :ipc.
    # File 'lib/protocol/sp/connection.rb', line 33
    def framing
      @framing
    end
#io ⇒ Object (readonly)
Returns transport IO (#read_exactly, #write, #flush, #close).
    # File 'lib/protocol/sp/connection.rb', line 25
    def io
      @io
    end
#last_received_at ⇒ Float? (readonly)
Returns monotonic timestamp of last received frame.
    # File 'lib/protocol/sp/connection.rb', line 29
    def last_received_at
      @last_received_at
    end
#peer_protocol ⇒ Integer (readonly)
Returns peer’s protocol id (set after handshake).
    # File 'lib/protocol/sp/connection.rb', line 21
    def peer_protocol
      @peer_protocol
    end
Instance Method Details
#close ⇒ void
This method returns an undefined value.
Closes the connection.
    # File 'lib/protocol/sp/connection.rb', line 235
    def close
      @io.close
    rescue IOError
      # already closed
    end
#flush ⇒ void
This method returns an undefined value.
Flushes the write buffer to the underlying IO.
    # File 'lib/protocol/sp/connection.rb', line 171
    def flush
      @mutex.synchronize do
        @io.flush
      end
    end
#handshake! ⇒ void
This method returns an undefined value.
Performs the SP/TCP greeting exchange.
    # File 'lib/protocol/sp/connection.rb', line 62
    def handshake!
      @io.write(Codec::Greeting.encode(protocol: @protocol))
      @io.flush
      peer = Codec::Greeting.decode(@io.read_exactly(Codec::Greeting::SIZE))
      @peer_protocol = peer
      valid = Protocols::VALID_PEERS[@protocol]
      unless valid&.include?(peer)
        raise Error, "incompatible SP protocols: 0x#{@protocol.to_s(16)} cannot speak to 0x#{peer.to_s(16)}"
      end
    end
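For orientation, here is a minimal sketch of what an 8-byte greeting could look like on the wire. The layout is an assumption taken from the nanomsg SP-over-TCP mapping (0x00 'S' 'P' 0x00, a 16-bit big-endian protocol id, 16 reserved bits); this page does not show `Codec::Greeting`, so the helper names, the layout, and the sample id 0x10 are illustrative only.

```ruby
# Assumed layout (not confirmed by this page):
#   bytes 0..3  magic 0x00 'S' 'P' 0x00
#   bytes 4..5  protocol id, big-endian
#   bytes 6..7  reserved, zero
SP_MAGIC = "\x00SP\x00".b

# Hypothetical stand-in for Codec::Greeting.encode.
def encode_greeting(protocol)
  [SP_MAGIC, protocol, 0].pack("a4nn")
end

# Hypothetical stand-in for Codec::Greeting.decode: returns the peer's id.
def decode_greeting(bytes)
  magic, protocol, _reserved = bytes.unpack("a4nn")
  raise ArgumentError, "not an SP greeting" unless magic == SP_MAGIC
  protocol
end

greeting = encode_greeting(0x10) # 0x10 is just a sample protocol id
peer_id = decode_greeting(greeting)
```

In the real method the decoded id is then checked against `Protocols::VALID_PEERS`, which is what produces the "incompatible SP protocols" error.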
#heartbeat_expired?(timeout) ⇒ Boolean
Returns true if no frame has been received within timeout seconds.
    # File 'lib/protocol/sp/connection.rb', line 225
    def heartbeat_expired?(timeout)
      return false unless @last_received_at
      (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @last_received_at) > timeout
    end
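The inactivity check can be exercised in isolation with a small stand-in class. `HeartbeatTracker` is hypothetical and only mirrors the logic of `#touch_heartbeat` and `#heartbeat_expired?` shown on this page; note that a connection that has never received a frame is never reported as expired.

```ruby
# Hypothetical stand-in that mirrors the connection's inactivity tracking.
class HeartbeatTracker
  attr_reader :last_received_at

  def initialize
    @last_received_at = nil
  end

  # Record "a frame just arrived" on the monotonic clock.
  def touch
    @last_received_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # False until the first frame has been seen; afterwards true once more
  # than `timeout` seconds have passed since the last touch.
  def expired?(timeout)
    return false unless @last_received_at
    (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @last_received_at) > timeout
  end
end

tracker = HeartbeatTracker.new
before_first_frame = tracker.expired?(0.01)
tracker.touch
fresh = tracker.expired?(60)
sleep 0.02
stale = tracker.expired?(0.01)
```

Using the monotonic clock rather than `Time.now` keeps the comparison immune to wall-clock adjustments.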
#receive_message ⇒ String
Receives one message body.
    # File 'lib/protocol/sp/connection.rb', line 183
    def receive_message
      if @framing == :ipc
        loop do
          # One read_exactly(9) is cheaper than separate 1+8 reads:
          # halves the io-stream dispatch overhead per message.
          header = @io.read_exactly(9)
          type, size = header.unpack("CQ>")
          if @max_message_size && size > @max_message_size
            raise Error, "frame size #{size} exceeds max_message_size #{@max_message_size}"
          end
          body = size > 0 ? @io.read_exactly(size) : Codec::Frame::EMPTY_BODY
          touch_heartbeat
          # Skip nng IPC control frames (0x00, keepalive etc.); only
          # deliver user messages (0x01) to the caller.
          return body if type == IPC_MSG_TYPE
        end
      else
        frame = Codec::Frame.read_from(@io, max_message_size: @max_message_size)
        touch_heartbeat
        frame.body
      end
    rescue Error
      close
      raise
    end
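The `:ipc` branch above can be tried against an in-memory stream. The sketch below is a simplified stand-in, not the library's code: `read_ipc_message` is a hypothetical name, `StringIO#read` replaces `#read_exactly` (so there is no EOF handling), and `ArgumentError` replaces the library's `Error`.

```ruby
require "stringio"

# Hypothetical re-implementation of the :ipc read loop for illustration.
def read_ipc_message(io, max_message_size: nil)
  loop do
    # One 9-byte read covers the type byte and the 8-byte length.
    type, size = io.read(9).unpack("CQ>")
    if max_message_size && size > max_message_size
      raise ArgumentError, "frame size #{size} exceeds max_message_size #{max_message_size}"
    end
    body = size > 0 ? io.read(size) : "".b
    # 0x00 control frames are consumed and skipped; 0x01 is delivered.
    return body if type == 0x01
  end
end

keepalive = [0x00, 0].pack("CQ>")               # control frame, empty body
user      = [0x01, 3].pack("CQ>") + "abc".b     # user message
message   = read_ipc_message(StringIO.new(keepalive + user))
```

The keepalive frame is read and discarded, so the caller only ever sees the user message body.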
#send_message(body) ⇒ void
This method returns an undefined value.
Sends one message (write + flush).
    # File 'lib/protocol/sp/connection.rb', line 80
    def send_message(body)
      with_deferred_cancel do
        @mutex.synchronize do
          write_header_nolock(body.bytesize)
          @io.write(body)
          @io.flush
        end
      end
    end
#touch_heartbeat ⇒ void
This method returns an undefined value.
Records that a frame was received (for inactivity tracking).
    # File 'lib/protocol/sp/connection.rb', line 216
    def touch_heartbeat
      @last_received_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end
#write_message(body) ⇒ void
This method returns an undefined value.
Writes one message to the buffer without flushing. Call #flush after batching writes.
Two writes — header then body — into the buffered IO; avoids the per-message intermediate String allocation that Protocol::SP::Codec::Frame.encode would otherwise produce.
    # File 'lib/protocol/sp/connection.rb', line 100
    def write_message(body)
      with_deferred_cancel do
        @mutex.synchronize do
          write_header_nolock(body.bytesize)
          @io.write(body)
        end
      end
    end
#write_messages(bodies) ⇒ void
This method returns an undefined value.
Writes a batch of messages to the buffer under a single mutex acquisition. Used by work-stealing send pumps that dequeue up to N messages at once — avoids N lock/unlock pairs per batch. Call #flush after to push the buffer to the socket.
    # File 'lib/protocol/sp/connection.rb', line 117
    def write_messages(bodies)
      with_deferred_cancel do
        @mutex.synchronize do
          i = 0
          n = bodies.size
          while i < n
            body = bodies[i]
            write_header_nolock(body.bytesize)
            @io.write(body)
            i += 1
          end
        end
      end
    end
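The batching idea can be shown with a self-contained stand-in. `BatchWriter` is hypothetical and assumes `:tcp` framing (8-byte big-endian length, then body); it omits `with_deferred_cancel` and the reusable header buffer, and writes into a `StringIO` instead of a buffered socket.

```ruby
require "stringio"

# Hypothetical stand-in: frames every body in the batch while holding the
# mutex once, instead of locking and unlocking per message.
class BatchWriter
  def initialize(io)
    @io = io
    @mutex = Mutex.new
  end

  def write_messages(bodies)
    @mutex.synchronize do
      bodies.each do |body|
        @io.write([body.bytesize].pack("Q>")) # 8-byte length header
        @io.write(body)
      end
    end
  end
end

sink = StringIO.new("".b)
BatchWriter.new(sink).write_messages(["a", "bc", ""])
```

After the call, `sink` holds three frames back to back (27 bytes in total); with the real connection a single `#flush` would then push the whole batch to the socket.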
#write_wire(wire_bytes) ⇒ void
This method returns an undefined value.
Writes pre-encoded wire bytes without flushing. Used for fan-out: encode once with `Codec::Frame.encode`, write to many connections.
    # File 'lib/protocol/sp/connection.rb', line 159
    def write_wire(wire_bytes)
      with_deferred_cancel do
        @mutex.synchronize do
          @io.write(wire_bytes)
        end
      end
    end
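A minimal sketch of the fan-out pattern, under the assumption of `:tcp` framing: the frame is encoded once and the same bytes are written to every subscriber. The manual `pack` call stands in for `Codec::Frame.encode`, and `StringIO` objects stand in for connections.

```ruby
require "stringio"

# Encode once (stand-in for Codec::Frame.encode under :tcp framing).
wire = [5].pack("Q>") + "hello".b

# Three in-memory sinks play the role of three connections.
subscribers = Array.new(3) { StringIO.new("".b) }

# Write the identical pre-encoded bytes to each one; no per-subscriber
# header packing or body copying is needed.
subscribers.each { |io| io.write(wire) }
```

With real connections each `#write_wire` call would be followed by a `#flush` once the pump is ready to push the data out.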