Class: Igniter::Store::StoreServer

Inherits:
Object
  • Object
show all
Includes:
WireProtocol
Defined in:
lib/igniter/store/store_server.rb

Overview

Minimal TCP / Unix socket server that exposes durable fact storage over the network. Clients use NetworkBackend to connect.

The server is the “durability half” of the network topology: it persists facts and serves replay requests. All in-memory indices (scope, partition, cache, coercions) are rebuilt by each client from the replayed facts.

Lifecycle:

server = StoreServer.new(host: "127.0.0.1", port: 7400, backend: :file, path: "store.wal")
server.start_async            # background thread
server.wait_until_ready       # blocks until accepting (no sleep needed)
...
server.stop                   # graceful drain, then close

Foreground / CLI:

server.start_foreground       # sets signal traps, blocks until stop

Configuration object:

config = ServerConfig.new(host: "0.0.0.0", port: 7400, backend: :file, ...)
server = StoreServer.new(config: config)

Constant Summary

Constants included from WireProtocol

WireProtocol::FRAME_CRC_SIZE, WireProtocol::FRAME_HEADER_SIZE

Instance Method Summary collapse

Methods included from WireProtocol

#encode_frame, #read_frame

Constructor Details

#initialize(host: nil, port: nil, transport: nil, backend: nil, path: nil, logger: nil, pid_file: nil, drain_timeout: nil, max_connections: nil, config: nil, address: nil, metrics_thresholds: {}, slow_op_threshold_ms: nil, max_recent_events: 100, changefeed: nil) ⇒ StoreServer

Accepts keyword args (backward compatible) OR a config: ServerConfig. Keyword args take precedence over config fields when both are given.



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/igniter/store/store_server.rb', line 68

def initialize(host: nil, port: nil, transport: nil, backend: nil, path: nil,
               logger: nil, pid_file: nil, drain_timeout: nil,
               max_connections: nil, config: nil,
               # Legacy positional-style: address: "host:port"
               address: nil,
               metrics_thresholds: {},
               slow_op_threshold_ms: nil,
               max_recent_events:    100,
               changefeed:           nil)
  cfg = config || ServerConfig.new

  # Keyword args override the config where explicitly provided.
  resolved_host      = host      || (address ? split_address(address).first : nil) || cfg.host
  resolved_port      = port      || (address ? split_address(address).last  : nil) || cfg.port
  resolved_transport = transport || cfg.transport
  resolved_backend   = backend   || cfg.backend
  resolved_path      = path      || cfg.path
  resolved_pid       = pid_file  || cfg.pid_file
  resolved_drain     = drain_timeout  || cfg.drain_timeout
  resolved_max       = max_connections || cfg.max_connections

  log_io    = config&.log_io    || $stdout
  log_level = config&.log_level || :info

  @logger          = logger || ServerLogger.new(log_io, log_level)
  @backend_type    = resolved_backend
  @transport_type  = resolved_transport
  @backend         = build_backend(resolved_backend, resolved_path)
  @server          = build_server(resolved_host, resolved_port, resolved_transport)
  @write_mutex     = Mutex.new
  @active          = 0
  @active_mutex    = Mutex.new
  @in_memory_facts = []
  @stopped         = false
  @started_at      = nil
  @pid_file        = resolved_pid
  @drain_timeout   = resolved_drain
  @max_connections = resolved_max
  @ready_mutex     = Mutex.new
  @ready_cond      = ConditionVariable.new
  # The server socket is bound and listening as soon as build_server returns.
  # Signal readiness here so wait_until_ready is race-free for callers that
  # connect before start_async is called.
  resolved_cf      = (cfg.changefeed || {}).merge(changefeed || {})
  @changefeed      = ChangefeedBuffer.new(**resolved_cf)
  @ready_latch     = true
  @started_at      = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  # Cache the bind address string now while the socket is guaranteed open,
  # so start/stop threads don't race on @server.addr after close.
  @bind_address_str = resolved_transport == :unix ?
    resolved_host.to_s :
    "#{@server.addr[3]}:#{@server.addr[1]}"
  @metrics              = ServerMetrics.new(thresholds: metrics_thresholds)
  @last_error           = nil
  @draining             = false
  @slow_op_threshold_ms = slow_op_threshold_ms
  @event_ring           = EventRing.new(max_recent_events)
  write_pid_file(resolved_pid)
end

Instance Method Details

#active_connectionsObject

Number of currently active client connections.



259
260
261
# File 'lib/igniter/store/store_server.rb', line 259

def active_connections
  @active_mutex.synchronize { @active }
end

#bind_addressObject

Canonical bind address string (e.g. “127.0.0.1:7400” or “/tmp/store.sock”). Cached at initialize time — safe to call even after stop closes the socket.



