Class: Protocol::SP::Connection
- Inherits:
-
Object
- Object
- Protocol::SP::Connection
- Defined in:
- lib/protocol/sp/connection.rb
Overview
Manages one SP peer connection over any transport IO.
The SP wire protocol has no commands, no security mechanisms, and no multipart messages — ‘#handshake!` is just an exchange of two 8-byte greetings, and `#send_message` / `#receive_message` work on single binary bodies framed by an 8-byte big-endian length.
Constant Summary collapse
- IPC_MSG_TYPE =
SP/IPC data messages are prefixed with a 1-byte message type. 0x01 = user message. 0x00 is reserved in nng for control frames (keepalive) that we don’t emit, but we still accept and skip on read for forward-compatibility with nng peers.
0x01
Instance Attribute Summary collapse
-
#framing ⇒ Symbol
readonly
:tcp or :ipc.
-
#io ⇒ Object
readonly
Transport IO (#read_exactly, #write, #flush, #close).
-
#last_received_at ⇒ Float?
readonly
Monotonic timestamp of last received frame.
-
#peer_protocol ⇒ Integer
readonly
Peer’s protocol id (set after handshake).
Instance Method Summary collapse
-
#close ⇒ void
Closes the connection.
-
#flush ⇒ void
Flushes the write buffer to the underlying IO.
-
#handshake! ⇒ void
Performs the SP/TCP greeting exchange.
-
#heartbeat_expired?(timeout) ⇒ Boolean
Returns true if no frame has been received within
timeoutseconds. -
#initialize(io, protocol:, max_message_size: nil, framing: :tcp) ⇒ Connection
constructor
A new instance of Connection.
-
#receive_message ⇒ String
Receives one message body.
-
#send_message(body, header: nil) ⇒ void
Sends one message (write + flush).
-
#touch_heartbeat ⇒ void
Records that a frame was received (for inactivity tracking).
-
#write_message(body, header: nil) ⇒ void
Writes one message to the buffer without flushing.
-
#write_messages(bodies) ⇒ void
Writes a batch of messages to the buffer under a single mutex acquisition.
-
#write_wire(wire_bytes) ⇒ void
Writes pre-encoded wire bytes without flushing.
Constructor Details
#initialize(io, protocol:, max_message_size: nil, framing: :tcp) ⇒ Connection
Returns a new instance of Connection.
42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/protocol/sp/connection.rb', line 42 def initialize(io, protocol:, max_message_size: nil, framing: :tcp) @io = io @protocol = protocol @peer_protocol = nil @max_message_size = @framing = framing @mutex = Mutex.new @last_received_at = nil # Reusable scratch buffer for frame headers — written into by # Array#pack(buffer:), then flushed to @io. Capacity 9 covers # both :tcp (8B) and :ipc (1+8B) framings. @header_buf = String.new(capacity: 9, encoding: Encoding::BINARY) end |
Instance Attribute Details
#framing ⇒ Symbol (readonly)
Returns :tcp or :ipc.
33 34 35 |
# File 'lib/protocol/sp/connection.rb', line 33 def framing @framing end |
#io ⇒ Object (readonly)
Returns transport IO (#read_exactly, #write, #flush, #close).
25 26 27 |
# File 'lib/protocol/sp/connection.rb', line 25 def io @io end |
#last_received_at ⇒ Float? (readonly)
Returns monotonic timestamp of last received frame.
29 30 31 |
# File 'lib/protocol/sp/connection.rb', line 29 def last_received_at @last_received_at end |
#peer_protocol ⇒ Integer (readonly)
Returns peer’s protocol id (set after handshake).
21 22 23 |
# File 'lib/protocol/sp/connection.rb', line 21 def peer_protocol @peer_protocol end |
Instance Method Details
#close ⇒ void
This method returns an undefined value.
Closes the connection.
251 252 253 254 255 |
# File 'lib/protocol/sp/connection.rb', line 251 def close @io.close rescue IOError # already closed end |
#flush ⇒ void
This method returns an undefined value.
Flushes the write buffer to the underlying IO.
187 188 189 190 191 |
# File 'lib/protocol/sp/connection.rb', line 187 def flush @mutex.synchronize do @io.flush end end |
#handshake! ⇒ void
This method returns an undefined value.
Performs the SP/TCP greeting exchange.
62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/protocol/sp/connection.rb', line 62 def handshake! @io.write(Codec::Greeting.encode(protocol: @protocol)) @io.flush peer = Codec::Greeting.decode(@io.read_exactly(Codec::Greeting::SIZE)) @peer_protocol = peer valid = Protocols::VALID_PEERS[@protocol] unless valid&.include?(peer) raise Error, "incompatible SP protocols: 0x#{@protocol.to_s(16)} cannot speak to 0x#{peer.to_s(16)}" end end |
#heartbeat_expired?(timeout) ⇒ Boolean
Returns true if no frame has been received within timeout seconds.
241 242 243 244 245 |
# File 'lib/protocol/sp/connection.rb', line 241 def heartbeat_expired?(timeout) return false unless @last_received_at (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @last_received_at) > timeout end |
#receive_message ⇒ String
Receives one message body.
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 |
# File 'lib/protocol/sp/connection.rb', line 199 def if @framing == :ipc loop do # One read_exactly(9) is cheaper than separate 1+8 reads: # halves the io-stream dispatch overhead per message. header = @io.read_exactly(9) type, size = header.unpack("CQ>") if @max_message_size && size > @max_message_size raise Error, "frame size #{size} exceeds max_message_size #{@max_message_size}" end body = size > 0 ? @io.read_exactly(size) : Codec::Frame::EMPTY_BODY touch_heartbeat # Skip nng IPC control frames (0x00 — keepalive/etc.); only # deliver user messages (0x01) to the caller. return body if type == IPC_MSG_TYPE end else frame = Codec::Frame.read_from(@io, max_message_size: @max_message_size) touch_heartbeat frame.body end rescue Error close raise end |
#send_message(body, header: nil) ⇒ void
This method returns an undefined value.
Sends one message (write + flush).
The optional header is a binary prefix written between the SP length prefix and body on the wire, framed as a single message of size ‘header.bytesize + body.bytesize`. It’s treated as an opaque slice so callers can pass a frozen shared buffer (e.g. a cached backtrace) without allocating a concatenated String. Receivers frame normally and get the header+body glued back together — this is purely a send-side allocation optimization.
89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/protocol/sp/connection.rb', line 89 def (body, header: nil) with_deferred_cancel do @mutex.synchronize do total = body.bytesize + (header ? header.bytesize : 0) write_header_nolock(total) @io.write(header) if header @io.write(body) @io.flush end end end |
#touch_heartbeat ⇒ void
This method returns an undefined value.
Records that a frame was received (for inactivity tracking).
232 233 234 |
# File 'lib/protocol/sp/connection.rb', line 232 def touch_heartbeat @last_received_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) end |
#write_message(body, header: nil) ⇒ void
This method returns an undefined value.
Writes one message to the buffer without flushing. Call #flush after batching writes.
Two writes — header then body — into the buffered IO; avoids the per-message intermediate String allocation that Protocol::SP::Codec::Frame.encode would otherwise produce. When header is supplied, this becomes three buffered writes coalesced by ‘IO::Stream::Buffered` into a single `writev`.
114 115 116 117 118 119 120 121 122 123 |
# File 'lib/protocol/sp/connection.rb', line 114 def (body, header: nil) with_deferred_cancel do @mutex.synchronize do total = body.bytesize + (header ? header.bytesize : 0) write_header_nolock(total) @io.write(header) if header @io.write(body) end end end |
#write_messages(bodies) ⇒ void
This method returns an undefined value.
Writes a batch of messages to the buffer under a single mutex acquisition. Used by work-stealing send pumps that dequeue up to N messages at once — avoids N lock/unlock pairs per batch. Call #flush after to push the buffer to the socket.
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/protocol/sp/connection.rb', line 133 def (bodies) with_deferred_cancel do @mutex.synchronize do i = 0 n = bodies.size while i < n body = bodies[i] write_header_nolock(body.bytesize) @io.write(body) i += 1 end end end end |
#write_wire(wire_bytes) ⇒ void
This method returns an undefined value.
Writes pre-encoded wire bytes without flushing. Used for fan-out: encode once with ‘Codec::Frame.encode`, write to many connections.
175 176 177 178 179 180 181 |
# File 'lib/protocol/sp/connection.rb', line 175 def write_wire(wire_bytes) with_deferred_cancel do @mutex.synchronize do @io.write(wire_bytes) end end end |