Class: A2A::Store::PubSub

Inherits:
Object
Defined in:
lib/a2a/store/pub_sub.rb

Overview

Fiber-safe pub/sub for task update streaming.

Follows the conventions used by async-job and protocol-http:

- Async::Queue is fiber-safe (no locks needed)
- enqueue/dequeue yield the fiber cooperatively
- nil sentinel signals end of stream

Each subscriber gets an Async::Queue. State mutations push events to all subscribers for the affected task. Terminal states close all subscribers for that task.

Usage:

pub_sub = A2A::Store::PubSub.new

# Subscribe (returns an Async::Queue)
queue = pub_sub.subscribe("task-123")

# In another fiber, consume events:
Async do
  while event = queue.dequeue
    process(event)
  end
end

# Publish an event to all subscribers:
pub_sub.notify("task-123", { type: :status, data: { ... } })

# Close all subscribers for a task (terminal state):
pub_sub.close("task-123")
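The usage above needs the async gem and a running reactor. As a minimal sketch of the same fan-out and nil-sentinel flow that runs on the standard library alone, the following substitutes Thread::Queue for Async::Queue (push/pop stand in for enqueue/dequeue). SketchPubSub, drain, fan_out_demo and the event payload are hypothetical names chosen for illustration; they are not part of A2A.

```ruby
# Stand-in sketch: Thread::Queue (stdlib) replaces Async::Queue so this runs
# without the async gem. SketchPubSub is a hypothetical name, not part of A2A.
class SketchPubSub
  def initialize
    @subscribers = Hash.new { |h, k| h[k] = [] }
  end

  def subscribe(task_id)
    queue = Thread::Queue.new
    @subscribers[task_id] << queue
    queue
  end

  def notify(task_id, event)
    @subscribers[task_id].each { |queue| queue.push(event) }
  end

  def close(task_id)
    @subscribers[task_id].each { |queue| queue.push(nil) } # sentinel: end of stream
    @subscribers.delete(task_id)
  end
end

# Collect events from a queue until the nil sentinel arrives.
def drain(queue)
  events = []
  while (event = queue.pop)
    events << event
  end
  events
end

# Two subscribers on the same task each receive their own copy of the stream.
def fan_out_demo
  pub_sub = SketchPubSub.new
  first  = pub_sub.subscribe("task-123")
  second = pub_sub.subscribe("task-123")

  pub_sub.notify("task-123", { type: :status, data: { state: "working" } })
  pub_sub.close("task-123")

  [drain(first), drain(second)]
end
```

Each subscriber owns its queue, so a slow consumer delays only itself; the sentinel is enqueued after any pending events, so they are still delivered before the stream ends.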

Constructor Details

#initialize ⇒ PubSub

Returns a new instance of PubSub.



# File 'lib/a2a/store/pub_sub.rb', line 40

def initialize
  @subscribers = Hash.new { |h, k| h[k] = [] }
end

Instance Method Details

#close(task_id) ⇒ Object

Close all subscribers for a task. Sends nil sentinel and removes all subscriptions.



# File 'lib/a2a/store/pub_sub.rb', line 72

def close(task_id)
  @subscribers[task_id].each do |queue|
    queue.enqueue(nil) # sentinel: end of stream
  end
  @subscribers.delete(task_id)
end

#notify(task_id, event) ⇒ Object

Push an event to all subscribers for a task.

Parameters:

  • task_id (String)
  • event (Hash)

    e.g. { type: :status, data: { … } }



# File 'lib/a2a/store/pub_sub.rb', line 64

def notify(task_id, event)
  @subscribers[task_id].each do |queue|
    queue.enqueue(event)
  end
end

#subscribe(task_id) ⇒ Object

Subscribe to updates for a task. Returns an Async::Queue that will receive events. A nil sentinel signals end of stream.



# File 'lib/a2a/store/pub_sub.rb', line 47

def subscribe(task_id)
  queue = Async::Queue.new
  @subscribers[task_id] << queue
  queue
end

#subscriber_count(task_id) ⇒ Object

Number of active subscribers for a task.



# File 'lib/a2a/store/pub_sub.rb', line 80

def subscriber_count(task_id)
  @subscribers[task_id].size
end

#task_count ⇒ Object

Total number of tasks with active subscribers.



# File 'lib/a2a/store/pub_sub.rb', line 85

def task_count
  @subscribers.size
end
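
One thing to keep in mind when reading this count: as the source is written, the constructor gives @subscribers a default block that stores a new array for every missing key. A lookup such as the one in #subscriber_count or #notify therefore inserts the key even when nobody has subscribed, and #task_count would include that empty entry. The sketch below shows the Hash behaviour in isolation using plain Ruby; autovivify_demo and count_without_insert are hypothetical helpers, not part of the class.

```ruby
# Same default block as PubSub#initialize: a missing key is stored on read.
def autovivify_demo
  subscribers = Hash.new { |h, k| h[k] = [] }
  subscribers["task-123"].size # read only, yet the key is now stored
  subscribers.size
end

# Hypothetical read-only variant: Hash#fetch with a default argument
# bypasses the default block, so the key is never stored.
def count_without_insert(subscribers, task_id)
  subscribers.fetch(task_id, []).size
end
```

Whether the extra entries matter depends on how the count is used; for a metric meant to reflect live streams, a fetch-based read is one way to avoid them.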

#unsubscribe(task_id, queue) ⇒ Object

Remove a specific subscriber queue.



# File 'lib/a2a/store/pub_sub.rb', line 54

def unsubscribe(task_id, queue)
  @subscribers[task_id].delete(queue)
  @subscribers.delete(task_id) if @subscribers[task_id].empty?
end
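
A subscriber that goes away before the task reaches a terminal state never sees #close, so its queue stays registered until something calls #unsubscribe. One way to make that cleanup hard to forget is to pair subscribe and unsubscribe in an ensure block. The following is a sketch under assumptions: SketchRegistry is a hypothetical stand-in that copies only the methods needed here, Thread::Queue replaces Async::Queue so it runs without the async gem, and with_subscription is an illustrative helper rather than an existing API.

```ruby
# Hypothetical stand-in with only the methods this sketch needs;
# Thread::Queue (stdlib) replaces Async::Queue.
class SketchRegistry
  def initialize
    @subscribers = Hash.new { |h, k| h[k] = [] }
  end

  def subscribe(task_id)
    queue = Thread::Queue.new
    @subscribers[task_id] << queue
    queue
  end

  def unsubscribe(task_id, queue)
    @subscribers[task_id].delete(queue)
    @subscribers.delete(task_id) if @subscribers[task_id].empty?
  end

  def task_count
    @subscribers.size
  end
end

# Yield a subscription and always remove it, even if the block raises.
def with_subscription(registry, task_id)
  queue = registry.subscribe(task_id)
  yield queue
ensure
  registry.unsubscribe(task_id, queue) if queue
end
```

With this shape, an exception raised while streaming (for example a dropped client connection) still removes the queue, and the task entry disappears once its last subscriber is gone.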