254
255
256
# File 'lib/igniter/store/store_server.rb', line 254

def bind_address
  @bind_address_str
end

#changefeedObject

The server’s ChangefeedBuffer — used by SSE and other push transports.



269
270
271
# File 'lib/igniter/store/store_server.rb', line 269

def changefeed
  @changefeed
end

#drain(timeout: nil) ⇒ Object

Transitions the server into draining state: new connections are rejected but the accept loop keeps running. Existing connections are allowed to finish (or time out). Call stop afterward to tear down the socket.

Returns self so callers can chain: server.drain.stop



230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/igniter/store/store_server.rb', line 230

def drain(timeout: nil)
  return self if @stopped

  t = timeout || @drain_timeout
  @draining = true
  emit_event(:server_draining, bind_address: @bind_address_str)
  @logger.info("Draining (timeout=#{t}s)...")

  deadline = Time.now + t
  loop do
    active = @active_mutex.synchronize { @active }
    break if active.zero? || Time.now >= deadline
    sleep 0.05
  end

  remaining = @active_mutex.synchronize { @active }
  @logger.warn("Drain timeout: #{remaining} connection(s) still active.") if remaining.positive?
  self
end

#draining?Boolean

True when the server has entered draining state (rejecting new connections).

Returns:

  • (Boolean)


277
# File 'lib/igniter/store/store_server.rb', line 277

def draining? = @draining

#health_snapshotObject

Compact health snapshot Hash. status: :ready | :draining | :stopped



287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
# File 'lib/igniter/store/store_server.rb', line 287

def health_snapshot
  now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  snap = @metrics.snapshot
  {
    schema_version:     1,
    status:             current_status,
    backend:            @backend_type.to_s,
    transport:          @transport_type.to_s,
    bind_address:       @bind_address_str,
    uptime_ms:          ((@started_at ? now - @started_at : 0) * 1000).ceil,
    active_connections: active_connections,
    subscriptions:      snap[:subscription_count],
    last_error:         @last_error
  }
end

#metrics_snapshotObject

Full metrics snapshot including counters, connection telemetry, and storage stats.



304
305
306
# File 'lib/igniter/store/store_server.rb', line 304

def metrics_snapshot
  @metrics.snapshot(backend: @backend)
end

#observability_snapshotObject

Canonical observability snapshot — single source of truth for all transports.

Canonical shape (same top-level keys across protocol, HTTP, MCP, and server):

schema_version, generated_at, status, uptime_ms, metrics, alerts, storage, server

This is the full server+storage shape. For the compact health check shape use #health_snapshot. For the pure storage-level protocol shape use Protocol::Interpreter#observability_snapshot.



316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
# File 'lib/igniter/store/store_server.rb', line 316

def observability_snapshot
  @metrics.check_alerts(backend: @backend)
  snap    = @metrics.snapshot(backend: @backend)
  cf_snap = @changefeed.snapshot
  now     = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  # Merge server/storage alerts with changefeed delivery alerts.
  all_alerts = Array(snap[:alerts]) + Array(cf_snap[:alerts])
  {
    schema_version: 1,
    generated_at:   snap[:generated_at],
    status:         current_status,
    uptime_ms:      ((@started_at ? now - @started_at : 0) * 1000).ceil,
    metrics: {
      requests_total:             snap[:requests_total],
      errors_total:               snap[:errors_total],
      slow_ops_total:             snap[:slow_ops_total],
      facts_written:              snap[:facts_written],
      facts_replayed:             snap[:facts_replayed],
      bytes_in:                   snap[:bytes_in],
      bytes_out:                  snap[:bytes_out],
      active_connections:         snap[:active_connections],
      accepted_connections_total: snap[:accepted_connections_total],
      closed_connections_total:   snap[:closed_connections_total],
      rejected_connections_total: snap[:rejected_connections_total],
      subscription_count:         snap[:subscription_count]
    },
    alerts:     all_alerts,
    storage:    snap[:storage_stats],
    server: {
      backend:      @backend_type.to_s,
      transport:    @transport_type.to_s,
      bind_address: @bind_address_str,
      last_error:   @last_error
    },
    changefeed: cf_snap
  }
end

#protocolObject

Lazy Protocol::Interpreter for the envelope dispatch layer. Owns a fresh IgniterStore independent of the legacy fact log. HTTP and TCP adapters share this interpreter instance.



357
358
359
# File 'lib/igniter/store/store_server.rb', line 357

def protocol
  @protocol ||= Protocol::Interpreter.new(IgniterStore.new)
end

#ready?Boolean

True when the server is live and accepting traffic.

Returns:

  • (Boolean)


274
# File 'lib/igniter/store/store_server.rb', line 274

def ready?    = !@stopped && !@draining

#recent_eventsObject

