Class: Profiler::SSE::RedisEventBus

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/profiler/sse/redis_event_bus.rb

Constant Summary collapse

CHANNEL_PREFIX =
"profiler:events"

Instance Method Summary collapse

Constructor Details

#initialize ⇒ RedisEventBus

Returns a new instance of RedisEventBus.



17
18
19
20
21
# File 'lib/profiler/sse/redis_event_bus.rb', line 17

# Builds the bus with an empty subscription registry and no listener thread.
#
# The registry is a Concurrent::Hash; the listener thread slot starts out nil
# and a Mutex is created alongside it (presumably to guard listener start-up —
# the code that uses it is not shown here).
def initialize
  @listener_thread = nil
  @subscriptions = Concurrent::Hash.new
  @listener_mutex = Mutex.new
end

Instance Method Details

#broadcast(token, collectors) ⇒ Object



34
35
36
37
# File 'lib/profiler/sse/redis_event_bus.rb', line 34

# Publishes a JSON event for +token+ on the channel "<CHANNEL_PREFIX>:<token>".
#
# The JSON document carries three keys: the token, the collectors converted
# to strings, and the current time as a float.
#
# @param token [Object] value interpolated into the channel name and embedded in the payload
# @param collectors [Enumerable] items responding to #to_s
# @return [Object] whatever the client's #publish call returns
def broadcast(token, collectors)
  event = {
    token: token,
    collectors: collectors.map(&:to_s),
    timestamp: Time.now.to_f
  }
  message = event.to_json

  client = publish_redis_client
  client.publish("#{CHANNEL_PREFIX}:#{token}", message)
end

#subscribe(token, collectors) ⇒ Object



23
24
25
26
27
28
# File 'lib/profiler/sse/redis_event_bus.rb', line 23

# Registers a new subscription and returns its identifier.
#
# A fresh UUID becomes the registry key; the stored entry holds the token,
# the collectors as a Set of strings, and a new Queue. After the entry is
# stored, ensure_listener_running is invoked.
#
# @param token [Object] token stored with the subscription
# @param collectors [Enumerable] items responding to #to_s
# @return [String] the generated subscription id
def subscribe(token, collectors)
  SecureRandom.uuid.tap do |subscription_id|
    collector_names = Set.new(collectors.map(&:to_s))
    @subscriptions[subscription_id] = { token: token, collectors: collector_names, queue: Queue.new }
    ensure_listener_running
  end
end

#unsubscribe(id) ⇒ Object



30
31
32
# File 'lib/profiler/sse/redis_event_bus.rb', line 30

# Removes the subscription stored under +id+ from the registry.
#
# @param id [Object] registry key, i.e. the value returned by #subscribe
# @return [Object, nil] the result of deleting +id+ from the registry
def unsubscribe(id)
  registry = @subscriptions
  registry.delete(id)
end

#wait_for_event(id, timeout: 30) ⇒ Object



39
40
41
42
43
44
45
# File 'lib/profiler/sse/redis_event_bus.rb', line 39

# Blocks until an item arrives on the subscription's queue or the timeout elapses.
#
# @param id [Object] registry key of the subscription to wait on
# @param timeout [Numeric] value handed to Timeout.timeout (default 30)
# @return [Object, nil] the item popped from the queue; nil when no
#   subscription is registered under +id+ or when Timeout::Error is raised
def wait_for_event(id, timeout: 30)
  subscription = @subscriptions[id]
  return unless subscription

  begin
    Timeout.timeout(timeout) { subscription[:queue].pop }
  rescue Timeout::Error
    # Nothing arrived in time; callers see this as "no event".
    nil
  end
end