Module: Legion::Transport::Local
- Defined in:
- lib/legion/transport/local.rb
Class Method Summary
- .active? ⇒ Boolean
- .publish(_exchange_name, routing_key, payload, **) ⇒ Object
- .queue_depth(queue_name) ⇒ Object
- .reset! ⇒ Object
- .setup ⇒ Object
- .subscribe(queue_name, &block) ⇒ Object
Class Method Details
.active? ⇒ Boolean
52 53 54 |
# File 'lib/legion/transport/local.rb', line 52
#
# Reports whether this transport is active.
#
# This implementation answers +true+ unconditionally; it inspects no state.
#
# @return [Boolean] always +true+
def active?
  true
end
.publish(_exchange_name, routing_key, payload, **) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/legion/transport/local.rb', line 15
#
# Appends +payload+ to the in-memory queue for +routing_key+ and passes it to
# every callback currently registered under that key.
#
# The callback list is snapshotted while the lock is held, but the callbacks
# themselves run after the lock is released. Running them inside the critical
# section meant that a callback which re-entered this transport (publish,
# subscribe, queue_depth and reset! all take the same lock) would fail with
# ThreadError when @mutex is a non-reentrant Mutex.
#
# NOTE(review): the payload stays in @queues even when subscribers received
# it; it is only removed by a later subscribe or reset!. Confirm this is the
# intended contract.
#
# @param _exchange_name [Object] accepted for interface compatibility; unused
# @param routing_key [Object] key selecting both the queue and its subscribers
# @param payload [Object] message to store and hand to each callback
# @return [Hash] +{ published: true, routing_key: routing_key }+
def publish(_exchange_name, routing_key, payload, **)
  callbacks = @mutex.synchronize do
    (@queues[routing_key] ||= []) << payload
    # Copy so a callback that subscribes or resets cannot mutate the list
    # while it is being iterated below.
    (@subscribers[routing_key] || []).dup
  end

  callbacks.each { |callback| callback.call(payload) }

  Legion::Logging.debug "Local published routing_key=#{routing_key}" if defined?(Legion::Logging)
  { published: true, routing_key: routing_key }
end
.queue_depth(queue_name) ⇒ Object
40 41 42 |
# File 'lib/legion/transport/local.rb', line 40
#
# Counts the messages currently stored for +queue_name+.
#
# The lookup happens while holding the transport lock.
#
# @param queue_name [Object] key of the queue to inspect
# @return [Integer] number of stored messages; 0 when nothing is stored under the key
def queue_depth(queue_name)
  @mutex.synchronize do
    pending = @queues[queue_name]
    pending ? pending.size : 0
  end
end
.reset! ⇒ Object
44 45 46 47 48 49 50 |
# File 'lib/legion/transport/local.rb', line 44
#
# Drops every queued message and every registered subscriber.
#
# The shutdown message is logged first (only when Legion::Logging is
# defined); both collections are then cleared under the transport lock.
#
# @return [Object] the result of +@subscribers.clear+
def reset!
  if defined?(Legion::Logging)
    Legion::Logging.info 'Legion::Transport::Local shut down (queues cleared)'
  end

  @mutex.synchronize do
    @queues.clear
    @subscribers.clear
  end
end
.setup ⇒ Object
11 12 13 |
# File 'lib/legion/transport/local.rb', line 11
#
# Announces that the in-memory transport is in use.
#
# Does nothing beyond logging, and logs only when Legion::Logging is defined.
#
# @return [Object, nil] the logger call's result, or +nil+ when no logger is defined
def setup
  return unless defined?(Legion::Logging)

  Legion::Logging.info 'Legion::Transport::Local initialized (in-memory mode)'
end
.subscribe(queue_name, &block) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/legion/transport/local.rb', line 28
#
# Registers +block+ as a subscriber of +queue_name+ and replays to it every
# message already stored for that queue, leaving the queue empty.
#
# Registration and detaching the backlog happen under the transport lock; the
# replay itself runs after the lock is released. Replaying inside the critical
# section meant that a block which re-entered this transport (publish,
# subscribe, queue_depth and reset! all take the same lock) would fail with
# ThreadError when @mutex is a non-reentrant Mutex.
#
# A missing block is rejected immediately: storing +nil+ as a subscriber
# would make every later publish to this queue fail on +nil.call+.
#
# @param queue_name [Object] key of the queue to subscribe to
# @yieldparam msg [Object] each backlog message now, and each payload published later
# @raise [ArgumentError] when no block is given
# @return [Hash] +{ subscribed: true, queue: queue_name }+
def subscribe(queue_name, &block)
  raise ArgumentError, 'subscribe requires a block' unless block

  backlog = @mutex.synchronize do
    (@subscribers[queue_name] ||= []) << block
    pending = @queues[queue_name] || []
    # Install a fresh array so the detached backlog is no longer shared with
    # concurrent publishers.
    @queues[queue_name] = []
    pending
  end

  backlog.each { |msg| block.call(msg) }

  Legion::Logging.debug "Local subscribed queue=#{queue_name}" if defined?(Legion::Logging)
  { subscribed: true, queue: queue_name }
end