Class: Igniter::Store::NetworkBackend
- Inherits:
-
Object
- Object
- Igniter::Store::NetworkBackend
- Includes:
- WireProtocol
- Defined in:
- lib/igniter/store/network_backend.rb
Overview
NetworkBackend — client-side backend that proxies write_fact / replay / write_snapshot over a TCP or Unix socket connection to a StoreServer.
The wire protocol is CRC32-framed JSON (same framing as the WAL file format). Each request is a single frame; the server replies with a single frame.
Usage (via Companion::Store):
store = Igniter::Companion::Store.new(
backend: :network,
address: "127.0.0.1:7400",
transport: :tcp # default; or :unix for Unix domain sockets
)
Direct usage:
nb = Igniter::Store::NetworkBackend.new(address: "127.0.0.1:7400")
Reactive push subscription (separate connection, background thread):
handle = nb.subscribe(stores: [:tasks]) { |fact| puts fact.key }
handle.close # unsubscribes cleanly
Defined Under Namespace
Classes: NetworkError, Subscription
Constant Summary
Constants included from WireProtocol
WireProtocol::FRAME_CRC_SIZE, WireProtocol::FRAME_HEADER_SIZE
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(address:, transport: :tcp) ⇒ NetworkBackend
constructor
A new instance of NetworkBackend.
-
#replay ⇒ Object
Returns an Array<Fact> from the server’s durable store.
-
#subscribe(stores:, &callback) ⇒ Object
Opens a dedicated second connection for push events and registers a handler.
- #write_fact(fact) ⇒ Object
-
#write_snapshot(facts) ⇒ Object
Sends all
factsto the server for snapshot storage.
Methods included from WireProtocol
Constructor Details
#initialize(address:, transport: :tcp) ⇒ NetworkBackend
Returns a new instance of NetworkBackend.
53 54 55 56 57 58 |
# File 'lib/igniter/store/network_backend.rb', line 53 def initialize(address:, transport: :tcp) @address = address @transport = transport @mutex = Mutex.new @socket = connect end |
Instance Method Details
#close ⇒ Object
112 113 114 115 116 117 118 119 120 121 |
# File 'lib/igniter/store/network_backend.rb', line 112 def close @mutex.synchronize do send_frame({ op: "close" }) read_frame(@socket) # drain the server's { ok: true } so socket can close cleanly (FIN not RST) rescue IOError, Errno::EPIPE, Errno::ECONNRESET nil ensure @socket.close rescue nil end end |
#replay ⇒ Object
Returns an Array<Fact> from the server’s durable store.
66 67 68 69 |
# File 'lib/igniter/store/network_backend.rb', line 66 def replay response = rpc("replay") (response[:facts] || []).map { |h| decode_fact(h) } end |
#subscribe(stores:, &callback) ⇒ Object
Opens a dedicated second connection for push events and registers a handler. The main RPC connection is unaffected. Returns a Subscription handle; call handle.close to unsubscribe.
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/igniter/store/network_backend.rb', line 81 def subscribe(stores:, &callback) raise ArgumentError, "subscribe requires a block" unless callback sub_socket = connect stores_s = Array(stores).map(&:to_s) sub_socket.write(encode_frame(JSON.generate({ op: "subscribe", stores: stores_s }))) body = read_frame(sub_socket) raise NetworkError, "Subscribe: server closed connection" unless body resp = JSON.parse(body, symbolize_names: true) raise NetworkError, resp[:error] unless resp[:ok] thread = Thread.new(sub_socket) do |sock| Thread.current.abort_on_exception = false loop do body = read_frame(sock) break unless body event = JSON.parse(body, symbolize_names: true) next unless event[:event] == "fact_written" fact = decode_fact(event[:fact]) callback.call(fact) rescue nil end rescue IOError, Errno::ECONNRESET, Errno::EPIPE, Errno::EBADF nil ensure sock.close rescue nil end Subscription.new(sub_socket, thread) end |
#write_fact(fact) ⇒ Object
60 61 62 63 |
# File 'lib/igniter/store/network_backend.rb', line 60 def write_fact(fact) rpc("write_fact", fact: fact.to_h) nil end |
#write_snapshot(facts) ⇒ Object
Sends all facts to the server for snapshot storage. No-op on the server side if the server backend does not support snapshots.
73 74 75 76 |
# File 'lib/igniter/store/network_backend.rb', line 73 def write_snapshot(facts) rpc("write_snapshot", facts: facts.map(&:to_h)) nil end |