Class: Igniter::Store::ChangefeedBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/igniter/store/changefeed_buffer.rb

Overview

Bounded in-memory Changefeed buffer with async per-subscriber fan-out, delivery policies, and production diagnostics.

Receives committed facts via emit, builds ChangeEvent objects with monotonic sequence cursors, retains recent events in a bounded ring, and fans out to registered subscriber handlers via per-subscriber bounded queues and worker threads so that slow subscribers never stall emit.

Delivery semantics: async best-effort push.

  • Fan-out enqueues to a per-subscriber SubscriberQueue; emit returns quickly.

  • Each subscriber has one worker thread draining its queue.

  • When a subscriber queue is full, overflow policy determines which event is dropped (see overflow: option).

  • A handler that raises is removed, counted as failed, and its worker exits.

  • When the ring is full the oldest retained event is dropped and dropped_total is incremented.

  • No durable checkpoints in this v0 slice.

Overflow policies (subscriber queue full):

  • :drop_oldest — remove the oldest queued event; add the incoming event.

  • :drop_newest — discard the incoming event; queue unchanged.

Close policies (Subscription#close):

  • :drain — deliver all queued events before stopping the worker.

  • :discard — clear the queue immediately; worker exits after current event.

Alert thresholds (optional, checked at each #snapshot call):

  • :failed_total — fires :changefeed_subscriber_failures

  • :overflow_dropped_total — fires :changefeed_overflow_drops

  • :total_queued — fires :changefeed_queue_pressure (aggregate)

  • :queue_pressure_ratio — fires :changefeed_queue_pressure (per-subscriber)

Diagnostics ring records bounded lifecycle/failure events:

  • :subscriber_subscribed / :subscriber_closed / :subscriber_failed

  • :subscriber_overflow

Ordering policy:

  • Sequences are assigned in emit-call order (monotonically increasing).

  • IgniterStore emits the source fact BEFORE triggering derivations/scatters, so subscribers always see cause before effects within their queue.

Replay cursor semantics (see #replay):

  • nil cursor → all retained events from oldest retained sequence.

  • N → events with sequence > N.

  • N < oldest-1 → :cursor_too_old (gap due to ring overflow).

  • N >= newest → empty :ok (caller is already at the head).

Usage:

buf    = ChangefeedBuffer.new(max_size: 1_000)
handle = buf.subscribe(stores: [:tasks]) { |event| deliver(event) }
buf.emit(fact)      # enqueues to matching subscriber queues; returns quickly
handle.close        # respects close_policy (drain or discard), joins worker

Defined Under Namespace

Classes: DiagnosticRing, SubscriberQueue, Subscription, SubscriptionRecord

Constant Summary collapse

DEFAULT_MAX_SIZE =
1_000
DEFAULT_SUBSCRIBER_QUEUE_SIZE =
100
DEFAULT_OVERFLOW =
:drop_oldest
DEFAULT_CLOSE_POLICY =
:drain
DEFAULT_DIAGNOSTIC_RING_SIZE =
100
VALID_OVERFLOW_POLICIES =
%i[drop_oldest drop_newest].freeze
VALID_CLOSE_POLICIES =
%i[drain discard].freeze
VALID_THRESHOLD_KEYS =
%i[total_queued overflow_dropped_total failed_total queue_pressure_ratio].freeze

Instance Method Summary collapse

Constructor Details

#initialize(max_size: DEFAULT_MAX_SIZE, subscriber_queue_size: DEFAULT_SUBSCRIBER_QUEUE_SIZE, overflow: DEFAULT_OVERFLOW, close_policy: DEFAULT_CLOSE_POLICY, diagnostic_ring_size: DEFAULT_DIAGNOSTIC_RING_SIZE, alert_thresholds: {}) ⇒ ChangefeedBuffer

Returns a new instance of ChangefeedBuffer.



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# File 'lib/igniter/store/changefeed_buffer.rb', line 189

def initialize(max_size: DEFAULT_MAX_SIZE,
               subscriber_queue_size: DEFAULT_SUBSCRIBER_QUEUE_SIZE,
               overflow: DEFAULT_OVERFLOW,
               close_policy: DEFAULT_CLOSE_POLICY,
               diagnostic_ring_size: DEFAULT_DIAGNOSTIC_RING_SIZE,
               alert_thresholds: {})
  unless VALID_OVERFLOW_POLICIES.include?(overflow)
    raise ArgumentError, "unknown overflow policy: #{overflow.inspect}. " \
                         "Valid: #{VALID_OVERFLOW_POLICIES.map(&:inspect).join(", ")}"
  end
  unless VALID_CLOSE_POLICIES.include?(close_policy)
    raise ArgumentError, "unknown close_policy: #{close_policy.inspect}. " \
                         "Valid: #{VALID_CLOSE_POLICIES.map(&:inspect).join(", ")}"
  end

  thresholds = (alert_thresholds || {}).transform_keys(&:to_sym)
  unknown    = thresholds.keys - VALID_THRESHOLD_KEYS
  unless unknown.empty?
    raise ArgumentError, "unknown alert_threshold keys: #{unknown.map(&:inspect).join(", ")}. " \
                         "Valid: #{VALID_THRESHOLD_KEYS.map(&:inspect).join(", ")}"
  end

  unless max_size.is_a?(Integer) && max_size > 0
    raise ArgumentError, "max_size must be a positive integer, got #{max_size.inspect}"
  end
  unless subscriber_queue_size.is_a?(Integer) && subscriber_queue_size > 0
    raise ArgumentError, "subscriber_queue_size must be a positive integer, got #{subscriber_queue_size.inspect}"
  end
  unless diagnostic_ring_size.is_a?(Integer) && diagnostic_ring_size > 0
    raise ArgumentError, "diagnostic_ring_size must be a positive integer, got #{diagnostic_ring_size.inspect}"
  end
  ratio = thresholds[:queue_pressure_ratio]
  if ratio && !(ratio.is_a?(Numeric) && ratio >= 0.0 && ratio <= 1.0)
    raise ArgumentError, "queue_pressure_ratio must be between 0.0 and 1.0, got #{ratio.inspect}"
  end

  @max_size               = max_size
  @subscriber_queue_size  = subscriber_queue_size
  @overflow               = overflow
  @close_policy           = close_policy
  @alert_thresholds       = thresholds
  @diagnostics            = DiagnosticRing.new(diagnostic_ring_size)
  @ring                   = []
  @records                = []
  @mutex                  = Mutex.new
  @sequence               = 0
  @emitted_total          = 0
  @delivered_total        = 0
  @dropped_total          = 0
  @overflow_dropped_total = 0
  @failed_total           = 0
end

Instance Method Details

#emit(fact) ⇒ Object

Build a ChangeEvent from fact, add to the ring buffer, and enqueue to matching subscriber queues. Returns the emitted ChangeEvent immediately.



308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
# File 'lib/igniter/store/changefeed_buffer.rb', line 308

def emit(fact)
  event = @mutex.synchronize do
    @sequence += 1
    e = ChangeEvent.from_fact(fact, sequence: @sequence)
    @emitted_total += 1
    if @ring.size >= @max_size
      @ring.shift
      @dropped_total += 1
    end
    @ring << e
    e
  end

  fan_out(event)
  event
end

#replay(cursor: nil, stores: nil, limit: nil) ⇒ Object

Replay retained ChangeEvents from the in-memory ring.

cursor — nil or { sequence: Integer } stores — nil (all) or Array of store name symbols/strings to filter limit — nil (all matching) or Integer cap on returned events

Returns a Hash:

{
  status:        :ok | :cursor_too_old,
  events:        [ChangeEvent, ...],
  cursor:        { sequence: N } | nil,
  oldest_cursor: { sequence: N } | nil,
  newest_cursor: { sequence: N } | nil,
  dropped_total: Integer
}


352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
# File 'lib/igniter/store/changefeed_buffer.rb', line 352

def replay(cursor: nil, stores: nil, limit: nil)
  @mutex.synchronize do
    if @ring.empty?
      return {
        status:        :ok,
        events:        [],
        cursor:        nil,
        oldest_cursor: nil,
        newest_cursor: nil,
        dropped_total: @dropped_total
      }
    end

    oldest_seq = @ring.first.cursor[:sequence]
    newest_seq = @ring.last.cursor[:sequence]

    candidates =
      if cursor.nil?
        @ring.dup
      else
        req_seq = Integer(cursor[:sequence])

        if req_seq < oldest_seq - 1
          return {
            status:        :cursor_too_old,
            events:        [],
            cursor:        { sequence: newest_seq },
            oldest_cursor: { sequence: oldest_seq },
            newest_cursor: { sequence: newest_seq },
            dropped_total: @dropped_total
          }
        end

        if req_seq >= newest_seq
          return {
            status:        :ok,
            events:        [],
            cursor:        { sequence: newest_seq },
            oldest_cursor: { sequence: oldest_seq },
            newest_cursor: { sequence: newest_seq },
            dropped_total: @dropped_total
          }
        end

        @ring.select { |e| e.cursor[:sequence] > req_seq }
      end

    if stores && !stores.empty?
      store_strs = Array(stores).map(&:to_s)
      candidates = candidates.select { |e| store_strs.include?(e.store.to_s) }
    end

    candidates = candidates.first(limit) if limit

    result_cursor =
      if candidates.last
        { sequence: candidates.last.cursor[:sequence] }
      else
        { sequence: newest_seq }
      end

    {
      status:        :ok,
      events:        candidates,
      cursor:        result_cursor,
      oldest_cursor: { sequence: oldest_seq },
      newest_cursor: { sequence: newest_seq },
      dropped_total: @dropped_total
    }
  end
end

#snapshotObject

Compact snapshot of current changefeed state for observability. Includes alerts (evaluated against configured thresholds) and diagnostics (recent bounded ring of lifecycle/failure entries).

dropped_total — retained ring drops (ring full) overflow_dropped_total — subscriber queue drops (slow consumer) total_queued — sum of all active subscriber queue sizes (backpressure)



431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
# File 'lib/igniter/store/changefeed_buffer.rb', line 431

def snapshot
  @mutex.synchronize do
    total_queued = @records.sum { |r| r.queue.size }
    current = {
      emitted_total:          @emitted_total,
      delivered_total:        @delivered_total,
      dropped_total:          @dropped_total,
      overflow_dropped_total: @overflow_dropped_total,
      failed_total:           @failed_total,
      buffered:               @ring.size,
      max_size:               @max_size,
      subscriber_count:       @records.size,
      subscriber_queue_size:  @subscriber_queue_size,
      overflow:               @overflow,
      close_policy:           @close_policy,
      total_queued:           total_queued,
      oldest_sequence:        @ring.first&.cursor&.fetch(:sequence, nil),
      newest_sequence:        @ring.last&.cursor&.fetch(:sequence, nil)
    }
    current[:alerts]      = compute_alerts(current)
    current[:diagnostics] = @diagnostics.snapshot
    current
  end
end

#subscribe(stores:, &handler) ⇒ Object

Register a subscriber handler for one or more store names. stores: — Array of store name symbols/strings, or [] for all stores (wildcard). Returns a Subscription handle; call handle.close to unsubscribe.

Raises:

  • (ArgumentError)


245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
# File 'lib/igniter/store/changefeed_buffer.rb', line 245

def subscribe(stores:, &handler)
  raise ArgumentError, "subscribe requires a block" unless handler

  q = SubscriberQueue.new(max_size: @subscriber_queue_size, overflow: @overflow)
  record = SubscriptionRecord.new(
    id:                     SecureRandom.hex(8),
    stores:                 Array(stores).map(&:to_s),
    handler:                handler,
    queue:                  q,
    thread:                 nil,
    overflow:               @overflow,
    close_policy:           @close_policy,
    delivered_total:        0,
    overflow_dropped_total: 0,
    failed_total:           0,
    status:                 :active
  )

  thread = Thread.new do
    loop do
      event = q.pop
      break if event.nil?
      begin
        handler.call(event)
        @mutex.synchronize do
          @delivered_total += 1
          record.delivered_total += 1
        end
      rescue StandardError => e
        ts = Process.clock_gettime(Process::CLOCK_REALTIME)
        @mutex.synchronize do
          @failed_total += 1
          record.failed_total += 1
          record.status = :failed
        end
        @diagnostics.push({
          type:          :subscriber_failed,
          subscriber_id: record.id,
          stores:        record.stores,
          error_class:   e.class.name,
          message:       e.message.to_s.slice(0, 200),
          ts:            ts
        })
        remove_record(record, record_diagnostic: false)
        break
      end
    end
  end
  record.thread = thread

  @mutex.synchronize { @records << record }
  @diagnostics.push({
    type:          :subscriber_subscribed,
    subscriber_id: record.id,
    stores:        record.stores,
    ts:            Process.clock_gettime(Process::CLOCK_REALTIME)
  })

  Subscription.new(record, self)
end

#subscriber_count(store = nil) ⇒ Object

Number of active subscribers, optionally filtered by store name. Wildcard subscribers (stores == []) are counted for every store.



327
328
329
330
331
332
333
334
335
# File 'lib/igniter/store/changefeed_buffer.rb', line 327

def subscriber_count(store = nil)
  @mutex.synchronize do
    if store
      @records.count { |r| r.stores.empty? || r.stores.include?(store.to_s) }
    else
      @records.size
    end
  end
end

#subscriber_snapshotObject

Per-subscriber state snapshot for diagnosing slow/failing consumers.

Returns an Array of Hashes — one per active subscriber — with fields:

id, stores, queue_size, queue_max_size, overflow, close_policy,
status, delivered_total, overflow_dropped_total, failed_total

Subscribers that have already failed or been closed are not listed.



463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
# File 'lib/igniter/store/changefeed_buffer.rb', line 463

def subscriber_snapshot
  @mutex.synchronize do
    @records.map do |r|
      {
        id:                     r.id,
        stores:                 r.stores,
        queue_size:             r.queue.size,
        queue_max_size:         @subscriber_queue_size,
        overflow:               r.overflow,
        close_policy:           r.close_policy,
        status:                 r.status,
        delivered_total:        r.delivered_total,
        overflow_dropped_total: r.overflow_dropped_total,
        failed_total:           r.failed_total
      }
    end
  end
end