Class: A2A::Server::TaskBroadcast

Inherits:
Object
  • Object
show all
Defined in:
lib/simple_a2a/server/task_broadcast.rb

Defined Under Namespace

Classes: BroadcastError

Constant Summary collapse

DONE =
:done.freeze

Instance Method Summary collapse

Constructor Details

#initializeTaskBroadcast

Returns a new instance of TaskBroadcast.



12
13
14
15
# File 'lib/simple_a2a/server/task_broadcast.rb', line 12

def initialize
  @queues = []
  @mutex  = Mutex.new
end

Instance Method Details

#closeObject



40
41
42
43
# File 'lib/simple_a2a/server/task_broadcast.rb', line 40

def close
  snapshot = @mutex.synchronize { @queues.dup }
  snapshot.each { |q| q.async_push(DONE) }
end

#error(message) ⇒ Object



34
35
36
37
38
# File 'lib/simple_a2a/server/task_broadcast.rb', line 34

def error(message)
  ev = BroadcastError.new(message)
  snapshot = @mutex.synchronize { @queues.dup }
  snapshot.each { |q| q.async_push(ev) }
end

#publish(_task_id, event) ⇒ Object

Duck-type compatible with the old EventRouter interface. task_id is accepted but ignored — the broadcast is already task-scoped.



29
30
31
32
# File 'lib/simple_a2a/server/task_broadcast.rb', line 29

def publish(_task_id, event)
  snapshot = @mutex.synchronize { @queues.dup }
  snapshot.each { |q| q.async_push(event) }
end

#subscribe(capacity: 64) ⇒ Object



17
18
19
20
21
# File 'lib/simple_a2a/server/task_broadcast.rb', line 17

def subscribe(capacity: 64)
  RactorQueue.new(capacity: capacity).tap do |q|
    @mutex.synchronize { @queues << q }
  end
end

#unsubscribe(queue) ⇒ Object



23
24
25
# File 'lib/simple_a2a/server/task_broadcast.rb', line 23

def unsubscribe(queue)
  @mutex.synchronize { @queues.delete(queue) }
end