Class: A2A::Store::PubSub
- Inherits: Object
  - Object
  - A2A::Store::PubSub
- Defined in: lib/a2a/store/pub_sub.rb
Overview
Fiber-safe pub/sub for task update streaming.
Following the conventions of async-job / protocol-http:
- Async::Queue is fiber-safe (no locks needed)
- enqueue/dequeue yield the fiber cooperatively
- nil sentinel signals end of stream
Each subscriber gets an Async::Queue. State mutations push events to all subscribers for the affected task. Terminal states close all subscribers for that task.
Usage:
    pub_sub = A2A::Store::PubSub.new

    # Subscribe (returns an Async::Queue)
    queue = pub_sub.subscribe("task-123")

    # In another fiber, consume events:
    Async do
      while event = queue.dequeue
        process(event)
      end
    end

    # Publish an event to all subscribers:
    pub_sub.notify("task-123", { type: :status, data: { ... } })

    # Close all subscribers for a task (terminal state):
    pub_sub.close("task-123")
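The fan-out and end-of-stream behaviour described above can be tried without the async gem. The following is a minimal sketch, not the library's code: StandInQueue is a hypothetical Thread::Queue-backed replacement for Async::Queue, SketchPubSub copies the method bodies listed on this page with that queue swapped in, and drain is an illustrative helper.

```ruby
# Stand-in for Async::Queue (assumption) so the sketch runs on the
# standard library alone. It exposes only enqueue and dequeue.
class StandInQueue
  def initialize
    @items = Thread::Queue.new
  end

  def enqueue(item)
    @items.push(item)
  end

  def dequeue
    @items.pop
  end
end

# Copy of the methods documented on this page, with the queue class swapped.
class SketchPubSub
  def initialize
    @subscribers = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(task_id)
    queue = StandInQueue.new
    @subscribers[task_id] << queue
    queue
  end

  def notify(task_id, event)
    @subscribers[task_id].each { |queue| queue.enqueue(event) }
  end

  def close(task_id)
    @subscribers[task_id].each { |queue| queue.enqueue(nil) } # end of stream
    @subscribers.delete(task_id)
  end

  def task_count
    @subscribers.size
  end
end

# Hypothetical helper: collect events until the nil sentinel arrives.
def drain(queue)
  events = []
  while (event = queue.dequeue)
    events << event
  end
  events
end

pub_sub = SketchPubSub.new
first = pub_sub.subscribe("task-123")
second = pub_sub.subscribe("task-123")

pub_sub.notify("task-123", { type: :status, data: { state: "working" } })
pub_sub.notify("task-123", { type: :status, data: { state: "completed" } })
pub_sub.close("task-123")

first_events = drain(first)   # both events, in publish order
second_events = drain(second) # the same two events
```

Each subscriber has its own queue, so a slow consumer does not hold back the others; the trade-off is that a queue with no consumer keeps accumulating events until the task is closed or the queue is unsubscribed.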
Instance Method Summary
- #close(task_id) ⇒ Object
  Close all subscribers for a task.
- #initialize ⇒ PubSub (constructor)
  A new instance of PubSub.
- #notify(task_id, event) ⇒ Object
  Push an event to all subscribers for a task.
- #subscribe(task_id) ⇒ Object
  Subscribe to updates for a task.
- #subscriber_count(task_id) ⇒ Object
  Number of active subscribers for a task.
- #task_count ⇒ Object
  Total number of tasks with active subscribers.
- #unsubscribe(task_id, queue) ⇒ Object
  Remove a specific subscriber queue.
Constructor Details
#initialize ⇒ PubSub
Returns a new instance of PubSub.
    # File 'lib/a2a/store/pub_sub.rb', line 40
    def initialize
      @subscribers = Hash.new { |h, k| h[k] = [] }
    end
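A Hash built with a default block that assigns (h[k] = []) stores a new entry on any lookup of a missing key, including plain reads. As listed on this page, notify and subscriber_count index @subscribers directly, so calling them with an unknown task id would leave an empty list behind that task_count then counts. The sketch below shows the behaviour on a bare Hash; count_without_creating is a hypothetical read-only alternative, not part of the class.

```ruby
# Hypothetical read-only lookup: Hash#fetch with a fallback value
# never invokes the default block, so nothing is stored.
def count_without_creating(subscribers, task_id)
  subscribers.fetch(task_id, []).size
end

subscribers = Hash.new { |hash, key| hash[key] = [] }

subscribers["task-1"] << :queue_a   # first access creates the list, then appends
subscribers["task-2"].size          # a plain read also stores an empty list
keys_after_read = subscribers.keys  # ["task-1", "task-2"]

count_without_creating(subscribers, "task-3")
still_absent = !subscribers.key?("task-3") # true
```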
Instance Method Details
#close(task_id) ⇒ Object
Close all subscribers for a task. Sends a nil sentinel to each subscriber queue, then removes all subscriptions for the task.
    # File 'lib/a2a/store/pub_sub.rb', line 72
    def close(task_id)
      @subscribers[task_id].each do |queue|
        queue.enqueue(nil) # sentinel: end of stream
      end
      @subscribers.delete(task_id)
    end
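Because the sentinel is enqueued like any other item, events published before close are still delivered first, and the consumer stops only when it reads nil. A loop written as `while event = queue.dequeue` also stops on any falsy event, so a consumer that might receive `false` can compare against nil explicitly. This is a small sketch under assumptions: Thread::Queue (push/pop) stands in for Async::Queue (enqueue/dequeue), and drain_until_nil is a hypothetical helper.

```ruby
# Hypothetical consumer helper: read until the nil sentinel, and only nil.
def drain_until_nil(queue)
  events = []
  until (event = queue.pop).nil?
    events << event
  end
  events
end

queue = Thread::Queue.new # stand-in for Async::Queue (assumption)
queue.push({ type: :status, state: "working" })
queue.push(false) # falsy, but still a real event
queue.push(nil)   # what close(task_id) enqueues

events = drain_until_nil(queue)
```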
#notify(task_id, event) ⇒ Object
Push an event to all subscribers for a task.
    # File 'lib/a2a/store/pub_sub.rb', line 64
    def notify(task_id, event)
      @subscribers[task_id].each do |queue|
        queue.enqueue(event)
      end
    end
#subscribe(task_id) ⇒ Object
Subscribe to updates for a task. Returns an Async::Queue that will receive events. A nil sentinel signals end of stream.
    # File 'lib/a2a/store/pub_sub.rb', line 47
    def subscribe(task_id)
      queue = Async::Queue.new
      @subscribers[task_id] << queue
      queue
    end
#subscriber_count(task_id) ⇒ Object
Number of active subscribers for a task.
    # File 'lib/a2a/store/pub_sub.rb', line 80
    def subscriber_count(task_id)
      @subscribers[task_id].size
    end
#task_count ⇒ Object
Total number of tasks with active subscribers.
    # File 'lib/a2a/store/pub_sub.rb', line 85
    def task_count
      @subscribers.size
    end
#unsubscribe(task_id, queue) ⇒ Object
Remove a specific subscriber queue.
    # File 'lib/a2a/store/pub_sub.rb', line 54
    def unsubscribe(task_id, queue)
      @subscribers[task_id].delete(queue)
      @subscribers.delete(task_id) if @subscribers[task_id].empty?
    end
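A subscriber that stops consuming before the task reaches a terminal state (for example, a client that disconnects) has to call unsubscribe itself, otherwise its queue stays registered and keeps receiving events. One way to make that hard to forget is an ensure block. The sketch below is illustrative only: MiniPubSub copies subscribe and unsubscribe from this page with Thread::Queue standing in for Async::Queue, and with_subscription is a hypothetical helper, not part of A2A::Store::PubSub.

```ruby
# Copy of subscribe/unsubscribe from this page; Thread::Queue replaces
# Async::Queue (assumption) so no gems are needed.
class MiniPubSub
  def initialize
    @subscribers = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(task_id)
    queue = Thread::Queue.new
    @subscribers[task_id] << queue
    queue
  end

  def unsubscribe(task_id, queue)
    @subscribers[task_id].delete(queue)
    @subscribers.delete(task_id) if @subscribers[task_id].empty?
  end

  def task_count
    @subscribers.size
  end
end

# Hypothetical helper: yields a subscription and always removes it
# afterwards, even when the block raises.
def with_subscription(pub_sub, task_id)
  queue = pub_sub.subscribe(task_id)
  yield queue
ensure
  pub_sub.unsubscribe(task_id, queue) if queue
end

pub_sub = MiniPubSub.new
begin
  with_subscription(pub_sub, "task-123") do |_queue|
    raise IOError, "client disconnected"
  end
rescue IOError
  # The subscription has already been removed by the ensure block.
end
```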