Module: Legate::SessionService::EventBroadcast
- Included in: ActiveRecord, InMemory
- Defined in: lib/legate/session_service/event_broadcast.rb
Overview
Session-scoped pub/sub for streaming agent events (R3).
A session service includes this mixin and calls #broadcast_event right after persisting an event in #append_event. Consumers (a Sinatra SSE response, a CLI, a test) #subscribe to a session_id and receive each event as it is appended, then #unsubscribe when done.
Delivery is synchronous and in-order on the appending thread; a subscriber that raises is isolated (logged, never breaks persistence or other subscribers). Subscribers are keyed by session_id, so concurrent runs on different sessions never cross.
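The flow described above can be sketched end to end. This is a minimal illustration, not the library's code: `EventBroadcastSketch` is a hypothetical stand-in for the mixin (plain Hash and Array instead of the Concurrent:: collections, `warn` instead of Legate.logger) so that the example runs without the gem, and `TinySessionService` is an invented in-memory service.

```ruby
# Hypothetical stand-in for Legate::SessionService::EventBroadcast.
# Not thread-safe: it uses plain Hash/Array for brevity.
module EventBroadcastSketch
  def event_subscribers
    @event_subscribers ||= {}
  end

  def subscribe(session_id, &listener)
    raise ArgumentError, 'subscribe requires a block' unless listener

    key = session_id.to_s
    (event_subscribers[key] ||= []) << listener
    [key, listener]
  end

  def unsubscribe(handle)
    return unless handle.is_a?(Array)

    key, listener = handle
    subscribers = event_subscribers[key]
    return unless subscribers

    subscribers.delete(listener)
    event_subscribers.delete(key) if subscribers.empty?
  end

  def broadcast_event(session_id, event)
    subscribers = event_subscribers[session_id.to_s]
    return if subscribers.nil? || subscribers.empty?

    subscribers.each do |listener|
      listener.call(event)
    rescue StandardError => e
      warn("subscriber raised #{e.class}: #{e.message}")
    end
  end
end

# Invented in-memory session service: persist first, then broadcast.
class TinySessionService
  include EventBroadcastSketch

  def initialize
    @events = Hash.new { |hash, key| hash[key] = [] }
  end

  def append_event(session_id, event)
    @events[session_id.to_s] << event
    broadcast_event(session_id, event)
    event
  end
end

service = TinySessionService.new
received = []

handle = service.subscribe('s1') { |event| received << event }
service.append_event('s1', { type: 'token', text: 'hi' })  # delivered
service.append_event('s2', { type: 'token', text: 'no' })  # other session, not delivered
service.unsubscribe(handle)
service.append_event('s1', { type: 'done' })               # after unsubscribe, not delivered
```

Only the first event reaches the subscriber: the second belongs to a different session, and the third is appended after the handle was released.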
Instance Method Summary
- #broadcast_event(session_id, event) ⇒ void
  Notifies subscribers of `session_id` that `event` was appended.
- #subscribe(session_id) {|event| ... } ⇒ Object
  Returns an opaque handle to pass to #unsubscribe.
- #unsubscribe(handle) ⇒ Object
  Removes a subscription created by #subscribe.
Instance Method Details
#broadcast_event(session_id, event) ⇒ void
This method returns an undefined value.
Notifies subscribers of `session_id` that `event` was appended.
    # File 'lib/legate/session_service/event_broadcast.rb', line 53
    def broadcast_event(session_id, event)
      subscribers = event_subscribers[session_id.to_s]
      return if subscribers.nil? || subscribers.empty?

      subscribers.each do |listener|
        listener.call(event)
      rescue StandardError => e
        Legate.logger.error("EventBroadcast: subscriber raised #{e.class}: #{e.message}")
      end
    end
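The per-listener rescue is what keeps one failing subscriber from affecting the others. The sketch below reproduces only that loop in isolation; the `logger` writing to a StringIO is an assumed stand-in for Legate.logger, and the two lambdas are invented subscribers.

```ruby
require 'logger'
require 'stringio'

log_output = StringIO.new
logger = Logger.new(log_output) # assumed stand-in for Legate.logger
delivered = []

# Invented subscribers: the first always fails, the second records the event.
listeners = [
  ->(_event) { raise ArgumentError, 'bad subscriber' },
  ->(event) { delivered << event }
]

# Same shape as the loop in #broadcast_event: the rescue is scoped to one
# listener call, so the iteration continues with the next listener.
listeners.each do |listener|
  listener.call(:event_a)
rescue StandardError => e
  logger.error("EventBroadcast: subscriber raised #{e.class}: #{e.message}")
end
```

The failing listener is logged and skipped, and the second listener still receives the event. Note that only StandardError is rescued, so an error outside that hierarchy would still propagate to the appending thread.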
#subscribe(session_id) {|event| ... } ⇒ Object
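Registers the given block as a listener for session_id and returns an opaque handle to pass to #unsubscribe. Raises ArgumentError when called without a block.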
    # File 'lib/legate/session_service/event_broadcast.rb', line 29
    def subscribe(session_id, &listener)
      raise ArgumentError, 'subscribe requires a block' unless listener

      key = session_id.to_s
      subscribers = event_subscribers.compute_if_absent(key) { Concurrent::Array.new }
      subscribers << listener
      [key, listener]
    end
#unsubscribe(handle) ⇒ Object
Removes a subscription created by #subscribe. Safe to call more than once for the same handle, or with nil.
    # File 'lib/legate/session_service/event_broadcast.rb', line 40
    def unsubscribe(handle)
      return unless handle.is_a?(Array)

      key, listener = handle
      subscribers = event_subscribers[key]
      return unless subscribers

      subscribers.delete(listener)
      event_subscribers.delete(key) if subscribers.empty?
    end
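Because #unsubscribe tolerates nil and repeated calls, a consumer can release its handle unconditionally in an `ensure` block. The following is a sketch of that pattern under stated assumptions: `SketchService` is a hypothetical, non-thread-safe stand-in that mirrors the documented methods, `subscriber_count` is a helper invented for the example, and the Queue plays the role of a streaming response buffer.

```ruby
# Hypothetical stand-in mirroring the documented methods (plain Hash/Array,
# not the Concurrent:: collections the real module uses).
class SketchService
  def initialize
    @subs = {}
  end

  def subscribe(session_id, &listener)
    raise ArgumentError, 'subscribe requires a block' unless listener

    key = session_id.to_s
    (@subs[key] ||= []) << listener
    [key, listener]
  end

  def unsubscribe(handle)
    return unless handle.is_a?(Array)

    key, listener = handle
    subscribers = @subs[key]
    return unless subscribers

    subscribers.delete(listener)
    @subs.delete(key) if subscribers.empty?
  end

  def broadcast_event(session_id, event)
    (@subs[session_id.to_s] || []).each { |listener| listener.call(event) }
  end

  # Invented helper, used only to inspect the sketch's state.
  def subscriber_count(session_id)
    (@subs[session_id.to_s] || []).size
  end
end

service = SketchService.new
queue = Queue.new
streamed = []
handle = nil

begin
  # Session ids are normalized with to_s, so :run_42 and 'run_42' match.
  handle = service.subscribe(:run_42) { |event| queue << event }
  service.broadcast_event('run_42', 'first')
  service.broadcast_event('run_42', 'second')
  streamed << queue.pop until queue.empty?
ensure
  service.unsubscribe(handle) # runs even if streaming raised
end

service.unsubscribe(handle) # second call is a no-op
service.unsubscribe(nil)    # nil is ignored
service.broadcast_event('run_42', 'late') # no subscribers left
```

Handing events to a queue keeps the listener block cheap, which matters because delivery runs synchronously on the appending thread.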