Class: Takagi::Observer::Registry
- Inherits:
-
Object
- Object
- Takagi::Observer::Registry
- Defined in:
- lib/takagi/observer/registry.rb
Overview
Keeps track of observers and broadcasts state changes to interested parties.
NOTE: This registry intentionally does NOT use Registry::Base because:
-
It stores arrays of subscriptions per path (not simple key-value pairs)
-
It has complex domain logic (delta checking, notifications, cleanup)
-
It needs fine-grained mutex control for notify operations
-
The API is fundamentally different (subscribe/notify vs register/get)
This is already thread-safe with its own @mutex implementation.
Class Attribute Summary collapse
-
.subscriptions ⇒ Object
readonly
Returns the value of attribute subscriptions.
Class Method Summary collapse
- .cleanup_stale_observers(max_age:, now: Time.now) ⇒ Object
- .notify(path, new_value) ⇒ Object
- .sender ⇒ Object
- .subscribe(path, subscriber) ⇒ Object
- .subscription_paths ⇒ Object
- .unsubscribe(path, token) ⇒ Object
Class Attribute Details
.subscriptions ⇒ Object (readonly)
Returns the value of attribute subscriptions.
21 22 23 |
# File 'lib/takagi/observer/registry.rb', line 21 def subscriptions @subscriptions end |
Class Method Details
.cleanup_stale_observers(max_age:, now: Time.now) ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/takagi/observer/registry.rb', line 79 def cleanup_stale_observers(max_age:, now: Time.now) cutoff = now - max_age cleaned = 0 @mutex.synchronize do @subscriptions.each do |path, subscribers| subscribers.reject! do |subscription| stale = stale_subscription?(subscription, cutoff) cleaned += 1 if stale stale end @subscriptions.delete(path) if subscribers.empty? end end cleaned end |
.notify(path, new_value) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/takagi/observer/registry.rb', line 48 def notify(path, new_value) # Get a snapshot of subscribers to avoid holding the lock during notification subscribers = @mutex.synchronize { @subscriptions[path]&.dup } return unless subscribers Takagi.logger.debug "Notify called for: #{path}" Takagi.logger.debug "Subscriptions count: #{subscribers.size}" Takagi::Hooks.emit(:observe_notify_start, path: path, subscribers: subscribers&.size || 0, value: new_value) subscribers.each do |subscription| next unless should_notify?(subscription, new_value) deliver_notification(subscription, path, new_value) update_sequence(subscription, new_value) subscription[:last_notified_at] = Time.now Takagi::Hooks.emit(:observer_notification, path: path, subscription: subscription) end Takagi::Hooks.emit(:observe_notify_end, path: path, delivered: subscribers&.size || 0, value: new_value) end |
.sender ⇒ Object
71 72 73 |
# File 'lib/takagi/observer/registry.rb', line 71 def sender @sender ||= Takagi::Observer::Sender.new end |
.subscribe(path, subscriber) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/takagi/observer/registry.rb', line 23 def subscribe(path, subscriber) entry = subscriber.dup entry[:created_at] ||= Time.now entry[:last_notified_at] ||= nil @mutex.synchronize do @subscriptions[path] ||= [] @subscriptions[path] << entry end Takagi::Hooks.emit(:observe_subscribed, path: path, subscription: entry) entry end |
.subscription_paths ⇒ Object
75 76 77 |
# File 'lib/takagi/observer/registry.rb', line 75 def subscription_paths @mutex.synchronize { @subscriptions.keys.dup } end |
.unsubscribe(path, token) ⇒ Object
38 39 40 41 42 43 44 45 46 |
# File 'lib/takagi/observer/registry.rb', line 38 def unsubscribe(path, token) @mutex.synchronize do return unless @subscriptions[path] @subscriptions[path].reject! { |s| s[:token] == token } end Takagi::Hooks.emit(:observe_unsubscribed, path: path, token: token) end |