Class: Igniter::Store::SubscriptionRegistry
- Inherits:
-
Object
- Object
- Igniter::Store::SubscriptionRegistry
- Defined in:
- lib/igniter/store/subscription_registry.rb
Overview
Routing layer of the reactive push architecture.
Tracks SubscriptionRecord objects and fans out facts to their handlers. Knows nothing about sockets, frames, or wire encoding — those are the adapter’s responsibility. The handler is any callable: ->(fact) { … }
TCP push adapter example (created in handle_subscription_mode):
write_mutex = Mutex.new
adapter = ->(fact) {
frame = encode_frame(JSON.generate({ event: "fact_written", fact: fact.to_h }))
write_mutex.synchronize { socket.write(frame) }
}
record = registry.subscribe(stores: [:tasks], &adapter)
# ... later:
registry.unsubscribe(record)
Future adapters (WebhookAdapter, SSEAdapter, QueueAdapter) follow the same ->(fact) { … } contract and plug in without modifying this class.
Defined Under Namespace
Classes: SubscriptionRecord
Instance Method Summary collapse
-
#fan_out(fact) ⇒ Object
Fan out a fact to all handlers subscribed to fact.store.
-
#initialize ⇒ SubscriptionRegistry
constructor
A new instance of SubscriptionRegistry.
-
#subscribe(stores:, &handler) ⇒ Object
Register a handler callable for one or more store names.
-
#subscriber_count(store) ⇒ Object
Number of active subscriptions for a given store name.
-
#unsubscribe(record) ⇒ Object
Remove a subscription.
Constructor Details
#initialize ⇒ SubscriptionRegistry
Returns a new instance of SubscriptionRegistry.
29 30 31 32 |
# File 'lib/igniter/store/subscription_registry.rb', line 29 def initialize @records = [] @mutex = Mutex.new end |
Instance Method Details
#fan_out(fact) ⇒ Object
Fan out a fact to all handlers subscribed to fact.store. Called from dispatch(“write_fact”) after the fact is persisted. Handlers that raise are treated as dead and removed.
55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/igniter/store/subscription_registry.rb', line 55 def fan_out(fact) store_s = fact.store.to_s matching = @mutex.synchronize { @records.select { |r| r.stores.include?(store_s) }.dup } dead = [] matching.each do |record| record.handler.call(fact) rescue StandardError dead << record end dead.each { |r| unsubscribe(r) } unless dead.empty? end |
#subscribe(stores:, &handler) ⇒ Object
Register a handler callable for one or more store names. Returns the SubscriptionRecord — pass it to #unsubscribe to remove.
36 37 38 39 40 41 42 43 44 |
# File 'lib/igniter/store/subscription_registry.rb', line 36 def subscribe(stores:, &handler) record = SubscriptionRecord.new( id: SecureRandom.hex(8), stores: Array(stores).map(&:to_s), handler: handler ) @mutex.synchronize { @records << record } record end |
#subscriber_count(store) ⇒ Object
Number of active subscriptions for a given store name.
68 69 70 |
# File 'lib/igniter/store/subscription_registry.rb', line 68 def subscriber_count(store) @mutex.synchronize { @records.count { |r| r.stores.include?(store.to_s) } } end |
#unsubscribe(record) ⇒ Object
Remove a subscription. Identity-based (object equality), idempotent.
47 48 49 50 |
# File 'lib/igniter/store/subscription_registry.rb', line 47 def unsubscribe(record) return unless record @mutex.synchronize { @records.reject! { |r| r.equal?(record) } } end |