Class: Igniter::Store::SubscriptionRegistry

Inherits:
Object
Defined in:
lib/igniter/store/subscription_registry.rb

Overview

Routing layer of the reactive push architecture.

Tracks SubscriptionRecord objects and fans out facts to their handlers. Knows nothing about sockets, frames, or wire encoding; those are the adapter's responsibility. A handler is any callable that accepts a single fact, for example ->(fact) { … }, and is passed to #subscribe as a block.

TCP push adapter example (created in handle_subscription_mode):

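require "json"

# socket and encode_frame are assumed to be in scope inside handle_subscription_mode.
write_mutex = Mutex.new # serializes writes so concurrent fan-outs cannot interleave frames
adapter = ->(fact) {
  frame = encode_frame(JSON.generate({ event: "fact_written", fact: fact.to_h }))
  write_mutex.synchronize { socket.write(frame) }
}
record = registry.subscribe(stores: [:tasks], &adapter)
# ... later:
registry.unsubscribe(record)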

Future adapters (WebhookAdapter, SSEAdapter, QueueAdapter) follow the same ->(fact) { … } contract and plug in without modifying this class.
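
As an illustration of the ->(fact) { … } contract, here is a minimal sketch of a queue-backed adapter. It is not part of the library: SketchRegistry, SketchRecord, SketchFact, and build_queue_adapter are hypothetical names, and the registry is a stand-in condensed from the method listings on this page so the example runs without Igniter loaded.

```ruby
require "securerandom"

# Hypothetical stand-ins (assumptions, not Igniter's real classes).
SketchRecord = Struct.new(:id, :stores, :handler, keyword_init: true)
SketchFact   = Struct.new(:store, :key, :value, keyword_init: true)

# Condensed copy of the subscribe/fan_out logic documented on this page
# (dead-handler cleanup omitted for brevity).
class SketchRegistry
  def initialize
    @records = []
    @mutex   = Mutex.new
  end

  def subscribe(stores:, &handler)
    record = SketchRecord.new(
      id:      SecureRandom.hex(8),
      stores:  Array(stores).map(&:to_s),
      handler: handler
    )
    @mutex.synchronize { @records << record }
    record
  end

  def fan_out(fact)
    store_s  = fact.store.to_s
    matching = @mutex.synchronize { @records.select { |r| r.stores.include?(store_s) } }
    matching.each { |r| r.handler.call(fact) }
  end
end

# Hypothetical adapter factory: returns a ->(fact) handler that buffers each
# fact as a Hash in a thread-safe Queue for a consumer to drain later.
def build_queue_adapter(queue)
  ->(fact) { queue << fact.to_h }
end

registry = SketchRegistry.new
queue    = Queue.new
registry.subscribe(stores: [:tasks], &build_queue_adapter(queue))

registry.fan_out(SketchFact.new(store: :tasks,  key: "t1", value: "done"))
registry.fan_out(SketchFact.new(store: :orders, key: "o1", value: "new")) # no subscriber for :orders

puts queue.size      # facts delivered to the adapter
puts queue.pop[:key] # key of the buffered fact
```

The registry only ever sees the callable, so the queue (or a socket, or an HTTP client) stays entirely on the adapter side of the boundary.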

Defined Under Namespace

Classes: SubscriptionRecord

Constructor Details

#initialize ⇒ SubscriptionRegistry

Returns a new instance of SubscriptionRegistry.



# File 'lib/igniter/store/subscription_registry.rb', line 29

def initialize
  @records = []
  @mutex   = Mutex.new
end

Instance Method Details

#fan_out(fact) ⇒ Object

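Fans out a fact to every handler subscribed to fact.store. Called from dispatch("write_fact") after the fact is persisted. Handlers are invoked outside the registry mutex, on a snapshot of the matching records. Any handler that raises a StandardError is treated as dead and its subscription is removed.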



# File 'lib/igniter/store/subscription_registry.rb', line 55

def fan_out(fact)
  store_s  = fact.store.to_s
  matching = @mutex.synchronize { @records.select { |r| r.stores.include?(store_s) }.dup }
  dead     = []
  matching.each do |record|
    record.handler.call(fact)
  rescue StandardError
    dead << record
  end
  dead.each { |r| unsubscribe(r) } unless dead.empty?
end
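
To make the dead-handler rule concrete, the sketch below reruns the fan_out logic from the listing above against a stand-in registry. SketchRegistry, SketchRecord, and SketchFact are hypothetical names introduced for illustration; the real SubscriptionRecord and fact classes may be defined differently.

```ruby
require "securerandom"

# Hypothetical stand-ins (assumptions, not Igniter's real classes).
SketchRecord = Struct.new(:id, :stores, :handler, keyword_init: true)
SketchFact   = Struct.new(:store, :key, keyword_init: true)

# Condensed copy of the registry methods documented on this page.
class SketchRegistry
  def initialize
    @records = []
    @mutex   = Mutex.new
  end

  def subscribe(stores:, &handler)
    record = SketchRecord.new(id: SecureRandom.hex(8), stores: Array(stores).map(&:to_s), handler: handler)
    @mutex.synchronize { @records << record }
    record
  end

  def unsubscribe(record)
    return unless record
    @mutex.synchronize { @records.reject! { |r| r.equal?(record) } }
  end

  def subscriber_count(store)
    @mutex.synchronize { @records.count { |r| r.stores.include?(store.to_s) } }
  end

  def fan_out(fact)
    store_s  = fact.store.to_s
    matching = @mutex.synchronize { @records.select { |r| r.stores.include?(store_s) } }
    dead     = []
    matching.each do |record|
      record.handler.call(fact)
    rescue StandardError
      dead << record # a raising handler is marked dead, the loop continues
    end
    dead.each { |r| unsubscribe(r) }
  end
end

registry  = SketchRegistry.new
delivered = []
registry.subscribe(stores: [:tasks]) { |fact| delivered << fact.key }
registry.subscribe(stores: [:tasks]) { |_fact| raise IOError, "closed stream" }

registry.fan_out(SketchFact.new(store: :tasks, key: "t1"))
puts registry.subscriber_count(:tasks) # the raising handler has been dropped

registry.fan_out(SketchFact.new(store: :tasks, key: "t2"))
puts delivered.inspect # the healthy handler still receives both facts
```

One consequence worth noting: a transient error also ends the subscription, so an adapter that wants to survive one would have to rescue inside its own handler.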

#subscribe(stores:, &handler) ⇒ Object

Registers a handler block for one or more store names. Names are normalized to strings, so symbols and strings are interchangeable. Returns the SubscriptionRecord; pass it to #unsubscribe to remove the subscription.



# File 'lib/igniter/store/subscription_registry.rb', line 36

def subscribe(stores:, &handler)
  record = SubscriptionRecord.new(
    id:      SecureRandom.hex(8),
    stores:  Array(stores).map(&:to_s),
    handler: handler
  )
  @mutex.synchronize { @records << record }
  record
end

#subscriber_count(store) ⇒ Object

Returns the number of active subscriptions for the given store name (symbol or string).



# File 'lib/igniter/store/subscription_registry.rb', line 68

def subscriber_count(store)
  @mutex.synchronize { @records.count { |r| r.stores.include?(store.to_s) } }
end

#unsubscribe(record) ⇒ Object

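Removes a subscription. Matching is identity-based (equal?), so only the exact record object returned by #subscribe matches, not an equal-valued copy. Idempotent; a nil record is ignored.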



# File 'lib/igniter/store/subscription_registry.rb', line 47

def unsubscribe(record)
  return unless record
  @mutex.synchronize { @records.reject! { |r| r.equal?(record) } }
end
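
The subscribe, count, and unsubscribe lifecycle can be sketched as follows. SketchRegistry and SketchRecord are hypothetical stand-ins condensed from the listings on this page; in particular, modelling the record as a Struct is an assumption made so that an equal-valued copy can be shown not to match.

```ruby
require "securerandom"

# Hypothetical stand-in (assumption): the real SubscriptionRecord may differ.
SketchRecord = Struct.new(:id, :stores, :handler, keyword_init: true)

# Condensed copy of the registry methods documented on this page.
class SketchRegistry
  def initialize
    @records = []
    @mutex   = Mutex.new
  end

  def subscribe(stores:, &handler)
    record = SketchRecord.new(id: SecureRandom.hex(8), stores: Array(stores).map(&:to_s), handler: handler)
    @mutex.synchronize { @records << record }
    record
  end

  def unsubscribe(record)
    return unless record
    @mutex.synchronize { @records.reject! { |r| r.equal?(record) } }
  end

  def subscriber_count(store)
    @mutex.synchronize { @records.count { |r| r.stores.include?(store.to_s) } }
  end
end

registry = SketchRegistry.new
record   = registry.subscribe(stores: [:tasks, "orders"]) { |_fact| }

# Symbols and strings name the same store after normalization.
puts registry.subscriber_count("tasks")
puts registry.subscriber_count(:orders)

# A copy with identical field values is == to the record but not equal?,
# so it does not remove the subscription.
registry.unsubscribe(record.dup)
puts registry.subscriber_count(:tasks)

registry.unsubscribe(record)
registry.unsubscribe(record) # second call is a no-op
registry.unsubscribe(nil)    # nil is ignored
puts registry.subscriber_count(:tasks)
```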