Class: Philiprehberger::EventEmitter::Emitter

Inherits:
Object
  • Object
show all
Includes:
History, Invoker, ListenerStore
Defined in:
lib/philiprehberger/event_emitter/emitter.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(history_size: 0) ⇒ Emitter

Returns a new instance of Emitter.



13
14
15
16
17
18
19
20
21
# File 'lib/philiprehberger/event_emitter/emitter.rb', line 13

# Sets up a fresh emitter: empty exact and wildcard listener registries,
# an empty history buffer, no error callback, and a listener ceiling of 10.
#
# @param history_size [Integer] stored in @history_size (defaults to 0)
#   NOTE(review): presumably the number of past emissions retained for
#   replay — confirm against the History module.
def initialize(history_size: 0)
  # History bookkeeping.
  @history_size = history_size
  @history = []

  # Listener registries and the lock that guards them.
  @mutex = Mutex.new
  @listeners = {}
  @wildcard_listeners = []

  # Tunables.
  @max_listeners = 10
  @on_error = nil
end

Instance Attribute Details

#max_listeners ⇒ Object

Returns the value of attribute max_listeners.



10
11
12
# File 'lib/philiprehberger/event_emitter/emitter.rb', line 10

# Reader for the listener ceiling held in @max_listeners.
# The constructor initialises it to 10.
#
# @return [Object] the current value of @max_listeners
def max_listeners
  @max_listeners
end

#on_error=(value) ⇒ Object (writeonly)

Sets the attribute on_error

Parameters:

  • value

    the value to set the attribute on_error to.



11
12
13
# File 'lib/philiprehberger/event_emitter/emitter.rb', line 11

# Stores the given object in @on_error, replacing any previous value.
# NOTE(review): presumably a callable invoked when a listener raises —
# confirm against the Invoker module.
#
# @param value [Object] the value to assign to @on_error
def on_error=(value)
  @on_error = value
end

Instance Method Details

#emit(event, *args, **kwargs) ⇒ Object



41
42
43
44
45
46
47
48
49
50
# File 'lib/philiprehberger/event_emitter/emitter.rb', line 41

# Emits an event on the calling thread.
#
# The emission is always passed to record_history first, even when nobody
# is listening. The entries returned by collect_entries are then handed to
# invoke_entries together with an EventMetadata carrying the event name and
# the emission timestamp.
#
# @param event [Object] the event name
# @param args [Array] positional arguments forwarded to the listeners
# @param kwargs [Hash] keyword arguments forwarded to the listeners
# @return [Boolean] false when no entries were collected, true otherwise
def emit(event, *args, **kwargs)
  fired_at = Time.now
  record_history(event, args, kwargs, fired_at)

  targets = collect_entries(event)
  if targets.empty?
    false
  else
    details = EventMetadata.new(event_name: event, timestamp: fired_at)
    invoke_entries(targets, args, kwargs, event, details)
    true
  end
end

#emit_async(event, *args, **kwargs) ⇒ Object



52
53
54
55
56
57
58
59
60
# File 'lib/philiprehberger/event_emitter/emitter.rb', line 52

# Emits an event by delegating listener invocation to
# spawn_listener_threads instead of running listeners inline.
#
# The emission is passed to record_history before listeners are looked up,
# so it is recorded even when nothing is registered.
#
# @param event [Object] the event name
# @param args [Array] positional arguments forwarded to the listeners
# @param kwargs [Hash] keyword arguments forwarded to the listeners
# @return [Object] an empty Array when no entries were collected, otherwise
#   whatever spawn_listener_threads returns (presumably the spawned
#   threads — confirm against the Invoker module)
def emit_async(event, *args, **kwargs)
  fired_at = Time.now
  record_history(event, args, kwargs, fired_at)

  targets = collect_entries(event)
  if targets.empty?
    []
  else
    details = EventMetadata.new(event_name: event, timestamp: fired_at)
    spawn_listener_threads(targets, args, kwargs, event, details)
  end
end

#event_names ⇒ Object



210
211
212
# File 'lib/philiprehberger/event_emitter/emitter.rb', line 210

# Lists the keys of the exact-match listener registry, read under the
# emitter's mutex. Wildcard registrations live in a separate collection
# and are not included.
#
# @return [Array] a fresh array of the registry's keys
def event_names
  @mutex.synchronize do
    @listeners.keys
  end
