Class: Profiler::SSE::EventBus

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/profiler/sse/event_bus.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ EventBus

Returns a new instance of EventBus.



14
15
16
# File 'lib/profiler/sse/event_bus.rb', line 14

# Builds the bus with an empty subscription registry.
#
# The registry is a Concurrent::Hash; the other methods of this class store,
# look up and delete per-subscription entries in it by id.
def initialize
  @subscriptions = Concurrent::Hash.new
end

Instance Method Details

#broadcast(token, collectors) ⇒ Object



28
29
30
31
32
33
34
35
36
# File 'lib/profiler/sse/event_bus.rb', line 28

# Pushes a change event onto the queue of every subscription registered for
# +token+ whose collector filter matches.
#
# A subscription matches when its filter is empty ("all collectors") or when
# it shares at least one name with +collectors+.
#
# @param token [Object] compared with == against each subscription's token
# @param collectors [Enumerable, Object, nil] name(s) of the collectors that
#   changed; each is normalised with #to_s. +nil+ is treated as an empty list
#   and a single non-enumerable value as a one-element list.
# @return [Object] the subscription registry, as before
def broadcast(token, collectors)
  changed = Set.new(Array(collectors).map(&:to_s))
  # Walk a snapshot rather than the live registry: inserting a key into a Hash
  # that is currently being iterated raises, which would otherwise make a
  # subscribe call fail whenever it overlaps with a broadcast.
  @subscriptions.values.each do |sub|
    next unless sub[:token] == token
    # Empty collector set means "match all"; non-empty set filters by intersection.
    next if sub[:collectors].any? && (sub[:collectors] & changed).empty?

    sub[:queue] << { token: token, collectors: changed.to_a, timestamp: Time.now.to_f }
  end
  @subscriptions
end

#subscribe(token, collectors) ⇒ Object



18
19
20
21
22
# File 'lib/profiler/sse/event_bus.rb', line 18

# Registers a new subscription for +token+ and returns its id.
#
# The stored entry holds the token, the collector filter as a Set of strings,
# and a fresh Queue that receives the events for this subscription.
#
# @param token [Object] token the subscription is interested in
# @param collectors [Enumerable, Object, nil] collector name(s) to filter on;
#   each is normalised with #to_s. +nil+ or an empty list yields an empty
#   filter, and a single non-enumerable value is treated as a one-element list.
# @return [String] a random UUID identifying the subscription
def subscribe(token, collectors)
  id = SecureRandom.uuid
  # Array() lets callers pass nil or a single name instead of crashing on #map.
  filter = Set.new(Array(collectors).map(&:to_s))
  @subscriptions[id] = { token: token, collectors: filter, queue: Queue.new }
  id
end

#unsubscribe(id) ⇒ Object



24
25
26
# File 'lib/profiler/sse/event_bus.rb', line 24

# Drops the subscription stored under +id+ from the registry.
#
# @param id [Object] key of the subscription to remove
# @return [Object, nil] the result of the registry's #delete: the removed
#   entry, or nil when nothing was stored under +id+
def unsubscribe(id)
  @subscriptions.delete(id)
end

#wait_for_event(id, timeout: 30) ⇒ Object



38
39
40
41
42
43
44
# File 'lib/profiler/sse/event_bus.rb', line 38

# Blocks until the subscription identified by +id+ has an event queued, then
# removes and returns it.
#
# @param id [Object] key of the subscription in the registry
# @param timeout [Numeric] value handed to Timeout.timeout as the wait limit
# @return [Object, nil] the next queued event; nil when +id+ is not
#   registered or when Timeout::Error is raised before an event arrives
def wait_for_event(id, timeout: 30)
  subscription = @subscriptions[id]
  return nil unless subscription

  inbox = subscription[:queue]
  begin
    Timeout.timeout(timeout) { inbox.pop }
  rescue Timeout::Error
    # No event arrived in time; callers see nil rather than an exception.
    nil
  end
end