Recent structured events from the bounded ring buffer. Returns an Array of event hashes (newest at end), size ≤ max_recent_events.



281
282
283
# File 'lib/igniter/store/store_server.rb', line 281

def recent_events
  @event_ring.to_a
end

#startObject

Starts the accept loop in the calling thread (blocks until #stop).



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/igniter/store/store_server.rb', line 131

def start
  @logger.info("Listening on #{@bind_address_str} " \
               "(transport=#{@transport_type} backend=#{@backend_type})")
  emit_event(:server_start,
             bind_address: @bind_address_str,
             transport:    @transport_type,
             backend:      @backend_type)
  until @stopped
    begin
      client = @server.accept
    rescue IOError, Errno::EBADF
      break
    end

    if @draining
      @metrics.record_connection_rejected
      client.close rescue nil
      next
    end

    active = @active_mutex.synchronize { @active += 1; @active }

    if @max_connections && active > @max_connections
      @active_mutex.synchronize { @active -= 1 }
      @metrics.record_connection_rejected
      emit_event(:alert, type: :max_connections, active: active, max: @max_connections)
      client.close rescue nil
      next
    end

    @logger.debug("Connection accepted (active=#{active})")
    Thread.new(client) { |s| handle_client(s) }
  end
ensure
  remove_pid_file
  @logger.info("Stopped.")
  emit_event(:server_stop, bind_address: @bind_address_str)
end

#start_asyncObject

Starts the accept loop in a background daemon thread. Call wait_until_ready after this to avoid race conditions.



172
173
174
175
176
177
# File 'lib/igniter/store/store_server.rb', line 172

def start_async
  Thread.new do
    Thread.current.abort_on_exception = false
    start
  end
end

#start_foregroundObject

Starts the accept loop with SIGTERM/SIGINT traps for CLI/foreground use. Blocks until a signal or #stop is called.



195
196
197
198
199
# File 'lib/igniter/store/store_server.rb', line 195

def start_foreground
  trap("SIGTERM") { stop }
  trap("INT")     { stop }
  start
end

#start_with_adapters(http_port: nil, tcp_port: nil) ⇒ Object

Starts the legacy accept loop plus optional HTTP/TCP envelope adapters, all in one foreground process. Adapters are stopped on exit.



363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
# File 'lib/igniter/store/store_server.rb', line 363

def start_with_adapters(http_port: nil, tcp_port: nil)
  http = http_port ? HTTPAdapter.new(
    interpreter:         protocol,
    port:                http_port,
    health_provider:     method(:health_snapshot),
    status_provider:     method(:observability_snapshot),
    ready_provider:      method(:ready?),
    metrics_provider:    -> { observability_snapshot[:metrics] },
    events_provider:     method(:recent_events),
    changefeed_provider: method(:changefeed)
  ) : nil
  tcp  = tcp_port  ? TCPAdapter.new(interpreter: protocol, port: tcp_port)  : nil
  http&.start_async
  tcp&.start_async
  start_foreground
ensure
  http&.stop
  tcp&.stop
end

#stop(timeout: nil) ⇒ Object

Gracefully stops the server.

  1. Closes the server socket (no new connections accepted).

  2. Waits up to timeout seconds for active connections to finish.

  3. Force-closes remaining connections and closes the backend.



205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/igniter/store/store_server.rb', line 205

def stop(timeout: nil)
  t = timeout || @drain_timeout
  @stopped = true
  @logger.info("Stopping (drain_timeout=#{t}s)...")
  @server.close rescue nil
  remove_pid_file

  deadline = Time.now + t
  loop do
    active = @active_mutex.synchronize { @active }
    break if active.zero? || Time.now >= deadline
    @logger.debug("Draining #{active} connection(s)...")
    sleep 0.05
  end

  remaining = @active_mutex.synchronize { @active }
  @logger.warn("Force-closing #{remaining} connection(s).") if remaining.positive?
  @write_mutex.synchronize { @backend&.close rescue nil }
end

#subscription_count(store) ⇒ Object

Number of active push subscriptions for a given store name.



264
265
266
# File 'lib/igniter/store/store_server.rb', line 264

def subscription_count(store)
  @changefeed.subscriber_count(store)
end

#wait_until_ready(timeout: 2) ⇒ Object

Blocks until the server’s accept loop is running and ready for connections. Replaces the sleep 0.05 hack in callers.



181
182
183
184
185
186
187
188
189
190
191
# File 'lib/igniter/store/store_server.rb', line 181

def wait_until_ready(timeout: 2)
  @ready_mutex.synchronize do
    deadline = Time.now + timeout
    until @ready_latch
      remaining = deadline - Time.now
      raise "StoreServer did not become ready within #{timeout}s" if remaining <= 0
      @ready_cond.wait(@ready_mutex, remaining)
    end
  end
  self
end