Module: Legion::Transport::Local
- Extended by:
- Logging::Helper
- Defined in:
- lib/legion/transport/local.rb
Class Method Summary collapse
- .active? ⇒ Boolean
- .publish(_exchange_name, routing_key, payload, **) ⇒ Object
- .queue_depth(queue_name) ⇒ Object
- .reset! ⇒ Object
- .setup ⇒ Object
- .subscribe(queue_name, &block) ⇒ Object
Class Method Details
.active? ⇒ Boolean
56 57 58 |
# File 'lib/legion/transport/local.rb', line 56
#
# Reports whether this transport is active.
#
# @return [Boolean] always +true+
def active?
  true
end
.publish(_exchange_name, routing_key, payload, **) ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/legion/transport/local.rb', line 19
#
# Stores +payload+ under +routing_key+ and hands it to every callback
# currently registered for that key.
#
# Callbacks are invoked only after +@mutex+ has been released. Ruby's Mutex
# is not reentrant, so running them inside the critical section would raise
# ThreadError as soon as a callback called back into a method that takes the
# same lock (for example to publish a follow-up message or read queue_depth).
#
# Any extra keyword arguments are accepted and ignored.
#
# @param _exchange_name [Object] unused
# @param routing_key [Object] key selecting both the queue and its subscribers
# @param payload [Object] message to store and deliver
# @return [Hash] +{ published: true, routing_key: routing_key }+
def publish(_exchange_name, routing_key, payload, **)
  callbacks = @mutex.synchronize do
    (@queues[routing_key] ||= []) << payload
    # Snapshot the list so it cannot change underneath the iteration below
    # once the lock is released.
    (@subscribers[routing_key] || []).dup
  end

  callbacks.each { |callback| callback.call(payload) }

  log.debug "Local published routing_key=#{routing_key}"
  { published: true, routing_key: routing_key }
end
.queue_depth(queue_name) ⇒ Object
44 45 46 |
# File 'lib/legion/transport/local.rb', line 44
#
# Counts the messages currently stored under +queue_name+.
#
# @param queue_name [Object] key into the internal queue table
# @return [Integer] number of stored messages; 0 when nothing is stored
#   under that key
def queue_depth(queue_name)
  @mutex.synchronize do
    pending = @queues[queue_name]
    pending ? pending.size : 0
  end
end
.reset! ⇒ Object
48 49 50 51 52 53 54 |
# File 'lib/legion/transport/local.rb', line 48
#
# Logs the shutdown message, then empties both the queue table and the
# subscriber table while holding +@mutex+.
#
# @return [Object] whatever +@subscribers.clear+ returns
def reset!
  log.info('Legion::Transport::Local shut down (queues cleared)')

  @mutex.synchronize do
    @queues.clear
    @subscribers.clear
  end
end
.setup ⇒ Object
15 16 17 |
# File 'lib/legion/transport/local.rb', line 15
#
# Announces at info level that the in-memory transport has been initialized.
# The visible body does nothing besides logging.
#
# @return [Object] whatever +log.info+ returns
def setup
  log.info('Legion::Transport::Local initialized (in-memory mode)')
end
.subscribe(queue_name, &block) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/legion/transport/local.rb', line 32
#
# Registers +block+ as a subscriber for +queue_name+, replays every message
# already stored under that name to it, and then empties the stored backlog.
#
# A missing block is rejected up front: storing +nil+ in the subscriber list
# would only fail later, inside publish, when the list is iterated.
#
# NOTE: the backlog replay runs while +@mutex+ is held, so during replay the
# block must not call back into methods that take the same lock.
#
# @param queue_name [Object] key selecting the queue to subscribe to
# @yieldparam msg [Object] each stored message, in stored order
# @return [Hash] +{ subscribed: true, queue: queue_name }+
# @raise [ArgumentError] when no block is given
def subscribe(queue_name, &block)
  raise ArgumentError, 'subscribe requires a block' unless block

  @mutex.synchronize do
    (@subscribers[queue_name] ||= []) << block
    (@queues[queue_name] || []).each { |msg| block.call(msg) }
    # Cleared only after the replay, so a failing block leaves the backlog intact.
    @queues[queue_name] = []
  end

  log.info "Local subscribed queue=#{queue_name}"
  { subscribed: true, queue: queue_name }
end