Class: Takagi::Observer::Registry

Inherits:
Object
  • Object
show all
Defined in:
lib/takagi/observer/registry.rb

Overview

Keeps track of observers and broadcasts state changes to interested parties.

NOTE: This registry intentionally does NOT use Registry::Base because:

  1. It stores arrays of subscriptions per path (not simple key-value pairs)

  2. It has complex domain logic (delta checking, notifications, cleanup)

  3. It needs fine-grained mutex control for notify operations

  4. The API is fundamentally different (subscribe/notify vs register/get)

This is already thread-safe with its own @mutex implementation.

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.subscriptions ⇒ Object (readonly)

Returns the value of attribute subscriptions.



21
22
23
# File 'lib/takagi/observer/registry.rb', line 21

# Reader for the registry's subscription store.
#
# The instance variable is returned as-is: no copy is made and @mutex is not
# taken here, so callers receive the live object rather than a snapshot.
#
# @return [Object] the current value of @subscriptions
def subscriptions
  @subscriptions
end

Class Method Details

.cleanup_stale_observers(max_age:, now: Time.now) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/takagi/observer/registry.rb', line 79

# Removes every subscription that +stale_subscription?+ reports as stale for
# the cutoff +now - max_age+, and drops any path left without subscribers.
# The whole sweep runs while holding @mutex.
#
# @param max_age [Numeric] amount subtracted from +now+ to form the cutoff
#   (presumably seconds — confirm against callers)
# @param now [Time] reference instant; defaults to the current time
# @return [Integer] how many subscriptions were removed
def cleanup_stale_observers(max_age:, now: Time.now)
  threshold = now - max_age
  removed = 0

  @mutex.synchronize do
    @subscriptions.delete_if do |_path, entries|
      size_before = entries.size
      entries.reject! { |entry| stale_subscription?(entry, threshold) }
      removed += size_before - entries.size

      # A path with no remaining subscribers is dropped from the registry.
      entries.empty?
    end
  end

  removed
end

.notify(path, new_value) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/takagi/observer/registry.rb', line 48

# Broadcasts +new_value+ to the subscriptions registered for +path+.
#
# Emits +:observe_notify_start+ before the loop, +:observer_notification+
# after each delivery, and +:observe_notify_end+ afterwards. Subscriptions
# rejected by +should_notify?+ are skipped and are not counted as delivered.
#
# @param path [Object] key under which the subscriptions were registered
# @param new_value [Object] value handed to each notified subscription
# @return [nil, Object] nil when +path+ has no entry; otherwise whatever the
#   final hook emission returns
def notify(path, new_value)
  # Get a snapshot of subscribers to avoid holding the lock during notification
  subscribers = @mutex.synchronize { @subscriptions[path]&.dup }
  return unless subscribers

  Takagi.logger.debug "Notify called for: #{path}"
  Takagi.logger.debug "Subscriptions count: #{subscribers.size}"

  Takagi::Hooks.emit(:observe_notify_start, path: path, subscribers: subscribers.size, value: new_value)

  # Count real deliveries: the end hook must not report skipped subscriptions.
  delivered = 0

  subscribers.each do |subscription|
    next unless should_notify?(subscription, new_value)

    deliver_notification(subscription, path, new_value)
    update_sequence(subscription, new_value)
    # The snapshot is a shallow copy, so this updates the registered entry itself.
    subscription[:last_notified_at] = Time.now
    delivered += 1

    Takagi::Hooks.emit(:observer_notification, path: path, subscription: subscription)
  end

  Takagi::Hooks.emit(:observe_notify_end, path: path, delivered: delivered, value: new_value)
end

.sender ⇒ Object



71
72
73
# File 'lib/takagi/observer/registry.rb', line 71

# Returns the memoized sender, building a Takagi::Observer::Sender the first
# time it is requested (or whenever @sender is still unset).
#
# @return [Object] the value cached in @sender
def sender
  return @sender if @sender

  @sender = Takagi::Observer::Sender.new
end

.subscribe(path, subscriber) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/takagi/observer/registry.rb', line 23

# Registers a copy of +subscriber+ under +path+ and announces it through the
# +:observe_subscribed+ hook.
#
# The caller's object is duplicated first, so the bookkeeping keys added here
# (+:created_at+, +:last_notified_at+) never leak back into the argument.
#
# @param path [Object] key the subscription is stored under
# @param subscriber [Hash] subscription attributes
# @return [Hash] the stored entry (the duplicate, not +subscriber+ itself)
def subscribe(path, subscriber)
  entry = subscriber.dup
  entry[:created_at] = Time.now unless entry[:created_at]
  # Guarantees the key exists even before the first notification.
  entry[:last_notified_at] = nil unless entry[:last_notified_at]

  @mutex.synchronize { (@subscriptions[path] ||= []) << entry }

  Takagi::Hooks.emit(:observe_subscribed, path: path, subscription: entry)

  entry
end

.subscription_paths ⇒ Object



75
76
77
# File 'lib/takagi/observer/registry.rb', line 75

# Lists the paths that currently have an entry in the registry.
#
# The keys are read while holding @mutex. Hash#keys already builds a new
# array, so the result is a snapshot callers may mutate freely without an
# extra copy.
#
# @return [Array] the registered paths
def subscription_paths
  @mutex.synchronize { @subscriptions.keys }
end

.unsubscribe(path, token) ⇒ Object



38
39
40
41
42
43
44
45
46
# File 'lib/takagi/observer/registry.rb', line 38

# Removes every subscription under +path+ whose +:token+ equals +token+ and
# emits +:observe_unsubscribed+.
#
# Nothing happens (and no hook fires) when +path+ has no entry. When it does,
# the hook fires whether or not any subscription matched.
#
# @param path [Object] key the subscriptions were registered under
# @param token [Object] token identifying the subscription(s) to drop
# @return [nil, Object] nil for an unknown path; otherwise whatever the hook
#   emission returns
def unsubscribe(path, token)
  path_known = @mutex.synchronize do
    entries = @subscriptions[path]
    next false unless entries

    entries.reject! { |entry| entry[:token] == token }
    true
  end
  return unless path_known

  Takagi::Hooks.emit(:observe_unsubscribed, path: path, token: token)
end