Class: DSPy::EventRegistry
- Inherits: Object
- Inheritance chain: Object → DSPy::EventRegistry
- Defined in:
- lib/dspy/events.rb
Instance Method Summary
- #clear_listeners ⇒ Object
- #initialize ⇒ EventRegistry (constructor): A new instance of EventRegistry.
- #notify(event_name, attributes) ⇒ Object
- #subscribe(pattern, &block) ⇒ Object
- #unsubscribe(subscription_id) ⇒ Object
Constructor Details
#initialize ⇒ EventRegistry
Returns a new instance of EventRegistry.
12 13 14 15 |
# File 'lib/dspy/events.rb', line 12
def initialize
  @listeners = {}
  @mutex = Mutex.new
end
Instance Method Details
#clear_listeners ⇒ Object
37 38 39 40 41 |
# File 'lib/dspy/events.rb', line 37
def clear_listeners
  @mutex.synchronize do
    @listeners.clear
  end
end
#notify(event_name, attributes) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/dspy/events.rb', line 43
def notify(event_name, attributes)
  # Take a snapshot of current listeners to avoid holding the mutex during execution
  # This allows listeners to be modified while others are executing
  matching_listeners = @mutex.synchronize do
    @listeners.select do |id, listener|
      pattern_matches?(listener[:pattern], event_name)
    end.dup # Create a copy to avoid shared state
  end

  matching_listeners.each do |id, listener|
    begin
      listener[:block].call(event_name, attributes)
    rescue StandardError => e
      # Log the error but continue processing other listeners
      # Use emit_log directly to avoid infinite recursion
      DSPy.send(:emit_log, 'event.listener.error', {
        subscription_id: id,
        error_class: e.class.name,
        error_message: e.message,
        event_name: event_name
      })
    end
  end
end
#subscribe(pattern, &block) ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/dspy/events.rb', line 17
def subscribe(pattern, &block)
  return unless block_given?

  subscription_id = SecureRandom.uuid
  @mutex.synchronize do
    @listeners[subscription_id] = {
      pattern: pattern,
      block: block
    }
  end

  subscription_id
end
#unsubscribe(subscription_id) ⇒ Object
31 32 33 34 35 |
# File 'lib/dspy/events.rb', line 31
def unsubscribe(subscription_id)
  @mutex.synchronize do
    @listeners.delete(subscription_id)
  end
end