Class: Profiler::SSE::EventBus
- Inherits:
- Object
  - Object
    - Profiler::SSE::EventBus
- Includes:
- Singleton
- Defined in:
- lib/profiler/sse/event_bus.rb
Instance Method Summary
- #broadcast(token, collectors) ⇒ Object
- #initialize ⇒ EventBus (constructor): A new instance of EventBus.
- #subscribe(token, collectors) ⇒ Object
- #unsubscribe(id) ⇒ Object
- #wait_for_event(id, timeout: 30) ⇒ Object
Constructor Details
#initialize ⇒ EventBus
Returns a new instance of EventBus.
14 15 16 |
# File 'lib/profiler/sse/event_bus.rb', line 14
#
# Builds the bus with an empty subscription table.
#
# The table is a Concurrent::Hash stored in @subscriptions; the other
# instance methods on this class read and write it.
#
# @return [EventBus] a new instance of EventBus
def initialize
  @subscriptions = Concurrent::Hash.new
end
Instance Method Details
#broadcast(token, collectors) ⇒ Object
28 29 30 31 32 33 34 35 36 |
# File 'lib/profiler/sse/event_bus.rb', line 28
#
# Pushes a change event onto the queue of every subscription that is
# registered for +token+ and whose collector filter matches.
#
# A subscription with an empty collector set receives every event for its
# token. Otherwise it receives the event only when its set shares at least
# one name with +collectors+.
#
# Each delivered event is a Hash with the keys :token, :collectors (Array of
# String) and :timestamp (Float, seconds since the epoch).
#
# @param token [Object] compared to each subscription's :token with ==
# @param collectors [Enumerable, Object, nil] names of the collectors that
#   changed; every entry is converted with #to_s. +nil+ is treated as an
#   empty list and a single non-enumerable value as a one-element list.
# @return [Object] the subscription table held in @subscriptions
def broadcast(token, collectors)
  changed = Set.new(Array(collectors).map(&:to_s))

  # Iterate over a snapshot: a subscribe/unsubscribe that interleaves with
  # this loop must not mutate the hash that is being walked.
  @subscriptions.values.each do |sub|
    next unless sub[:token] == token

    wanted = sub[:collectors]
    # Empty collector set means "match all"; non-empty set filters by intersection.
    next unless wanted.empty? || wanted.intersect?(changed)

    sub[:queue] << { token: token, collectors: changed.to_a, timestamp: Time.now.to_f }
  end

  # Keep the historical return value (each_value returned its receiver).
  @subscriptions
end
#subscribe(token, collectors) ⇒ Object
18 19 20 21 22 |
# File 'lib/profiler/sse/event_bus.rb', line 18
#
# Registers a new subscription and returns its identifier.
#
# The subscription is stored in @subscriptions under a freshly generated
# UUID and consists of the token, the collector filter as a Set of Strings,
# and a new, empty Queue.
#
# @param token [Object] token stored with the subscription
# @param collectors [Enumerable, Object, nil] collector names to filter on;
#   every entry is converted with #to_s. +nil+ or an empty list produces an
#   empty filter set; a single non-enumerable value is treated as a
#   one-element list.
# @return [String] the UUID under which the subscription was stored
def subscribe(token, collectors)
  id = SecureRandom.uuid
  @subscriptions[id] = {
    token: token,
    # Array() lets callers pass nil or a single name without raising.
    collectors: Set.new(Array(collectors).map(&:to_s)),
    queue: Queue.new
  }
  id
end
#unsubscribe(id) ⇒ Object
24 25 26 |
# File 'lib/profiler/sse/event_bus.rb', line 24
#
# Removes the subscription stored under +id+ from @subscriptions.
#
# @param id [Object] key of the subscription to remove
# @return [Object] whatever @subscriptions.delete returns for +id+
def unsubscribe(id)
  @subscriptions.delete(id)
end
#wait_for_event(id, timeout: 30) ⇒ Object
38 39 40 41 42 43 44 |
# File 'lib/profiler/sse/event_bus.rb', line 38
#
# Blocks until an event is available on the subscription's queue or the
# timeout elapses.
#
# @param id [Object] key of the subscription in @subscriptions
# @param timeout [Numeric, nil] seconds to wait; +nil+ or 0 waits without a
#   limit (the same meaning Timeout.timeout gives those values)
# @return [Object, nil] the next queued event, or +nil+ when +id+ is unknown
#   or no event arrived in time
def wait_for_event(id, timeout: 30)
  sub = @subscriptions[id]
  return nil unless sub

  queue = sub[:queue]
  return queue.pop if timeout.nil? || timeout.zero?

  # Queue#pop accepts timeout: from Ruby 3.2 on. Prefer it: Timeout.timeout
  # interrupts the waiting thread asynchronously, which can happen after the
  # pop already took an event off the queue and would silently drop it.
  major_minor = RUBY_VERSION.split('.').first(2).map(&:to_i)
  return queue.pop(timeout: timeout) if (major_minor <=> [3, 2]) >= 0

  # Older Rubies: keep the original Timeout-based wait.
  Timeout.timeout(timeout) { queue.pop }
rescue Timeout::Error
  nil
end