Class: Profiler::SSE::RedisEventBus
- Inherits:
-
Object
- Object
- Profiler::SSE::RedisEventBus
- Includes:
- Singleton
- Defined in:
- lib/profiler/sse/redis_event_bus.rb
Constant Summary collapse
- CHANNEL_PREFIX =
"profiler:events"
Instance Method Summary collapse
- #broadcast(token, collectors) ⇒ Object
-
#initialize ⇒ RedisEventBus
constructor
A new instance of RedisEventBus.
- #subscribe(token, collectors) ⇒ Object
- #unsubscribe(id) ⇒ Object
- #wait_for_event(id, timeout: 30) ⇒ Object
Constructor Details
#initialize ⇒ RedisEventBus
Returns a new instance of RedisEventBus.
17 18 19 20 21 |
# File 'lib/profiler/sse/redis_event_bus.rb', line 17 def initialize @subscriptions = Concurrent::Hash.new @listener_thread = nil @listener_mutex = Mutex.new end |
Instance Method Details
#broadcast(token, collectors) ⇒ Object
34 35 36 37 |
# File 'lib/profiler/sse/redis_event_bus.rb', line 34 def broadcast(token, collectors) payload = { token: token, collectors: collectors.map(&:to_s), timestamp: Time.now.to_f }.to_json publish_redis_client.publish("#{CHANNEL_PREFIX}:#{token}", payload) end |
#subscribe(token, collectors) ⇒ Object
23 24 25 26 27 28 |
# File 'lib/profiler/sse/redis_event_bus.rb', line 23 def subscribe(token, collectors) id = SecureRandom.uuid @subscriptions[id] = { token: token, collectors: Set.new(collectors.map(&:to_s)), queue: Queue.new } ensure_listener_running id end |
#unsubscribe(id) ⇒ Object
30 31 32 |
# File 'lib/profiler/sse/redis_event_bus.rb', line 30 def unsubscribe(id) @subscriptions.delete(id) end |
#wait_for_event(id, timeout: 30) ⇒ Object
39 40 41 42 43 44 45 |
# File 'lib/profiler/sse/redis_event_bus.rb', line 39 def wait_for_event(id, timeout: 30) sub = @subscriptions[id] return nil unless sub Timeout.timeout(timeout) { sub[:queue].pop } rescue Timeout::Error nil end |