Class: Igniter::Store::ChangefeedBuffer
- Inherits: Object
- Defined in: lib/igniter/store/changefeed_buffer.rb
Overview
Bounded in-memory Changefeed buffer with async per-subscriber fan-out, delivery policies, and production diagnostics.
Receives committed facts via emit, builds ChangeEvent objects with monotonically increasing sequence cursors, and retains recent events in a bounded ring. Events fan out to registered subscriber handlers through per-subscriber bounded queues and worker threads, so a slow subscriber never stalls emit.
Delivery semantics: async best-effort push.
- Fan-out enqueues to a per-subscriber SubscriberQueue; emit returns quickly.
- Each subscriber has one worker thread draining its queue.
- When a subscriber queue is full, the overflow policy determines which event is dropped (see the overflow: option).
- A handler that raises is removed, counted as failed, and its worker exits.
- When the ring is full, the oldest retained event is dropped and dropped_total is incremented.
- No durable checkpoints in this v0 slice.
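The worker behavior described above can be sketched with Ruby's standard Queue. This is a minimal illustration, not the library's implementation: it uses an unbounded Queue instead of SubscriberQueue, and the names start_worker and stats are hypothetical. A nil entry serves as the stop signal, mirroring the worker loop in #subscribe.

```ruby
# Hypothetical helper: one worker thread drains one queue.
# A handler that raises is counted as failed and ends the worker.
def start_worker(queue, stats, &handler)
  Thread.new do
    loop do
      event = queue.pop          # blocks until an event (or nil) arrives
      break if event.nil?        # nil is the stop signal

      begin
        handler.call(event)
        stats[:delivered] += 1
      rescue StandardError
        stats[:failed] += 1
        break                    # worker exits; later events stay undelivered
      end
    end
  end
end

queue = Queue.new
stats = { delivered: 0, failed: 0 }
worker = start_worker(queue, stats) { |event| raise "boom" if event == :bad }

# The producer only enqueues; it never waits for the handler.
%i[a b bad c].each { |event| queue << event }
worker.join
```

After the join, two events were delivered, one failure was counted, and :c is still sitting in the queue because the worker exited on the failing event.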
Overflow policies (subscriber queue full):
- :drop_oldest removes the oldest queued event and adds the incoming event.
- :drop_newest discards the incoming event and leaves the queue unchanged.
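The difference between the two policies is easiest to see on a tiny queue. The class below is a hypothetical stand-in for SubscriberQueue (whose source is not shown on this page); it is single-threaded and only models which event survives when the queue is full.

```ruby
# Hypothetical, non-thread-safe model of a bounded queue with an overflow policy.
class BoundedQueueSketch
  attr_reader :items, :dropped

  def initialize(max_size:, overflow:)
    @max_size = max_size
    @overflow = overflow
    @items = []
    @dropped = 0
  end

  # Returns true if the incoming event was stored, false if it was discarded.
  def push(event)
    if @items.size >= @max_size
      @dropped += 1
      return false if @overflow == :drop_newest

      @items.shift # :drop_oldest evicts the head to make room
    end
    @items << event
    true
  end
end

oldest = BoundedQueueSketch.new(max_size: 2, overflow: :drop_oldest)
newest = BoundedQueueSketch.new(max_size: 2, overflow: :drop_newest)
[1, 2, 3].each do |event|
  oldest.push(event)
  newest.push(event)
end
```

With a capacity of 2 and three pushes, the :drop_oldest queue ends up holding 2 and 3, while the :drop_newest queue keeps 1 and 2. Both count one drop.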
Close policies (Subscription#close):
- :drain delivers all queued events before stopping the worker.
- :discard clears the queue immediately; the worker exits after the current event.
Alert thresholds (optional, checked at each #snapshot call):
- :failed_total fires :changefeed_subscriber_failures
- :overflow_dropped_total fires :changefeed_overflow_drops
- :total_queued fires :changefeed_queue_pressure (aggregate)
- :queue_pressure_ratio fires :changefeed_queue_pressure (per-subscriber)
Diagnostics ring records bounded lifecycle/failure events:
- :subscriber_subscribed / :subscriber_closed / :subscriber_failed
- :subscriber_overflow
Ordering policy:
- Sequences are assigned in emit-call order (monotonically increasing).
- IgniterStore emits the source fact BEFORE triggering derivations/scatters, so subscribers always see cause before effects within their queue.
Replay cursor semantics (see #replay):
- nil cursor → all retained events from oldest retained sequence.
- N → events with sequence > N.
- N < oldest-1 → :cursor_too_old (gap due to ring overflow).
- N >= newest → empty :ok (caller is already at the head).
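The four rules reduce to a small decision function. The sketch below is illustrative only; classify_cursor and its return symbols (other than :cursor_too_old) are hypothetical names, not part of the library. It also shows why the boundary is oldest-1 rather than oldest: a consumer that has seen everything up to oldest-1 has missed nothing.

```ruby
# Hypothetical helper: classifies a requested sequence against the retained range.
def classify_cursor(requested, oldest:, newest:)
  return :all_retained   if requested.nil?
  return :cursor_too_old if requested < oldest - 1 # events between were evicted
  return :at_head        if requested >= newest    # nothing new to return

  :resume # events with sequence > requested are all still retained
end

# Ring currently retains sequences 5..9.
results = [nil, 3, 4, 6, 9].map { |n| classify_cursor(n, oldest: 5, newest: 9) }
```

Here a cursor of 4 still resumes cleanly (the next event is 5, the oldest retained), whereas a cursor of 3 reports a gap because event 4 is gone.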
Usage:
buf = ChangefeedBuffer.new(max_size: 1_000)
handle = buf.subscribe(stores: [:tasks]) { |event| deliver(event) }
buf.emit(fact) # enqueues to matching subscriber queues; returns quickly
handle.close # respects close_policy (drain or discard), joins worker
Defined Under Namespace
Classes: DiagnosticRing, SubscriberQueue, Subscription, SubscriptionRecord
Constant Summary
- DEFAULT_MAX_SIZE = 1_000
- DEFAULT_SUBSCRIBER_QUEUE_SIZE = 100
- DEFAULT_OVERFLOW = :drop_oldest
- DEFAULT_CLOSE_POLICY = :drain
- DEFAULT_DIAGNOSTIC_RING_SIZE = 100
- VALID_OVERFLOW_POLICIES = %i[drop_oldest drop_newest].freeze
- VALID_CLOSE_POLICIES = %i[drain discard].freeze
- VALID_THRESHOLD_KEYS = %i[total_queued overflow_dropped_total failed_total queue_pressure_ratio].freeze
Instance Method Summary
- #emit(fact) ⇒ Object
  Build a ChangeEvent from fact, add to the ring buffer, and enqueue to matching subscriber queues.
- #initialize(max_size: DEFAULT_MAX_SIZE, subscriber_queue_size: DEFAULT_SUBSCRIBER_QUEUE_SIZE, overflow: DEFAULT_OVERFLOW, close_policy: DEFAULT_CLOSE_POLICY, diagnostic_ring_size: DEFAULT_DIAGNOSTIC_RING_SIZE, alert_thresholds: {}) ⇒ ChangefeedBuffer (constructor)
  A new instance of ChangefeedBuffer.
- #replay(cursor: nil, stores: nil, limit: nil) ⇒ Object
  Replay retained ChangeEvents from the in-memory ring.
- #snapshot ⇒ Object
  Compact snapshot of current changefeed state for observability.
- #subscribe(stores:, &handler) ⇒ Object
  Register a subscriber handler for one or more store names.
- #subscriber_count(store = nil) ⇒ Object
  Number of active subscribers, optionally filtered by store name.
- #subscriber_snapshot ⇒ Object
  Per-subscriber state snapshot for diagnosing slow/failing consumers.
Constructor Details
#initialize(max_size: DEFAULT_MAX_SIZE, subscriber_queue_size: DEFAULT_SUBSCRIBER_QUEUE_SIZE, overflow: DEFAULT_OVERFLOW, close_policy: DEFAULT_CLOSE_POLICY, diagnostic_ring_size: DEFAULT_DIAGNOSTIC_RING_SIZE, alert_thresholds: {}) ⇒ ChangefeedBuffer
Returns a new instance of ChangefeedBuffer.
# File 'lib/igniter/store/changefeed_buffer.rb', line 189

def initialize(max_size: DEFAULT_MAX_SIZE,
               subscriber_queue_size: DEFAULT_SUBSCRIBER_QUEUE_SIZE,
               overflow: DEFAULT_OVERFLOW,
               close_policy: DEFAULT_CLOSE_POLICY,
               diagnostic_ring_size: DEFAULT_DIAGNOSTIC_RING_SIZE,
               alert_thresholds: {})
  unless VALID_OVERFLOW_POLICIES.include?(overflow)
    raise ArgumentError, "unknown overflow policy: #{overflow.inspect}. " \
                         "Valid: #{VALID_OVERFLOW_POLICIES.map(&:inspect).join(", ")}"
  end
  unless VALID_CLOSE_POLICIES.include?(close_policy)
    raise ArgumentError, "unknown close_policy: #{close_policy.inspect}. " \
                         "Valid: #{VALID_CLOSE_POLICIES.map(&:inspect).join(", ")}"
  end

  thresholds = (alert_thresholds || {}).transform_keys(&:to_sym)
  unknown = thresholds.keys - VALID_THRESHOLD_KEYS
  unless unknown.empty?
    raise ArgumentError, "unknown alert_threshold keys: #{unknown.map(&:inspect).join(", ")}. " \
                         "Valid: #{VALID_THRESHOLD_KEYS.map(&:inspect).join(", ")}"
  end
  unless max_size.is_a?(Integer) && max_size > 0
    raise ArgumentError, "max_size must be a positive integer, got #{max_size.inspect}"
  end
  unless subscriber_queue_size.is_a?(Integer) && subscriber_queue_size > 0
    raise ArgumentError, "subscriber_queue_size must be a positive integer, got #{subscriber_queue_size.inspect}"
  end
  unless diagnostic_ring_size.is_a?(Integer) && diagnostic_ring_size > 0
    raise ArgumentError, "diagnostic_ring_size must be a positive integer, got #{diagnostic_ring_size.inspect}"
  end

  ratio = thresholds[:queue_pressure_ratio]
  if ratio && !(ratio.is_a?(Numeric) && ratio >= 0.0 && ratio <= 1.0)
    raise ArgumentError, "queue_pressure_ratio must be between 0.0 and 1.0, got #{ratio.inspect}"
  end

  @max_size = max_size
  @subscriber_queue_size = subscriber_queue_size
  @overflow = overflow
  @close_policy = close_policy
  @alert_thresholds = thresholds
  @diagnostics = DiagnosticRing.new(diagnostic_ring_size)
  @ring = []
  @records = []
  @mutex = Mutex.new
  @sequence = 0
  @emitted_total = 0
  @delivered_total = 0
  @dropped_total = 0
  @overflow_dropped_total = 0
  @failed_total = 0
end
Instance Method Details
#emit(fact) ⇒ Object
Build a ChangeEvent from fact, add to the ring buffer, and enqueue to matching subscriber queues. Returns the emitted ChangeEvent immediately.
# File 'lib/igniter/store/changefeed_buffer.rb', line 308

def emit(fact)
  event = @mutex.synchronize do
    @sequence += 1
    e = ChangeEvent.from_fact(fact, sequence: @sequence)
    @emitted_total += 1
    if @ring.size >= @max_size
      @ring.shift
      @dropped_total += 1
    end
    @ring << e
    e
  end
  fan_out(event)
  event
end
#replay(cursor: nil, stores: nil, limit: nil) ⇒ Object
Replay retained ChangeEvents from the in-memory ring.
Parameters:
- cursor: nil or { sequence: Integer }
- stores: nil (all) or Array of store name symbols/strings to filter
- limit: nil (all matching) or Integer cap on returned events
Returns a Hash:
{
status: :ok | :cursor_too_old,
events: [ChangeEvent, ...],
cursor: { sequence: N } | nil,
oldest_cursor: { sequence: N } | nil,
newest_cursor: { sequence: N } | nil,
dropped_total: Integer
}
# File 'lib/igniter/store/changefeed_buffer.rb', line 352

def replay(cursor: nil, stores: nil, limit: nil)
  @mutex.synchronize do
    if @ring.empty?
      return {
        status: :ok,
        events: [],
        cursor: nil,
        oldest_cursor: nil,
        newest_cursor: nil,
        dropped_total: @dropped_total
      }
    end

    oldest_seq = @ring.first.cursor[:sequence]
    newest_seq = @ring.last.cursor[:sequence]

    candidates = if cursor.nil?
      @ring.dup
    else
      req_seq = Integer(cursor[:sequence])

      if req_seq < oldest_seq - 1
        return {
          status: :cursor_too_old,
          events: [],
          cursor: { sequence: newest_seq },
          oldest_cursor: { sequence: oldest_seq },
          newest_cursor: { sequence: newest_seq },
          dropped_total: @dropped_total
        }
      end

      if req_seq >= newest_seq
        return {
          status: :ok,
          events: [],
          cursor: { sequence: newest_seq },
          oldest_cursor: { sequence: oldest_seq },
          newest_cursor: { sequence: newest_seq },
          dropped_total: @dropped_total
        }
      end

      @ring.select { |e| e.cursor[:sequence] > req_seq }
    end

    if stores && !stores.empty?
      store_strs = Array(stores).map(&:to_s)
      candidates = candidates.select { |e| store_strs.include?(e.store.to_s) }
    end

    candidates = candidates.first(limit) if limit

    result_cursor = if candidates.last
      { sequence: candidates.last.cursor[:sequence] }
    else
      { sequence: newest_seq }
    end

    {
      status: :ok,
      events: candidates,
      cursor: result_cursor,
      oldest_cursor: { sequence: oldest_seq },
      newest_cursor: { sequence: newest_seq },
      dropped_total: @dropped_total
    }
  end
end
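A consumer can page through #replay by feeding the returned cursor back in until no events come back. The sketch below is one possible shape for such a loop, not something the library provides: catch_up and FakeBuffer are hypothetical names, and FakeBuffer mimics only the cursor rules (with bare Hashes in place of ChangeEvents) so the example runs on its own.

```ruby
# Hypothetical helper: pages through #replay until the caller reaches the head.
# `buffer` is any object whose #replay returns the documented Hash shape.
def catch_up(buffer, cursor, batch: 100)
  collected = []
  loop do
    result = buffer.replay(cursor: cursor, limit: batch)
    # A gap means events were evicted from the ring; the caller must resync.
    return { status: :cursor_too_old, events: [], cursor: cursor } if result[:status] == :cursor_too_old
    break if result[:events].empty?

    collected.concat(result[:events])
    cursor = result[:cursor]
  end
  { status: :ok, events: collected, cursor: cursor }
end

# Stand-in for ChangefeedBuffer (assumption: only the cursor rules are modeled).
class FakeBuffer
  def initialize(sequences)
    @ring = sequences.to_a
  end

  def replay(cursor: nil, limit: nil)
    oldest = @ring.first
    newest = @ring.last
    seq = cursor && cursor[:sequence]
    if seq && seq < oldest - 1
      return { status: :cursor_too_old, events: [], cursor: { sequence: newest } }
    end

    picked = seq ? @ring.select { |n| n > seq } : @ring.dup
    picked = picked.first(limit) if limit
    { status: :ok,
      events: picked.map { |n| { sequence: n } },
      cursor: { sequence: picked.last || newest } }
  end
end

buffer = FakeBuffer.new(5..9)
resumed = catch_up(buffer, { sequence: 6 }, batch: 2)
stale = catch_up(buffer, { sequence: 2 })
```

With sequences 5..9 retained, resuming from 6 in batches of 2 collects events 7, 8, and 9 and ends with a cursor at 9. Starting from 2 reports :cursor_too_old, and what to do next (for example, rebuilding state from the store) is left to the caller.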
#snapshot ⇒ Object
Compact snapshot of current changefeed state for observability. Includes alerts (evaluated against configured thresholds) and diagnostics (recent bounded ring of lifecycle/failure entries).
Counter fields:
- dropped_total: retained ring drops (ring full)
- overflow_dropped_total: subscriber queue drops (slow consumer)
- total_queued: sum of all active subscriber queue sizes (backpressure)
# File 'lib/igniter/store/changefeed_buffer.rb', line 431

def snapshot
  @mutex.synchronize do
    total_queued = @records.sum { |r| r.queue.size }
    current = {
      emitted_total: @emitted_total,
      delivered_total: @delivered_total,
      dropped_total: @dropped_total,
      overflow_dropped_total: @overflow_dropped_total,
      failed_total: @failed_total,
      buffered: @ring.size,
      max_size: @max_size,
      subscriber_count: @records.size,
      subscriber_queue_size: @subscriber_queue_size,
      overflow: @overflow,
      close_policy: @close_policy,
      total_queued: total_queued,
      oldest_sequence: @ring.first&.cursor&.fetch(:sequence, nil),
      newest_sequence: @ring.last&.cursor&.fetch(:sequence, nil)
    }
    current[:alerts] = compute_alerts(current)
    current[:diagnostics] = @diagnostics.snapshot
    current
  end
end
#subscribe(stores:, &handler) ⇒ Object
Register a subscriber handler for one or more store names.
- stores: Array of store name symbols/strings, or [] for all stores (wildcard).
Returns a Subscription handle; call handle.close to unsubscribe.
# File 'lib/igniter/store/changefeed_buffer.rb', line 245

def subscribe(stores:, &handler)
  raise ArgumentError, "subscribe requires a block" unless handler

  q = SubscriberQueue.new(max_size: @subscriber_queue_size, overflow: @overflow)
  record = SubscriptionRecord.new(
    id: SecureRandom.hex(8),
    stores: Array(stores).map(&:to_s),
    handler: handler,
    queue: q,
    thread: nil,
    overflow: @overflow,
    close_policy: @close_policy,
    delivered_total: 0,
    overflow_dropped_total: 0,
    failed_total: 0,
    status: :active
  )

  thread = Thread.new do
    loop do
      event = q.pop
      break if event.nil?

      begin
        handler.call(event)
        @mutex.synchronize do
          @delivered_total += 1
          record.delivered_total += 1
        end
      rescue StandardError => e
        ts = Process.clock_gettime(Process::CLOCK_REALTIME)
        @mutex.synchronize do
          @failed_total += 1
          record.failed_total += 1
          record.status = :failed
        end
        @diagnostics.push({
          type: :subscriber_failed,
          subscriber_id: record.id,
          stores: record.stores,
          error_class: e.class.name,
          message: e.message.to_s.slice(0, 200),
          ts: ts
        })
        remove_record(record, record_diagnostic: false)
        break
      end
    end
  end

  record.thread = thread
  @mutex.synchronize { @records << record }
  @diagnostics.push({
    type: :subscriber_subscribed,
    subscriber_id: record.id,
    stores: record.stores,
    ts: Process.clock_gettime(Process::CLOCK_REALTIME)
  })
  Subscription.new(record, self)
end
#subscriber_count(store = nil) ⇒ Object
Number of active subscribers, optionally filtered by store name. Wildcard subscribers (stores == []) are counted for every store.
# File 'lib/igniter/store/changefeed_buffer.rb', line 327

def subscriber_count(store = nil)
  @mutex.synchronize do
    if store
      @records.count { |r| r.stores.empty? || r.stores.include?(store.to_s) }
    else
      @records.size
    end
  end
end
#subscriber_snapshot ⇒ Object
Per-subscriber state snapshot for diagnosing slow/failing consumers.
Returns an Array of Hashes — one per active subscriber — with fields:
id, stores, queue_size, queue_max_size, overflow, close_policy,
status, delivered_total, overflow_dropped_total, failed_total
Subscribers that have already failed or been closed are not listed.
# File 'lib/igniter/store/changefeed_buffer.rb', line 463

def subscriber_snapshot
  @mutex.synchronize do
    @records.map do |r|
      {
        id: r.id,
        stores: r.stores,
        queue_size: r.queue.size,
        queue_max_size: @subscriber_queue_size,
        overflow: r.overflow,
        close_policy: r.close_policy,
        status: r.status,
        delivered_total: r.delivered_total,
        overflow_dropped_total: r.overflow_dropped_total,
        failed_total: r.failed_total
      }
    end
  end
end
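Because each row carries both queue_size and queue_max_size, a caller can flag slow consumers directly from the snapshot. The helper below is a hypothetical sketch; pressured_subscribers and the 0.8 default are illustrative choices and do not reproduce how the library evaluates its :queue_pressure_ratio alert.

```ruby
# Hypothetical helper: ids of subscribers whose queue fill ratio meets `ratio`.
# `rows` is an Array of Hashes shaped like the subscriber_snapshot output.
def pressured_subscribers(rows, ratio: 0.8)
  rows.select { |row| row[:queue_size].to_f / row[:queue_max_size] >= ratio }
      .map { |row| row[:id] }
end

# Sample rows with made-up values, for illustration only.
rows = [
  { id: "a1", queue_size: 95, queue_max_size: 100, status: :active },
  { id: "b2", queue_size: 3, queue_max_size: 100, status: :active }
]
slow = pressured_subscribers(rows)
```

In this sample only "a1" is reported, since its queue is 95% full.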