Module: Legion::Transport::Local
- Defined in:
- lib/legion/transport/local.rb
Class Method Summary collapse
- .active? ⇒ Boolean
- .publish(_exchange_name, routing_key, payload) ⇒ Object
- .queue_depth(queue_name) ⇒ Object
- .reset! ⇒ Object
- .subscribe(queue_name, &block) ⇒ Object
Class Method Details
.active? ⇒ Boolean
45 46 47 |
# File 'lib/legion/transport/local.rb', line 45 def active? true end |
.publish(_exchange_name, routing_key, payload) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/legion/transport/local.rb', line 11 def publish(_exchange_name, routing_key, payload, **) @mutex.synchronize do @queues[routing_key] ||= [] @queues[routing_key] << payload (@subscribers[routing_key] || []).each do |callback| callback.call(payload) end end { published: true, routing_key: routing_key } end |
.queue_depth(queue_name) ⇒ Object
34 35 36 |
# File 'lib/legion/transport/local.rb', line 34 def queue_depth(queue_name) @mutex.synchronize { (@queues[queue_name] || []).size } end |
.reset! ⇒ Object
38 39 40 41 42 43 |
# File 'lib/legion/transport/local.rb', line 38 def reset! @mutex.synchronize do @queues.clear @subscribers.clear end end |
.subscribe(queue_name, &block) ⇒ Object
23 24 25 26 27 28 29 30 31 32 |
# File 'lib/legion/transport/local.rb', line 23 def subscribe(queue_name, &block) @mutex.synchronize do @subscribers[queue_name] ||= [] @subscribers[queue_name] << block (@queues[queue_name] || []).each { |msg| block.call(msg) } @queues[queue_name] = [] end { subscribed: true, queue: queue_name } end |