Class: Plushie::Connection
- Inherits: Object
- Defined in: lib/plushie/connection.rb
Overview
Low-level protocol client for the plushie renderer.
Manages a bidirectional pipe to the renderer binary, handles wire framing, and provides thread-safe message sending. Decoded messages are pushed to a Thread::Queue or dispatched via a callback proc.
This layer is usable standalone for scripting and REPL exploration without the full Elm architecture:
    conn = Plushie::Connection.spawn(format: :json)
    # hello is available after spawn
    puts conn.hello[:version]
    conn.send_encoded(Protocol::Encode.encode_snapshot(tree, :json))
    conn.close
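The two delivery modes mentioned above (a Thread::Queue or a callback proc) can be sketched from the consumer's side. This is a minimal, self-contained illustration rather than the library's own code: a plain thread stands in for the connection's reader, the `:event` message and the `:eof` reason are hypothetical, and only the `{type: :connection_closed, reason: ...}` shape is taken from #transport_closed.

```ruby
# Stand-in for the connection's reader thread: it pushes decoded message
# hashes onto the queue that would have been passed as `queue:`.
queue = Thread::Queue.new

producer = Thread.new do
  queue.push({type: :event, id: "button-1"})           # hypothetical message
  queue.push({type: :connection_closed, reason: :eof}) # shape from #transport_closed
end

# Consumer loop: block on the queue until the connection reports closure.
received = []
loop do
  msg = queue.pop
  received << msg
  break if msg[:type] == :connection_closed
end
producer.join

# The same handling written as a callback, as one might pass via `on_message:`.
seen_types = []
on_message = ->(msg) { seen_types << msg[:type] }
received.each { |msg| on_message.call(msg) }
```

A queue suits a REPL or script that polls at its own pace; a callback suits code that wants to react on the reader thread as each message arrives.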
Instance Attribute Summary
- #format ⇒ :msgpack, :json (readonly)
  Wire format.
- #hello ⇒ Hash? (readonly)
  Hello handshake response from renderer.
Class Method Summary
- .attach(stdin:, stdout:, format: :msgpack, settings: {}, queue: nil, on_message: nil) ⇒ Connection
  Attach to existing IO streams (for :stdio transport).
- .iostream(adapter:, format: :msgpack, settings: {}, queue: nil, on_message: nil) ⇒ Connection
  Create a connection backed by an iostream adapter.
- .spawn(format: :msgpack, binary: nil, mode: nil, max_sessions: nil, log_level: nil, settings: {}, queue: nil, on_message: nil) ⇒ Connection
  Spawn a renderer process and perform the hello handshake.
Instance Method Summary
- #close ⇒ Object
  Close the connection and clean up resources.
- #closed? ⇒ Boolean
  True if the connection is closed.
- #receive_data(data) ⇒ Object
  Called by the iostream adapter when data arrives from the transport.
- #send_encoded(data) ⇒ Object
  Send pre-encoded wire bytes to the renderer.
- #send_message(msg) ⇒ Object
  Encode a hash and send it.
- #send_message_for_session(msg, session_id) ⇒ Object
  Send a message with a specific session ID injected.
- #transport_closed(reason) ⇒ Object
  Called by the iostream adapter when the transport closes.
Instance Attribute Details
#format ⇒ :msgpack, :json (readonly)
Returns wire format.
    # File 'lib/plushie/connection.rb', line 24
    def format
      @format
    end
#hello ⇒ Hash? (readonly)
Returns hello handshake response from renderer.
    # File 'lib/plushie/connection.rb', line 27
    def hello
      @hello
    end
Class Method Details
.attach(stdin:, stdout:, format: :msgpack, settings: {}, queue: nil, on_message: nil) ⇒ Connection
Attach to existing IO streams (for :stdio transport).
    # File 'lib/plushie/connection.rb', line 58
    def self.attach(stdin:, stdout:, format: :msgpack, settings: {}, queue: nil, on_message: nil)
      conn = new(format: format, queue: queue, on_message: on_message)
      conn.instance_variable_set(:@stdin, stdin)
      conn.instance_variable_set(:@stdout, stdout)
      stdin.binmode
      stdout.binmode

      conn.send(:perform_handshake, settings)
      conn.send(:start_reader)
      conn
    end
.iostream(adapter:, format: :msgpack, settings: {}, queue: nil, on_message: nil) ⇒ Connection
Create a connection backed by an iostream adapter.
The adapter mediates between the Connection and an external I/O source (SSH channel, TCP socket, WebSocket, etc.). Instead of reading/writing pipes, the connection exchanges data with the adapter via method calls.
The adapter must respond to:
- +on_bridge(connection)+: called during init, adapter stores connection ref
- +send_data(data)+: called by Connection to write encoded bytes
The adapter calls back:
- +connection.receive_data(data)+: when data arrives from the transport
- +connection.transport_closed(reason)+: when the transport closes
    # File 'lib/plushie/connection.rb', line 91
    def self.iostream(adapter:, format: :msgpack, settings: {}, queue: nil, on_message: nil)
      conn = new(format: format, queue: queue, on_message: on_message)
      conn.send(:setup_iostream, adapter, settings)
      conn
    end
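The adapter contract listed above can be sketched as a small in-memory class. `LoopbackAdapter`, its `deliver` and `hang_up` helpers, and `FakeConnection` are hypothetical names invented for this illustration; `FakeConnection` only stands in for Plushie::Connection so that the sketch runs on its own. The optional `stop` method reflects the `respond_to?(:stop)` check in #close.

```ruby
# Hypothetical in-memory adapter satisfying the documented contract.
class LoopbackAdapter
  attr_reader :sent

  def initialize
    @sent = []
    @connection = nil
    @stopped = false
  end

  # Called during init: keep a reference so inbound data can be forwarded.
  def on_bridge(connection)
    @connection = connection
  end

  # Called by the connection to write encoded bytes to the transport.
  def send_data(data)
    @sent << data
  end

  # Optional: #close calls this only if the adapter responds to it.
  def stop
    @stopped = true
  end

  def stopped?
    @stopped
  end

  # Transport side: hand over one complete protocol message per call.
  def deliver(data)
    @connection.receive_data(data)
  end

  # Transport side: report that the underlying channel went away.
  def hang_up(reason)
    @connection.transport_closed(reason)
  end
end

# Stand-in for Plushie::Connection, recording what the adapter calls back.
class FakeConnection
  attr_reader :inbox, :close_reason

  def initialize(adapter)
    @inbox = []
    @close_reason = nil
    adapter.on_bridge(self)
  end

  def receive_data(data)
    @inbox << data
  end

  def transport_closed(reason)
    @close_reason = reason
  end
end

adapter = LoopbackAdapter.new
conn = FakeConnection.new(adapter)
adapter.send_data("outbound")
adapter.deliver("inbound")
adapter.hang_up(:remote_closed)
```

With the real class, the adapter would presumably be passed as `Plushie::Connection.iostream(adapter: adapter)` instead of being wired to a stand-in.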
.spawn(format: :msgpack, binary: nil, mode: nil, max_sessions: nil, log_level: nil, settings: {}, queue: nil, on_message: nil) ⇒ Connection
Spawn a renderer process and perform the hello handshake.
    # File 'lib/plushie/connection.rb', line 40
    def self.spawn(format: :msgpack, binary: nil, mode: nil, max_sessions: nil, log_level: nil, settings: {}, queue: nil, on_message: nil)
      conn = new(format: format, queue: queue, on_message: on_message)
      conn.send(:spawn_process, binary, mode, max_sessions, log_level)
      conn.send(:perform_handshake, settings)
      conn.send(:start_reader)
      conn
    end
Instance Method Details
#close ⇒ Object
Close the connection and clean up resources.
    # File 'lib/plushie/connection.rb', line 165
    def close
      return if @closed
      @closed = true
      stop_thread(@reader_thread, timeout: 1)
      @reader_thread = nil
      if @iostream_adapter
        @iostream_adapter.stop if @iostream_adapter.respond_to?(:stop)
      else
        begin
          @stdin&.close
        rescue
          nil
        end
        begin
          @stdout&.close
        rescue
          nil
        end
        begin
          @process_thread&.value
        rescue
          nil
        end
      end
    end
#closed? ⇒ Boolean
Returns true if the connection is closed.
    # File 'lib/plushie/connection.rb', line 192
    def closed?
      @closed
    end
#receive_data(data) ⇒ Object
Called by the iostream adapter when data arrives from the transport. The adapter is responsible for framing: each call should deliver one complete protocol message.
    # File 'lib/plushie/connection.rb', line 126
    def receive_data(data)
      return if @closed
      msg = Protocol::Decode.decode(data, @format)
      decoded = Protocol::Decode.(msg)

      if !@hello && decoded.is_a?(Hash) && decoded[:type] == :hello
        @hello = decoded
        @handshake_queue&.push(decoded)
      elsif decoded
        (decoded)
      end
    end
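Because each #receive_data call should carry exactly one complete message, an adapter sitting on a byte stream has to reassemble frames itself. The sketch below shows one way to do that. It assumes the transport uses a 4-byte big-endian length prefix (the framing #send_encoded writes on the pipe path for :msgpack), which an iostream transport is not required to use. `FrameBuffer` is a hypothetical helper, not part of the library.

```ruby
# Hypothetical helper: buffers stream chunks and yields complete payloads.
class FrameBuffer
  HEADER_SIZE = 4 # 32-bit big-endian length, as produced by pack("N")

  def initialize
    @buffer = String.new(encoding: Encoding::BINARY)
  end

  # Append a chunk, then yield every payload that is now complete.
  def feed(chunk)
    @buffer << chunk.b
    loop do
      break if @buffer.bytesize < HEADER_SIZE
      length = @buffer.byteslice(0, HEADER_SIZE).unpack1("N")
      break if @buffer.bytesize < HEADER_SIZE + length
      payload = @buffer.byteslice(HEADER_SIZE, length)
      # Keep whatever follows the frame for the next round.
      @buffer = @buffer.byteslice(HEADER_SIZE + length, @buffer.bytesize)
      yield payload
    end
  end
end

frames = []
buffer = FrameBuffer.new
stream = [5].pack("N") + "hello" + [3].pack("N") + "abc"

# Deliver the 16-byte stream in awkward pieces, as a socket might.
chunks = [stream.byteslice(0, 6), stream.byteslice(6, 5), stream.byteslice(11, 5)]
chunks.each do |chunk|
  buffer.feed(chunk) { |payload| frames << payload }
end
```

In a real adapter, the block would call `connection.receive_data(payload)` instead of collecting into an array.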
#send_encoded(data) ⇒ Object
Send pre-encoded wire bytes to the renderer. Thread-safe.
    # File 'lib/plushie/connection.rb', line 101
    def send_encoded(data)
      @write_mutex.synchronize do
        if @iostream_adapter
          @iostream_adapter.send_data(data)
        else
          case @format
          when :msgpack
            @stdin.write([data.bytesize].pack("N"))
            @stdin.write(data)
          when :json
            @stdin.write(data)
          end
          @stdin.flush
        end
      end
    rescue IOError, Errno::EPIPE => e
      @closed = true
      ({type: :connection_error, error: e})
    end
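The pipe branch above frames the two formats differently: :msgpack payloads get a 4-byte big-endian length prefix, while :json payloads are written unchanged. The following sketch reproduces that branch against a StringIO so the resulting bytes can be inspected. `write_frame` is a hypothetical helper, and the trailing newline on the JSON payload is an assumption about what the encoder emits, not something the source states.

```ruby
require "stringio"
require "json"

# Hypothetical helper mirroring the pipe branch of #send_encoded.
def write_frame(io, data, format)
  case format
  when :msgpack
    io.write([data.bytesize].pack("N")) # 4-byte big-endian length prefix
    io.write(data)
  when :json
    io.write(data) # written as-is; any delimiter must already be in `data`
  end
  io.flush
end

# MessagePack encoding of {"a" => 1}: fixmap(1), fixstr(1) "a", fixint 1.
payload = "\x81\xA1a\x01".b
pipe = StringIO.new(String.new(encoding: Encoding::BINARY))
write_frame(pipe, payload, :msgpack)
msgpack_wire = pipe.string.dup

json_pipe = StringIO.new(String.new)
write_frame(json_pipe, JSON.generate({"a" => 1}) + "\n", :json) # newline is an assumption
json_wire = json_pipe.string
```

Reading `msgpack_wire.unpack1("N")` recovers the payload length, which is what lets a receiver know where one message ends and the next begins.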
#send_message(msg) ⇒ Object
Encode a hash and send it. Injects session field if missing.
    # File 'lib/plushie/connection.rb', line 152
    def send_message(msg)
      send_encoded(Protocol::Encode.encode(msg, @format))
    end
#send_message_for_session(msg, session_id) ⇒ Object
Send a message with a specific session ID injected.
    # File 'lib/plushie/connection.rb', line 160
    def send_message_for_session(msg, session_id)
      send_message(msg.merge(session: session_id))
    end
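The session ID is injected with Hash#merge, so two properties follow: a :session key already present in the message is overridden, and the caller's hash is left untouched. A short illustration, using hypothetical message contents and session names:

```ruby
msg = {type: :snapshot, session: "default"} # hypothetical message shape

# merge returns a new hash; the given session replaces the existing one.
tagged = msg.merge(session: "session-42")

# A message without a :session key simply gains one.
untagged = {type: :snapshot}.merge(session: "session-42")
```

Because the original hash is not mutated, the same message can be sent to several sessions in turn.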
#transport_closed(reason) ⇒ Object
Called by the iostream adapter when the transport closes.
    # File 'lib/plushie/connection.rb', line 143
    def transport_closed(reason)
      return if @closed
      @closed = true
      ({type: :connection_closed, reason: reason})
    end