Class: Igniter::Store::StoreServer
- Inherits:
- Object
- Object
- Igniter::Store::StoreServer
- Includes:
- WireProtocol
- Defined in:
- lib/igniter/store/store_server.rb
Overview
Minimal TCP / Unix socket server that exposes durable fact storage over the network. Clients use NetworkBackend to connect.
The server is the “durability half” of the network topology: it persists facts and serves replay requests. All in-memory indices (scope, partition, cache, coercions) are rebuilt by each client from the replayed facts.
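To make the split of responsibilities concrete, here is a minimal sketch of a client rebuilding an in-memory index from replayed facts. The `Fact` struct and its fields are hypothetical (the real fact shape is not shown on this page), and `rebuild_index` is an illustrative helper, not part of the library.

```ruby
# Hypothetical fact shape; the real fields are not documented on this page.
Fact = Struct.new(:store, :key, :value, keyword_init: true)

# Rebuild a per-store "latest value by key" index from an ordered replay.
def rebuild_index(replayed_facts)
  replayed_facts.each_with_object(Hash.new { |h, k| h[k] = {} }) do |fact, index|
    index[fact.store][fact.key] = fact.value # later facts overwrite earlier ones
  end
end

facts = [
  Fact.new(store: :users,  key: "u1", value: { name: "Ada" }),
  Fact.new(store: :users,  key: "u1", value: { name: "Ada L." }),
  Fact.new(store: :orders, key: "o1", value: { total: 10 })
]
index = rebuild_index(facts)
```

The server only needs to return facts in write order; everything derived from them lives on the client side.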
Lifecycle:
server = StoreServer.new(host: "127.0.0.1", port: 7400, backend: :file, path: "store.wal")
server.start_async # background thread
server.wait_until_ready # blocks until accepting (no sleep needed)
...
server.stop # graceful drain, then close
Foreground / CLI:
server.start_foreground # sets signal traps, blocks until stop
Configuration object:
config = ServerConfig.new(host: "0.0.0.0", port: 7400, backend: :file, ...)
server = StoreServer.new(config: config)
Constant Summary
Constants included from WireProtocol
WireProtocol::FRAME_CRC_SIZE, WireProtocol::FRAME_HEADER_SIZE
Instance Method Summary
-
#active_connections ⇒ Object
Number of currently active client connections.
-
#bind_address ⇒ Object
Canonical bind address string (e.g. “127.0.0.1:7400” or “/tmp/store.sock”).
-
#changefeed ⇒ Object
The server’s ChangefeedBuffer — used by SSE and other push transports.
-
#drain(timeout: nil) ⇒ Object
Transitions the server into draining state: new connections are rejected but the accept loop keeps running.
-
#draining? ⇒ Boolean
True when the server has entered draining state (rejecting new connections).
-
#health_snapshot ⇒ Object
Compact health snapshot Hash.
-
#initialize(host: nil, port: nil, transport: nil, backend: nil, path: nil, logger: nil, pid_file: nil, drain_timeout: nil, max_connections: nil, config: nil, address: nil, metrics_thresholds: {}, slow_op_threshold_ms: nil, max_recent_events: 100, changefeed: nil) ⇒ StoreServer
constructor
Accepts keyword args (backward compatible) OR a config: ServerConfig.
-
#metrics_snapshot ⇒ Object
Full metrics snapshot including counters, connection telemetry, and storage stats.
-
#observability_snapshot ⇒ Object
Canonical observability snapshot — single source of truth for all transports.
-
#protocol ⇒ Object
Lazy Protocol::Interpreter for the envelope dispatch layer.
-
#ready? ⇒ Boolean
True when the server is live and accepting traffic.
-
#recent_events ⇒ Object
Recent structured events from the bounded ring buffer.
-
#start ⇒ Object
Starts the accept loop in the calling thread (blocks until #stop).
-
#start_async ⇒ Object
Starts the accept loop in a background daemon thread.
-
#start_foreground ⇒ Object
Starts the accept loop with SIGTERM/SIGINT traps for CLI/foreground use.
-
#start_with_adapters(http_port: nil, tcp_port: nil) ⇒ Object
Starts the legacy accept loop plus optional HTTP/TCP envelope adapters, all in one foreground process.
-
#stop(timeout: nil) ⇒ Object
Gracefully stops the server.
-
#subscription_count(store) ⇒ Object
Number of active push subscriptions for a given store name.
-
#wait_until_ready(timeout: 2) ⇒ Object
Blocks until the server’s accept loop is running and ready for connections.
Methods included from WireProtocol
Constructor Details
#initialize(host: nil, port: nil, transport: nil, backend: nil, path: nil, logger: nil, pid_file: nil, drain_timeout: nil, max_connections: nil, config: nil, address: nil, metrics_thresholds: {}, slow_op_threshold_ms: nil, max_recent_events: 100, changefeed: nil) ⇒ StoreServer
Accepts keyword args (backward compatible) OR a config: ServerConfig. Keyword args take precedence over config fields when both are given.
# File 'lib/igniter/store/store_server.rb', line 68
def initialize(host: nil, port: nil, transport: nil, backend: nil, path: nil,
               logger: nil, pid_file: nil, drain_timeout: nil, max_connections: nil,
               config: nil,
               # Legacy positional-style: address: "host:port"
               address: nil,
               metrics_thresholds: {},
               slow_op_threshold_ms: nil,
               max_recent_events: 100,
               changefeed: nil)
  cfg = config || ServerConfig.new

  # Keyword args override the config where explicitly provided.
  resolved_host      = host || (address ? split_address(address).first : nil) || cfg.host
  resolved_port      = port || (address ? split_address(address).last : nil) || cfg.port
  resolved_transport = transport || cfg.transport
  resolved_backend   = backend || cfg.backend
  resolved_path      = path || cfg.path
  resolved_pid       = pid_file || cfg.pid_file
  resolved_drain     = drain_timeout || cfg.drain_timeout
  resolved_max       = max_connections || cfg.max_connections

  log_io    = config&.log_io || $stdout
  log_level = config&.log_level || :info
  @logger   = logger || ServerLogger.new(log_io, log_level)

  @backend_type    = resolved_backend
  @transport_type  = resolved_transport
  @backend         = build_backend(resolved_backend, resolved_path)
  @server          = build_server(resolved_host, resolved_port, resolved_transport)
  @write_mutex     = Mutex.new
  @active          = 0
  @active_mutex    = Mutex.new
  @in_memory_facts = []
  @stopped         = false
  @started_at      = nil
  @pid_file        = resolved_pid
  @drain_timeout   = resolved_drain
  @max_connections = resolved_max
  @ready_mutex     = Mutex.new
  @ready_cond      = ConditionVariable.new

  # The server socket is bound and listening as soon as build_server returns.
  # Signal readiness here so wait_until_ready is race-free for callers that
  # connect before start_async is called.
  resolved_cf = (cfg.changefeed || {}).merge(changefeed || {})
  @changefeed = ChangefeedBuffer.new(**resolved_cf)
  @ready_latch = true
  @started_at  = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # Cache the bind address string now while the socket is guaranteed open,
  # so start/stop threads don't race on @server.addr after close.
  @bind_address_str = resolved_transport == :unix ? resolved_host.to_s : "#{@server.addr[3]}:#{@server.addr[1]}"

  @metrics              = ServerMetrics.new(thresholds: metrics_thresholds)
  @last_error           = nil
  @draining             = false
  @slow_op_threshold_ms = slow_op_threshold_ms
  @event_ring           = EventRing.new(max_recent_events)
  write_pid_file(resolved_pid)
end
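The precedence chain in the constructor (explicit keyword, then the legacy address string, then the config field) can be sketched in isolation. This is an illustration only: `SketchConfig` is a hypothetical stand-in for ServerConfig, `resolve_endpoint` is not a library method, and the way the address is split (including converting the port to an Integer) is an assumption, since split_address is not shown on this page.

```ruby
# Hypothetical stand-in for ServerConfig; only the fields used here.
SketchConfig = Struct.new(:host, :port, keyword_init: true)

# Mirrors the `explicit || from_address || config` chain from the constructor.
# The address parsing below is an assumption, not the library's split_address.
def resolve_endpoint(host: nil, port: nil, address: nil,
                     config: SketchConfig.new(host: "127.0.0.1", port: 7400))
  addr_host, addr_port = address&.split(":", 2)
  [
    host || addr_host || config.host,
    port || addr_port&.to_i || config.port
  ]
end

only_port   = resolve_endpoint(port: 9000, config: SketchConfig.new(host: "0.0.0.0", port: 7400))
legacy_addr = resolve_endpoint(address: "10.0.0.5:7500")
mixed       = resolve_endpoint(host: "localhost", address: "10.0.0.5:7500")
```

Because the chain uses `||`, passing nil (or false) for a keyword is the same as omitting it, so a keyword cannot be used to reset a config field back to nil.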
Instance Method Details
#active_connections ⇒ Object
Number of currently active client connections.
# File 'lib/igniter/store/store_server.rb', line 259
def active_connections
  @active_mutex.synchronize { @active }
end
#bind_address ⇒ Object
Canonical bind address string (e.g. “127.0.0.1:7400” or “/tmp/store.sock”). Cached at initialize time — safe to call even after stop closes the socket.
# File 'lib/igniter/store/store_server.rb', line 254
def bind_address
  @bind_address_str
end
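The reason for caching can be seen with a plain standard-library TCPServer. The sketch below is independent of StoreServer and assumes the process may bind a loopback port; it builds the same "ip:port" string from TCPServer#addr, closes the socket, and then shows that the live lookup is no longer available.

```ruby
require "socket"

server = TCPServer.new("127.0.0.1", 0) # port 0 lets the OS pick a free port
# TCPServer#addr returns [family, port, hostname, numeric_address].
cached = "#{server.addr[3]}:#{server.addr[1]}"
server.close

# Asking a closed socket for its address raises; the cached string still works.
closed_error = begin
  server.addr
  nil
rescue IOError, SystemCallError => e
  e
end
```

After the close, `cached` is still a usable string while `closed_error` holds the exception raised by the live lookup.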
#changefeed ⇒ Object
The server’s ChangefeedBuffer — used by SSE and other push transports.
# File 'lib/igniter/store/store_server.rb', line 269
def changefeed
  @changefeed
end
#drain(timeout: nil) ⇒ Object
Transitions the server into draining state: new connections are rejected but the accept loop keeps running. Existing connections are allowed to finish (or time out). Call stop afterward to tear down the socket.
Returns self so callers can chain: server.drain.stop
# File 'lib/igniter/store/store_server.rb', line 230
def drain(timeout: nil)
  return self if @stopped

  t = timeout || @drain_timeout
  @draining = true
  emit_event(:server_draining, bind_address: @bind_address_str)
  @logger.info("Draining (timeout=#{t}s)...")

  deadline = Time.now + t
  loop do
    active = @active_mutex.synchronize { @active }
    break if active.zero? || Time.now >= deadline

    sleep 0.05
  end

  remaining = @active_mutex.synchronize { @active }
  @logger.warn("Drain timeout: #{remaining} connection(s) still active.") if remaining.positive?
  self
end
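The wait inside drain is a poll-until-idle loop with a deadline. A self-contained sketch of that loop follows. `wait_for_idle` and the counter hash are hypothetical names, and the sketch deliberately swaps Time.now for the monotonic clock so that wall-clock adjustments cannot stretch or shorten the deadline; the listing above uses Time.now.

```ruby
# Poll a connection counter until it reaches zero or the deadline passes.
# Returns the number of connections still active when the wait ends.
def wait_for_idle(counter, mutex, timeout:, poll_interval: 0.05)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    active = mutex.synchronize { counter[:active] }
    return active if active.zero?
    return active if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

    sleep poll_interval
  end
end

mutex = Mutex.new
counter = { active: 2 }
# Simulate two clients finishing shortly after the drain begins.
worker = Thread.new do
  2.times do
    sleep 0.02
    mutex.synchronize { counter[:active] -= 1 }
  end
end
remaining = wait_for_idle(counter, mutex, timeout: 1.0)
worker.join
```

A non-zero return value corresponds to the "Drain timeout" warning branch in the listing.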
#draining? ⇒ Boolean
True when the server has entered draining state (rejecting new connections).
# File 'lib/igniter/store/store_server.rb', line 277
def draining? = @draining
#health_snapshot ⇒ Object
Compact health snapshot Hash. status: :ready | :draining | :stopped
# File 'lib/igniter/store/store_server.rb', line 287
def health_snapshot
  now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  snap = @metrics.snapshot
  {
    schema_version: 1,
    status: current_status,
    backend: @backend_type.to_s,
    transport: @transport_type.to_s,
    bind_address: @bind_address_str,
    uptime_ms: ((@started_at ? now - @started_at : 0) * 1000).ceil,
    active_connections: active_connections,
    subscriptions: snap[:subscription_count],
    last_error: @last_error
  }
end
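The uptime_ms field is derived from two monotonic clock readings and rounded up to a whole millisecond. Extracted into a standalone helper (the name `uptime_ms` and its `now` parameter are illustrative, not part of the class), the arithmetic looks like this:

```ruby
# Elapsed milliseconds since `started_at` (a CLOCK_MONOTONIC reading), rounded up.
# A nil start yields 0, matching the guard in the listing.
def uptime_ms(started_at, now = Process.clock_gettime(Process::CLOCK_MONOTONIC))
  ((started_at ? now - started_at : 0) * 1000).ceil
end

not_started = uptime_ms(nil)          # 0
two_and_half = uptime_ms(5.0, 7.5)    # 2500
sub_ms = uptime_ms(10.0, 10.0004)     # rounds up to 1
```

Rounding up means any non-zero elapsed time reports at least 1 ms, so a freshly constructed server never reports an uptime of zero once the clock has advanced.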
#metrics_snapshot ⇒ Object
Full metrics snapshot including counters, connection telemetry, and storage stats.
# File 'lib/igniter/store/store_server.rb', line 304
def metrics_snapshot
  @metrics.snapshot(backend: @backend)
end
#observability_snapshot ⇒ Object
Canonical observability snapshot — single source of truth for all transports.
Canonical shape (same top-level keys across protocol, HTTP, MCP, and server):
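schema_version, generated_at, status, uptime_ms, metrics, alerts, storage, server
The server-level snapshot in the listing below additionally returns a changefeed key holding the ChangefeedBuffer snapshot, and merges the changefeed delivery alerts into alerts.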
This is the full server+storage shape. For the compact health check shape use #health_snapshot. For the pure storage-level protocol shape use Protocol::Interpreter#observability_snapshot.
# File 'lib/igniter/store/store_server.rb', line 316
def observability_snapshot
  @metrics.check_alerts(backend: @backend)
  snap = @metrics.snapshot(backend: @backend)
  cf_snap = @changefeed.snapshot
  now = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # Merge server/storage alerts with changefeed delivery alerts.
  all_alerts = Array(snap[:alerts]) + Array(cf_snap[:alerts])

  {
    schema_version: 1,
    generated_at: snap[:generated_at],
    status: current_status,
    uptime_ms: ((@started_at ? now - @started_at : 0) * 1000).ceil,
    metrics: {
      requests_total: snap[:requests_total],
      errors_total: snap[:errors_total],
      slow_ops_total: snap[:slow_ops_total],
      facts_written: snap[:facts_written],
      facts_replayed: snap[:facts_replayed],
      bytes_in: snap[:bytes_in],
      bytes_out: snap[:bytes_out],
      active_connections: snap[:active_connections],
      accepted_connections_total: snap[:accepted_connections_total],
      closed_connections_total: snap[:closed_connections_total],
      rejected_connections_total: snap[:rejected_connections_total],
      subscription_count: snap[:subscription_count]
    },
    alerts: all_alerts,
    storage: snap[:storage_stats],
    server: {
      backend: @backend_type.to_s,
      transport: @transport_type.to_s,
      bind_address: @bind_address_str,
      last_error: @last_error
    },
    changefeed: cf_snap
  }
end
#protocol ⇒ Object
Lazy Protocol::Interpreter for the envelope dispatch layer. Owns a fresh IgniterStore independent of the legacy fact log. HTTP and TCP adapters share this interpreter instance.
# File 'lib/igniter/store/store_server.rb', line 357
def protocol
  @protocol ||= Protocol::Interpreter.new(IgniterStore.new)
end
#ready? ⇒ Boolean
True when the server is live and accepting traffic.
# File 'lib/igniter/store/store_server.rb', line 274
def ready? = !@stopped && !@draining
#recent_events ⇒ Object
Recent structured events from the bounded ring buffer. Returns an Array of event hashes (newest at end), size ≤ max_recent_events.
# File 'lib/igniter/store/store_server.rb', line 281
def recent_events
  @event_ring.to_a
end
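The EventRing implementation is not shown on this page. As a rough illustration of the documented contract (newest event at the end, size never exceeding the configured maximum), a bounded buffer could be sketched as follows. `SketchRing` is a hypothetical class, and the real EventRing may differ in both structure and locking.

```ruby
# Hypothetical bounded ring: keeps only the newest `capacity` events.
class SketchRing
  def initialize(capacity)
    @capacity = capacity
    @events = []
    @mutex = Mutex.new
  end

  def push(event)
    @mutex.synchronize do
      @events << event
      @events.shift while @events.size > @capacity # drop the oldest entries
    end
    self
  end

  # Returns a copy, oldest first, so callers cannot mutate the buffer.
  def to_a
    @mutex.synchronize { @events.dup }
  end
end

ring = SketchRing.new(3)
5.times { |i| ring.push({ type: :tick, seq: i }) }
seqs = ring.to_a.map { |e| e[:seq] }
```

Here `seqs` holds the last three sequence numbers in insertion order, which matches the "newest at end" wording above.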
#start ⇒ Object
Starts the accept loop in the calling thread (blocks until #stop).
# File 'lib/igniter/store/store_server.rb', line 131
def start
  @logger.info("Listening on #{@bind_address_str} " \
               "(transport=#{@transport_type} backend=#{@backend_type})")
  emit_event(:server_start, bind_address: @bind_address_str,
             transport: @transport_type, backend: @backend_type)
  until @stopped
    begin
      client = @server.accept
    rescue IOError, Errno::EBADF
      break
    end

    if @draining
      @metrics.record_connection_rejected
      client.close rescue nil
      next
    end

    active = @active_mutex.synchronize { @active += 1; @active }

    if @max_connections && active > @max_connections
      @active_mutex.synchronize { @active -= 1 }
      @metrics.record_connection_rejected
      emit_event(:alert, type: :max_connections, active: active, max: @max_connections)
      client.close rescue nil
      next
    end

    @logger.debug("Connection accepted (active=#{active})")
    Thread.new(client) { |s| handle_client(s) }
  end
ensure
  remove_pid_file
  @logger.info("Stopped.")
  emit_event(:server_stop, bind_address: @bind_address_str)
end
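The max_connections check in the accept loop increments the counter first and rolls back if the limit is exceeded. That pattern can be isolated in a small sketch. `ConnectionGate` is a hypothetical class used only for illustration; the listing keeps this logic inline and records rejections through its metrics object.

```ruby
# Hypothetical admission gate mirroring the increment-then-check pattern.
class ConnectionGate
  attr_reader :rejected

  def initialize(max_connections)
    @max = max_connections # nil means "no limit", as in the listing
    @active = 0
    @rejected = 0
    @mutex = Mutex.new
  end

  # Returns true when the connection is admitted, false when it is rejected.
  def admit
    active = @mutex.synchronize { @active += 1 }
    return true unless @max && active > @max

    @mutex.synchronize do
      @active -= 1   # roll back the optimistic increment
      @rejected += 1
    end
    false
  end

  # Called when an admitted connection finishes.
  def release
    @mutex.synchronize { @active -= 1 }
  end

  def active
    @mutex.synchronize { @active }
  end
end

gate = ConnectionGate.new(2)
results = 3.times.map { gate.admit }
```

With a limit of 2, the third call to `admit` is rejected and the active count stays at 2.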
#start_async ⇒ Object
Starts the accept loop in a background daemon thread. Call wait_until_ready after this to avoid race conditions.
# File 'lib/igniter/store/store_server.rb', line 172
def start_async
  Thread.new do
    Thread.current.abort_on_exception = false
    start
  end
end
#start_foreground ⇒ Object
Starts the accept loop with SIGTERM/SIGINT traps for CLI/foreground use. Blocks until a signal or #stop is called.
# File 'lib/igniter/store/store_server.rb', line 195
def start_foreground
  trap("SIGTERM") { stop }
  trap("INT") { stop }
  start
end
#start_with_adapters(http_port: nil, tcp_port: nil) ⇒ Object
Starts the legacy accept loop plus optional HTTP/TCP envelope adapters, all in one foreground process. Adapters are stopped on exit.
# File 'lib/igniter/store/store_server.rb', line 363
def start_with_adapters(http_port: nil, tcp_port: nil)
  http = http_port ? HTTPAdapter.new(
    interpreter: protocol,
    port: http_port,
    health_provider: method(:health_snapshot),
    status_provider: method(:observability_snapshot),
    ready_provider: method(:ready?),
    metrics_provider: -> { observability_snapshot[:metrics] },
    events_provider: method(:recent_events),
    changefeed_provider: method(:changefeed)
  ) : nil
  tcp = tcp_port ? TCPAdapter.new(interpreter: protocol, port: tcp_port) : nil

  http&.start_async
  tcp&.start_async
  start_foreground
ensure
  http&.stop
  tcp&.stop
end
#stop(timeout: nil) ⇒ Object
Gracefully stops the server.
- Closes the server socket (no new connections accepted).
- Waits up to timeout seconds for active connections to finish.
- Force-closes remaining connections and closes the backend.
# File 'lib/igniter/store/store_server.rb', line 205
def stop(timeout: nil)
  t = timeout || @drain_timeout
  @stopped = true
  @logger.info("Stopping (drain_timeout=#{t}s)...")

  @server.close rescue nil
  remove_pid_file

  deadline = Time.now + t
  loop do
    active = @active_mutex.synchronize { @active }
    break if active.zero? || Time.now >= deadline

    @logger.debug("Draining #{active} connection(s)...")
    sleep 0.05
  end

  remaining = @active_mutex.synchronize { @active }
  @logger.warn("Force-closing #{remaining} connection(s).") if remaining.positive?

  @write_mutex.synchronize { @backend&.close rescue nil }
end
#subscription_count(store) ⇒ Object
Number of active push subscriptions for a given store name.
# File 'lib/igniter/store/store_server.rb', line 264
def subscription_count(store)
  @changefeed.subscriber_count(store)
end
#wait_until_ready(timeout: 2) ⇒ Object
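Blocks until the server is ready for connections and returns self; raises if readiness is not reached within timeout seconds. In the listing below the readiness latch is already set in the constructor, once the listening socket is bound, so the call normally returns immediately. Replaces the sleep 0.05 hack in callers.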
# File 'lib/igniter/store/store_server.rb', line 181
def wait_until_ready(timeout: 2)
  @ready_mutex.synchronize do
    deadline = Time.now + timeout
    until @ready_latch
      remaining = deadline - Time.now
      raise "StoreServer did not become ready within #{timeout}s" if remaining <= 0

      @ready_cond.wait(@ready_mutex, remaining)
    end
  end
  self
end
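The wait loop above is the usual Mutex plus ConditionVariable latch. A standalone sketch of the general pattern, including the signalling side that this page does not show, might look like this. `ReadyLatch` and `signal_ready` are hypothetical names, and the sketch uses the monotonic clock for its deadline where the listing uses Time.now.

```ruby
# Hypothetical one-shot latch built from Mutex + ConditionVariable.
class ReadyLatch
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @ready = false
  end

  # Marks the latch as ready and wakes every waiting thread.
  def signal_ready
    @mutex.synchronize do
      @ready = true
      @cond.broadcast
    end
  end

  # Blocks until signal_ready has been called; raises after `timeout` seconds.
  def wait(timeout: 2)
    @mutex.synchronize do
      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
      until @ready # re-check the flag to tolerate spurious wakeups
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise "not ready within #{timeout}s" if remaining <= 0

        @cond.wait(@mutex, remaining)
      end
    end
    self
  end
end

latch = ReadyLatch.new
signaller = Thread.new { sleep 0.05; latch.signal_ready }
result = latch.wait(timeout: 1)
signaller.join
```

Checking the flag in a loop, rather than waiting once, is what makes the pattern safe when the signal arrives before the waiter or when the condition variable wakes early.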