Class: Igniter::Store::NetworkBackend

Inherits:
Object
Includes:
WireProtocol
Defined in:
lib/igniter/store/network_backend.rb

Overview

NetworkBackend is a client-side backend that proxies write_fact, replay, and write_snapshot calls over a TCP or Unix domain socket connection to a StoreServer.

The wire protocol is CRC32-framed JSON (same framing as the WAL file format). Each request is a single frame; the server replies with a single frame.
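
To make the framing idea concrete, here is a minimal sketch of a CRC32-framed JSON round trip. The byte layout is an assumption for illustration only (a 4-byte big-endian body length followed by a 4-byte big-endian CRC32 of the body); the real layout is defined by WireProtocol and its FRAME_HEADER_SIZE and FRAME_CRC_SIZE constants, whose values are not shown on this page. SketchFraming and its methods are hypothetical names, not part of Igniter.

```ruby
require "json"
require "stringio"
require "zlib"

# Hypothetical layout, assumed for illustration only:
#   4-byte big-endian body length | 4-byte big-endian CRC32 of body | JSON body
module SketchFraming
  HEADER_SIZE = 8

  # Wraps a body string in a length + CRC32 header.
  def self.encode_frame(body)
    bytes = body.b
    [bytes.bytesize, Zlib.crc32(bytes)].pack("NN") + bytes
  end

  # Returns the body string, or nil if the stream ends before a full header.
  def self.read_frame(io)
    header = io.read(HEADER_SIZE)
    return nil if header.nil? || header.bytesize < HEADER_SIZE

    length, expected_crc = header.unpack("NN")
    body = io.read(length)
    raise IOError, "truncated frame" if body.nil? || body.bytesize < length
    raise IOError, "CRC mismatch" unless Zlib.crc32(body) == expected_crc

    body
  end
end

# One request is one frame; a reply would be decoded the same way.
request = JSON.generate({ op: "replay" })
wire    = SketchFraming.encode_frame(request)
decoded = SketchFraming.read_frame(StringIO.new(wire))
parsed  = JSON.parse(decoded.force_encoding(Encoding::UTF_8), symbolize_names: true)
```

Checking the CRC before parsing means a corrupted or truncated frame is rejected as an I/O error rather than surfacing as a confusing JSON parse failure.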

Usage (via Companion::Store):

store = Igniter::Companion::Store.new(
  backend:   :network,
  address:   "127.0.0.1:7400",
  transport: :tcp           # default; or :unix for Unix domain sockets
)

Direct usage:

nb = Igniter::Store::NetworkBackend.new(address: "127.0.0.1:7400")

Reactive push subscription (separate connection, background thread):

handle = nb.subscribe(stores: [:tasks]) { |fact| puts fact.key }
handle.close   # unsubscribes cleanly

Defined Under Namespace

Classes: NetworkError, Subscription

Constant Summary

Constants included from WireProtocol

WireProtocol::FRAME_CRC_SIZE, WireProtocol::FRAME_HEADER_SIZE

Instance Method Summary

Methods included from WireProtocol

#encode_frame, #read_frame

Constructor Details

#initialize(address:, transport: :tcp) ⇒ NetworkBackend

Returns a new instance of NetworkBackend.



# File 'lib/igniter/store/network_backend.rb', line 53

def initialize(address:, transport: :tcp)
  @address   = address
  @transport = transport
  @mutex     = Mutex.new
  @socket    = connect
end

Instance Method Details

#close ⇒ Object



# File 'lib/igniter/store/network_backend.rb', line 112

def close
  @mutex.synchronize do
    send_frame({ op: "close" })
    read_frame(@socket)  # drain the server's { ok: true } so socket can close cleanly (FIN not RST)
  rescue IOError, Errno::EPIPE, Errno::ECONNRESET
    nil
  ensure
    @socket.close rescue nil
  end
end

#replay ⇒ Object

Returns an Array<Fact> from the server’s durable store.



# File 'lib/igniter/store/network_backend.rb', line 66

def replay
  response = rpc("replay")
  (response[:facts] || []).map { |h| decode_fact(h) }
end

#subscribe(stores:, &callback) ⇒ Object

Opens a dedicated second connection for push events and registers a handler. The main RPC connection is unaffected. Returns a Subscription handle; call handle.close to unsubscribe.

Raises:

  • (ArgumentError)


# File 'lib/igniter/store/network_backend.rb', line 81

def subscribe(stores:, &callback)
  raise ArgumentError, "subscribe requires a block" unless callback

  sub_socket = connect
  stores_s   = Array(stores).map(&:to_s)
  sub_socket.write(encode_frame(JSON.generate({ op: "subscribe", stores: stores_s })))

  body = read_frame(sub_socket)
  raise NetworkError, "Subscribe: server closed connection" unless body
  resp = JSON.parse(body, symbolize_names: true)
  raise NetworkError, resp[:error] unless resp[:ok]

  thread = Thread.new(sub_socket) do |sock|
    Thread.current.abort_on_exception = false
    loop do
      body = read_frame(sock)
      break unless body
      event = JSON.parse(body, symbolize_names: true)
      next unless event[:event] == "fact_written"
      fact = decode_fact(event[:fact])
      callback.call(fact) rescue nil
    end
  rescue IOError, Errno::ECONNRESET, Errno::EPIPE, Errno::EBADF
    nil
  ensure
    sock.close rescue nil
  end

  Subscription.new(sub_socket, thread)
end
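
The dedicated-connection pattern used by #subscribe can be tried in isolation with a socket pair. The sketch below is a simplified stand-in, not the library's implementation: it uses newline-delimited JSON instead of CRC32 frames to stay short, passes the raw fact hash to the callback instead of a decoded Fact, and SketchSubscription and sketch_subscribe are hypothetical names. It relies on UNIXSocket.pair, so it assumes a platform with Unix domain sockets.

```ruby
require "json"
require "socket"

# Hypothetical stand-in for a subscription handle: it owns the socket and the
# reader thread, and closing the socket ends the reader loop.
SketchSubscription = Struct.new(:socket, :thread) do
  def close
    socket.close rescue nil
    thread.join(1)
  end
end

def sketch_subscribe(sock, &callback)
  raise ArgumentError, "subscribe requires a block" unless callback

  thread = Thread.new do
    # Newline-delimited JSON keeps the sketch short; the real protocol is framed.
    while (line = sock.gets)
      event = JSON.parse(line, symbolize_names: true)
      next unless event[:event] == "fact_written"
      callback.call(event[:fact])
    end
  rescue IOError, Errno::ECONNRESET, Errno::EPIPE, Errno::EBADF
    nil # socket closed from another thread or by the peer: end quietly
  end

  SketchSubscription.new(sock, thread)
end

client, server = UNIXSocket.pair
received = Queue.new
handle = sketch_subscribe(client) { |fact| received << fact }

server.puts(JSON.generate({ event: "heartbeat" }))                        # ignored
server.puts(JSON.generate({ event: "fact_written", fact: { key: "t1" } })) # delivered
first = received.pop # blocks until the reader thread delivers the fact
handle.close
server.close
```

Because the reader thread owns its own socket, a slow or blocked callback delays only push delivery; request/response traffic on a separate connection is not held up.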

#write_fact(fact) ⇒ Object



# File 'lib/igniter/store/network_backend.rb', line 60

def write_fact(fact)
  rpc("write_fact", fact: fact.to_h)
  nil
end

#write_snapshot(facts) ⇒ Object

Sends all facts to the server for snapshot storage. No-op on the server side if the server backend does not support snapshots.



# File 'lib/igniter/store/network_backend.rb', line 73

def write_snapshot(facts)
  rpc("write_snapshot", facts: facts.map(&:to_h))
  nil
end