end

#event_stats(event) ⇒ Hash

Summary of registered listeners for an event, including wildcard matches. Useful for diagnostics: counts exact vs. wildcard listeners, one-shot listeners, and compares against the configured ceiling.

Parameters:

  • event (String, Symbol)

    the event name

Returns:

  • (Hash)

    :listeners, :once_listeners, :wildcards, :max_listeners



192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/philiprehberger/event_emitter/emitter.rb', line 192

# Diagnostic summary of the listeners that apply to an event.
#
# Exact listeners are looked up by the event as given; wildcard listeners
# are those whose pattern matches the event's string form according to
# Pattern.match?. The whole snapshot is taken under the emitter's mutex.
#
# @param event [String, Symbol] the event name
# @return [Hash] with keys
#   :listeners      — exact plus matching wildcard listeners
#   :once_listeners — how many of those carry a truthy :once flag
#   :wildcards      — matching wildcard listeners only
#   :max_listeners  — the configured ceiling
def event_stats(event)
  @mutex.synchronize do
    direct = @listeners[event] || []
    matching = @wildcard_listeners.select do |entry|
      Pattern.match?(entry[:pattern], event.to_s)
    end
    applicable = direct + matching

    {
      listeners: applicable.size,
      once_listeners: applicable.count { |entry| entry[:once] },
      wildcards: matching.size,
      max_listeners: @max_listeners
    }
  end
end

#listener_count(event) ⇒ Object



182
183
184
# File 'lib/philiprehberger/event_emitter/emitter.rb', line 182

# Counts the exact-match listeners registered under the given event.
# Wildcard listeners are not consulted.
#
# @param event [Object] the event name, used as-is as the registry key
# @return [Integer] number of registered entries; 0 when the event is unknown
def listener_count(event)
  @mutex.synchronize do
    registered = @listeners[event]
    registered ? registered.size : 0
  end
end

#listeners(event) ⇒ Object



176
177
178
179
180
# File 'lib/philiprehberger/event_emitter/emitter.rb', line 176

# Returns the callables registered for exactly this event, in registry
# order. Wildcard listeners are not consulted.
#
# @param event [Object] the event name, used as-is as the registry key
# @return [Array] the :block value of each entry; empty when the event
#   is unknown
def listeners(event)
  @mutex.synchronize do
    registered = @listeners[event]
    registered ? registered.map { |entry| entry[:block] } : []
  end
end

#off(event, &block) ⇒ Object



165
166
167
168
169
170
171
172
173
174
# File 'lib/philiprehberger/event_emitter/emitter.rb', line 165

# Unregisters a listener.
#
# The event's string form is classified with Pattern.wildcard?; wildcard
# names are routed to remove_wildcard_listener and everything else to
# remove_exact_listener. Either removal runs under the emitter's mutex.
#
# @param event [Object] event name or wildcard pattern
# @param block [Proc, nil] the listener to remove (nil when no block is given)
# @return [self] the receiver, to allow chaining
def off(event, &block)
  @mutex.synchronize do
    wildcard = Pattern.wildcard?(event.to_s)
    wildcard ? remove_wildcard_listener(event, block) : remove_exact_listener(event, block)
  end
  self
end

#on(event, priority: 0, replay: false, metadata: false, &block) ⇒ Object

Raises:

  • (ArgumentError)


23
24
25
26
27
28
29
30
# File 'lib/philiprehberger/event_emitter/emitter.rb', line 23

# Registers a persistent listener for an event.
#
# Builds a listener entry (with once: false) and passes it to
# register_listener; when replay is truthy the entry is additionally
# passed to replay_history.
#
# @param event [Object] event name or pattern to listen for
# @param priority [Object] stored on the entry under :priority
#   (presumably orders invocation — confirm against ListenerStore)
# @param replay [Boolean] when truthy, replay_history is called for the
#   new entry right after registration
# @param metadata [Boolean] stored on the entry under :metadata
#   (presumably opts the listener into receiving event metadata — confirm)
# @param block [Proc] the listener body (required)
# @return [self] the receiver, to allow chaining
# @raise [ArgumentError] if no block is given
def on(event, priority: 0, replay: false, metadata: false, &block)
  raise ArgumentError, 'block required' unless block

  # Every pair is written out explicitly: a value-less `metadata:` key only
  # parses on Ruby >= 3.1 and was inconsistent with its neighbours.
  entry = { block: block, once: false, priority: priority, metadata: metadata }
  register_listener(event, entry)
  replay_history(event, entry) if replay
  self
