Class: A2A::Server::EventRouter

Inherits:
Object
  • Object
show all
Defined in:
lib/simple_a2a/server/event_router.rb

Instance Method Summary collapse

Constructor Details

#initializeEventRouter

Returns a new instance of EventRouter.



8
9
10
# File 'lib/simple_a2a/server/event_router.rb', line 8

def initialize
  @bus = TypedBus::MessageBus.new
end

Instance Method Details

#channel?(task_id) ⇒ Boolean

Returns:

  • (Boolean)


45
46
47
# File 'lib/simple_a2a/server/event_router.rb', line 45

def channel?(task_id)
  @bus.channel?(task_id.to_sym)
end

#close(task_id) ⇒ Object



17
18
19
# File 'lib/simple_a2a/server/event_router.rb', line 17

def close(task_id)
  @bus.remove_channel(task_id.to_sym)
end

#open(task_id) ⇒ Object



12
13
14
15
# File 'lib/simple_a2a/server/event_router.rb', line 12

def open(task_id)
  return if @bus.channel?(task_id.to_sym)
  @bus.add_channel(task_id.to_sym, type: nil, timeout: nil)
end

#publish(task_id, event) ⇒ Object



21
22
23
24
25
26
27
# File 'lib/simple_a2a/server/event_router.rb', line 21

def publish(task_id, event)
  sym = task_id.to_sym
  open(task_id) unless @bus.channel?(sym)
  @bus.publish(sym, event)
rescue ArgumentError
  nil
end

#subscribe(task_id, &block) ⇒ Object



29
30
31
32
33
34
35
36
# File 'lib/simple_a2a/server/event_router.rb', line 29

def subscribe(task_id, &block)
  sym = task_id.to_sym
  open(task_id) unless @bus.channel?(sym)
  @bus.subscribe(sym) do |delivery|
    block.call(delivery.message)
    delivery.ack!
  end
end

#unsubscribe(task_id, id_or_block) ⇒ Object



38
39
40
41
42
43
# File 'lib/simple_a2a/server/event_router.rb', line 38

def unsubscribe(task_id, id_or_block)
  return unless @bus.channel?(task_id.to_sym)
  @bus.unsubscribe(task_id.to_sym, id_or_block)
rescue ArgumentError
  nil
end