Module: Legate::SessionService::EventBroadcast

Included in:
ActiveRecord, InMemory
Defined in:
lib/legate/session_service/event_broadcast.rb

Overview

Session-scoped pub/sub for streaming agent events (R3).

A session service includes this mixin and calls #broadcast_event right after persisting an event in #append_event. Consumers (a Sinatra SSE response, a CLI, a test) #subscribe to a session_id and receive each event as it is appended, then #unsubscribe when done.

Delivery is synchronous and in-order on the appending thread. A subscriber that raises a StandardError is isolated: the error is logged, and neither persistence nor delivery to the other subscribers is interrupted. Subscribers are keyed by session_id, so concurrent runs on different sessions never receive each other's events.
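The flow described above can be sketched without the gem. Everything named here is an assumption made for illustration: EventBroadcastSketch is a single-threaded stand-in for this mixin (a plain Hash replaces the concurrent collections in the source listings, and `warn` replaces Legate.logger), and DemoSessionService is a hypothetical in-memory service, not one of the real including classes.

```ruby
# Stand-in for Legate::SessionService::EventBroadcast (assumption, not the
# gem's code). Not thread-safe: it uses a plain Hash of Arrays.
module EventBroadcastSketch
  def subscribe(session_id, &listener)
    raise ArgumentError, 'subscribe requires a block' unless listener

    key = session_id.to_s
    (event_subscribers[key] ||= []) << listener
    [key, listener] # opaque handle for #unsubscribe
  end

  def unsubscribe(handle)
    return unless handle.is_a?(Array)

    key, listener = handle
    listeners = event_subscribers[key]
    return unless listeners

    listeners.delete(listener)
    event_subscribers.delete(key) if listeners.empty?
  end

  def broadcast_event(session_id, event)
    listeners = event_subscribers[session_id.to_s]
    return if listeners.nil? || listeners.empty?

    # Iterate over a copy so a listener may unsubscribe while being called.
    listeners.dup.each do |listener|
      listener.call(event)
    rescue StandardError => e
      warn "subscriber raised #{e.class}: #{e.message}" # isolate the failure
    end
  end

  private

  def event_subscribers
    @event_subscribers ||= {}
  end
end

# Hypothetical session service: persist first, then broadcast.
class DemoSessionService
  include EventBroadcastSketch

  def initialize
    @events = Hash.new { |store, key| store[key] = [] }
  end

  def append_event(session_id, event)
    @events[session_id.to_s] << event
    broadcast_event(session_id, event)
    event
  end

  def events_for(session_id)
    @events[session_id.to_s].dup
  end
end

service = DemoSessionService.new
received = []
service.subscribe('s1') { |_event| raise 'boom' } # faulty subscriber
handle = service.subscribe('s1') { |event| received << event }

service.append_event('s1', 'first')
service.append_event('s2', 'other session') # different key, not delivered to s1
service.append_event('s1', 'second')
service.unsubscribe(handle)
service.append_event('s1', 'third')         # persisted, but no longer delivered

p received                  # ["first", "second"]
p service.events_for('s1')  # ["first", "second", "third"]
```

The faulty subscriber raises on every event, yet all three events are persisted and the healthy subscriber still receives the first two in order.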

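Instance Method Summary

  • #broadcast_event(session_id, event) ⇒ void

    Notifies subscribers of `session_id` that `event` was appended.

  • #subscribe(session_id) {|event| ... } ⇒ Object

    Returns an opaque handle to pass to #unsubscribe.

  • #unsubscribe(handle) ⇒ Object

    Removes a subscription created by #subscribe.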

Instance Method Details

#broadcast_event(session_id, event) ⇒ void

This method returns an undefined value.

Notifies subscribers of `session_id` that `event` was appended.



# File 'lib/legate/session_service/event_broadcast.rb', line 53

def broadcast_event(session_id, event)
  subscribers = event_subscribers[session_id.to_s]
  return if subscribers.nil? || subscribers.empty?

  subscribers.each do |listener|
    listener.call(event)
  rescue StandardError => e
    Legate.logger.error("EventBroadcast: subscriber raised #{e.class}: #{e.message}")
  end
end

#subscribe(session_id) {|event| ... } ⇒ Object

Registers the given block as a listener for `session_id`. Returns an opaque handle to pass to #unsubscribe.

Parameters:

  • session_id (String)

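Yield Parameters:

  • event

    each event appended to the session, as passed to #broadcast_event

Returns:

  • (Object)

    an opaque handle to pass to #unsubscribe

Raises:

  • (ArgumentError)

    if no block is given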


# File 'lib/legate/session_service/event_broadcast.rb', line 29

def subscribe(session_id, &listener)
  raise ArgumentError, 'subscribe requires a block' unless listener

  key = session_id.to_s
  subscribers = event_subscribers.compute_if_absent(key) { Concurrent::Array.new }
  subscribers << listener
  [key, listener]
end

#unsubscribe(handle) ⇒ Object

Removes a subscription created by #subscribe. Safe to call twice or with nil.

Parameters:

  • handle (Object)

    the value returned by #subscribe



# File 'lib/legate/session_service/event_broadcast.rb', line 40

def unsubscribe(handle)
  return unless handle.is_a?(Array)

  key, listener = handle
  subscribers = event_subscribers[key]
  return unless subscribers

  subscribers.delete(listener)
  event_subscribers.delete(key) if subscribers.empty?
end
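
Because delivery runs synchronously on the appending thread, a consumer that does slow work in its block delays #append_event. One possible pattern, sketched here under assumptions, is to push each event onto a Queue inside the block, drain the queue elsewhere, and always release the handle in an `ensure`. The helper with_event_queue and the class FakeSessionService are hypothetical names introduced for this example; the fake only imitates the documented #subscribe / #unsubscribe contract so the sketch runs without the gem.

```ruby
# Hypothetical helper (not part of Legate): works with any object that responds
# to #subscribe and #unsubscribe as documented on this page.
def with_event_queue(service, session_id)
  queue = Queue.new
  handle = service.subscribe(session_id) { |event| queue << event }
  yield queue
ensure
  # handle is nil if #subscribe raised; #unsubscribe is documented as nil-safe.
  service.unsubscribe(handle)
end

# Minimal stand-in service (assumption) so the helper can be exercised.
class FakeSessionService
  def initialize
    @listeners = Hash.new { |store, key| store[key] = [] }
  end

  def subscribe(session_id, &listener)
    raise ArgumentError, 'subscribe requires a block' unless listener

    @listeners[session_id.to_s] << listener
    [session_id.to_s, listener]
  end

  def unsubscribe(handle)
    return unless handle.is_a?(Array)

    key, listener = handle
    @listeners[key].delete(listener)
  end

  def append_event(session_id, event)
    @listeners[session_id.to_s].each { |listener| listener.call(event) }
  end

  def subscriber_count(session_id)
    @listeners[session_id.to_s].size
  end
end

service = FakeSessionService.new
drained = with_event_queue(service, 'session-1') do |queue|
  service.append_event('session-1', 'started')
  service.append_event('session-1', 'finished')
  Array.new(queue.size) { queue.pop } # drain in arrival order
end

p drained                               # ["started", "finished"]
p service.subscriber_count('session-1') # 0
```

The `ensure` clause releases the subscription even when the consumer's block raises, for example when a streaming client disconnects.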