end

#once(event, priority: 0, replay: false, metadata: false, &block) ⇒ Object

Raises:

  • (ArgumentError)


32
33
34
35
36
37
38
39
# File 'lib/philiprehberger/event_emitter/emitter.rb', line 32

# Registers a one-shot listener for an event.
#
# Builds a listener entry flagged once: true and passes it to
# register_listener with check_max: false; when replay is truthy the entry
# is additionally passed to replay_history.
#
# @param event [Object] event name or pattern to listen for
# @param priority [Object] stored on the entry under :priority
#   (presumably orders invocation — confirm against ListenerStore)
# @param replay [Boolean] when truthy, replay_history is called for the
#   new entry right after registration
# @param metadata [Boolean] stored on the entry under :metadata
#   (presumably opts the listener into receiving event metadata — confirm)
# @param block [Proc] the listener body (required)
# @return [self] the receiver, to allow chaining
# @raise [ArgumentError] if no block is given
def once(event, priority: 0, replay: false, metadata: false, &block)
  raise ArgumentError, 'block required' unless block

  # Every pair is written out explicitly: a value-less `metadata:` key only
  # parses on Ruby >= 3.1 and was inconsistent with its neighbours.
  entry = { block: block, once: true, priority: priority, metadata: metadata }
  register_listener(event, entry, check_max: false)
  replay_history(event, entry) if replay
  self
end

#remove_all_listeners(event = nil) ⇒ Object



205
206
207
208
# File 'lib/philiprehberger/event_emitter/emitter.rb', line 205

# Clears registered listeners by delegating to clear_listeners while
# holding the emitter's mutex.
#
# @param event [Object, nil] forwarded unchanged to clear_listeners
#   (presumably nil means "every event" — confirm against ListenerStore)
# @return [self] the receiver, to allow chaining
def remove_all_listeners(event = nil)
  @mutex.synchronize do
    clear_listeners(event)
  end
  self
end

#wait(event, timeout: nil) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/philiprehberger/event_emitter/emitter.rb', line 62

# Blocks the calling thread until the given event fires once.
#
# A temporary one-shot listener captures the positional arguments of the
# first fire and wakes the waiter. If the deadline passes first, the
# listener is unregistered again.
#
# @param event [Object] the event to wait for
# @param timeout [Numeric, nil] maximum seconds to wait; nil waits
#   indefinitely. A zero or negative value returns nil without sleeping
#   unless the event has already fired.
# @return [Array, nil] positional arguments of the first fire, or nil on
#   timeout
def wait(event, timeout: nil)
  mutex = Mutex.new
  cond = ConditionVariable.new
  payload = nil
  fired = false
  deadline = timeout ? Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout : nil

  handler = lambda do |*args, **_kwargs|
    mutex.synchronize do
      next if fired

      payload = args
      fired = true
      cond.signal
    end
  end
  once(event, &handler)

  mutex.synchronize do
    # A condition-variable wait may return before it is signalled, so keep
    # waiting until the event has fired or the deadline has really passed.
    until fired
      remaining = deadline ? deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC) : nil
      break if remaining && remaining <= 0

      cond.wait(mutex, remaining)
    end
  end
  return payload if fired

  # Timed out: the one-shot listener is still registered, so remove it.
  off(event, &handler)
  nil
end

#wait_all(*events, timeout: nil) ⇒ Hash{Object=>Array}?

Block the calling thread until all of the given events have fired.

Returns a Hash of `{ event => args_array }` once every event has fired at least once, or `nil` on timeout. Each event records its first-fire arguments only. Temporary listeners registered on behalf of the wait are removed before returning.

Parameters:

  • events (Array<Symbol,String>)

    events to wait for (must be non-empty)

  • timeout (Numeric, nil) (defaults to: nil)

    maximum seconds to wait; nil to wait indefinitely

Returns:

  • (Hash{Object=>Array}, nil)

    map of event name to first-fire args, or nil on timeout

Raises:

  • (ArgumentError)


130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/philiprehberger/event_emitter/emitter.rb', line 130

# Blocks the calling thread until every one of the given events has fired.
#
# A temporary one-shot listener is registered per distinct event; each
# records the positional arguments of that event's first fire. Listeners
# for events that never fired are unregistered before returning.
#
# @param events [Array<Symbol, String>] events to wait for (must be
#   non-empty); repeated names are collapsed so each event is awaited once
# @param timeout [Numeric, nil] maximum seconds to wait; nil waits
#   indefinitely
# @return [Hash{Object=>Array}, nil] event => first-fire args, or nil on
#   timeout
# @raise [ArgumentError] if no events are given
def wait_all(*events, timeout: nil)
  raise ArgumentError, 'at least one event is required' if events.empty?

  # Results and handlers are keyed by event, so a repeated name can only
  # ever contribute one entry. Counting against the raw argument list would
  # make completion unreachable and would lose track of the first handler.
  pending = events.uniq
  mutex = Mutex.new
  cond = ConditionVariable.new
  results = {}
  handlers = {}
  deadline = timeout ? Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout : nil

  pending.each do |event|
    handler = lambda do |*args, **_kwargs|
      mutex.synchronize do
        next if results.key?(event)

        results[event] = args
        cond.signal if results.size == pending.size
      end
    end
    handlers[event] = handler
    once(event, &handler)
  end

  mutex.synchronize do
    # Re-check after every wake-up: the wait may return early, and the
    # remaining time has to be recomputed against the fixed deadline.
    until results.size == pending.size
      remaining = deadline ? deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC) : nil
      break if remaining && remaining <= 0

      cond.wait(mutex, remaining)
    end
  end

  # Only listeners that never fired are still registered.
  pending.each { |event| off(event, &handlers[event]) unless results.key?(event) }
  results.size == pending.size ? results : nil
end

#wait_any(*events, timeout: nil) ⇒ Array?

Block the calling thread until any of the given events fires.

Returns `[event_name, *args]` for the first event to fire, or `nil` on timeout. Temporary listeners registered on behalf of the wait are removed before returning.

Parameters:

  • events (Array<Symbol,String>)

    the events to wait for

  • timeout (Numeric, nil) (defaults to: nil)

    maximum seconds to wait; nil to wait indefinitely

Returns:

  • (Array, nil)

    `[event, *args]` on first fire, `nil` on timeout



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/philiprehberger/event_emitter/emitter.rb', line 93

# Blocks the calling thread until any one of the given events fires.
#
# A temporary one-shot listener is registered per distinct event; the first
# one to run records its event name and positional arguments and wakes the
# waiter. Listeners for the other events are unregistered before returning.
#
# NOTE: with no events and no timeout nothing can ever wake the waiter, so
# callers should pass at least one event.
#
# @param events [Array<Symbol, String>] the events to wait for; repeated
#   names are collapsed so each event gets exactly one listener
# @param timeout [Numeric, nil] maximum seconds to wait; nil waits
#   indefinitely
# @return [Array, nil] `[event, *args]` for the first fire, or nil on timeout
def wait_any(*events, timeout: nil)
  mutex = Mutex.new
  cond = ConditionVariable.new
  fired_event = nil
  fired_args = nil
  handlers = {}
  deadline = timeout ? Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout : nil

  # Handlers are keyed by event; registering a repeated name twice would
  # overwrite the first handler and leave it registered after a timeout.
  events.uniq.each do |event|
    handler = lambda do |*args, **_kwargs|
      mutex.synchronize do
        next if fired_event

        fired_event = event
        fired_args = args
        cond.signal
      end
    end
    handlers[event] = handler
    once(event, &handler)
  end

  mutex.synchronize do
    # A condition-variable wait may return before it is signalled, so keep
    # waiting until something fired or the deadline has really passed.
    until fired_event
      remaining = deadline ? deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC) : nil
      break if remaining && remaining <= 0

      cond.wait(mutex, remaining)
    end
  end

  handlers.each { |event, handler| off(event, &handler) unless event == fired_event }
  fired_event ? [fired_event, *fired_args] : nil
end