Module: Legion::Transport::Local

Defined in:
lib/legion/transport/local.rb

Class Method Summary collapse

Class Method Details

.active? ⇒ Boolean

Returns:

  • (Boolean)


52
53
54
# File 'lib/legion/transport/local.rb', line 52

# Availability predicate for this transport.
#
# The body is unconditional: there is no state under which this
# implementation reports itself inactive.
#
# @return [Boolean] always +true+
def active?
  true
end

.publish(_exchange_name, routing_key, payload, **) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/legion/transport/local.rb', line 15

# Appends +payload+ to the in-memory queue stored under +routing_key+ and
# hands it to every callback currently subscribed to that key.
#
# The payload stays in the queue even when subscribers received it; only
# +subscribe+ and +reset!+ empty a queue.
#
# Callbacks are invoked *after* the mutex is released. The mutex is not
# reentrant, so running them inside the critical section would make any
# callback that publishes or subscribes on this transport fail with
# ThreadError (recursive locking).
#
# @param _exchange_name [Object] accepted for interface compatibility; unused
# @param routing_key [Object] key selecting the queue and subscriber list
# @param payload [Object] message to enqueue and deliver
# @return [Hash] +{ published: true, routing_key: routing_key }+
def publish(_exchange_name, routing_key, payload, **)
  callbacks = @mutex.synchronize do
    (@queues[routing_key] ||= []) << payload
    # Snapshot the list so delivery iterates a stable copy even if a
    # callback (or another thread) subscribes while we are dispatching.
    (@subscribers[routing_key] || []).dup
  end

  callbacks.each { |callback| callback.call(payload) }

  Legion::Logging.debug "Local published routing_key=#{routing_key}" if defined?(Legion::Logging)
  { published: true, routing_key: routing_key }
end

.queue_depth(queue_name) ⇒ Object



40
41
42
# File 'lib/legion/transport/local.rb', line 40

# Number of messages currently held for +queue_name+.
#
# The lookup happens under the mutex and never creates an entry for an
# unknown queue.
#
# @param queue_name [Object] key of the queue to inspect
# @return [Integer] stored message count, 0 when the queue does not exist
def queue_depth(queue_name)
  @mutex.synchronize do
    pending = @queues[queue_name]
    pending ? pending.size : 0
  end
end

.reset! ⇒ Object



44
45
46
47
48
49
50
# File 'lib/legion/transport/local.rb', line 44

# Drops every queued message and every registered subscriber.
#
# When Legion::Logging is loaded, an info line is written first; both
# containers are then cleared inside a single critical section.
#
# @return [Object] the result of +@subscribers.clear+ (last expression of
#   the synchronized block)
def reset!
  if defined?(Legion::Logging)
    Legion::Logging.info 'Legion::Transport::Local shut down (queues cleared)'
  end

  @mutex.synchronize do
    @queues.clear
    @subscribers.clear
  end
end

.setup ⇒ Object



11
12
13
# File 'lib/legion/transport/local.rb', line 11

# Announces that the local transport is in use.
#
# The only action taken is writing an info line, and only when
# Legion::Logging has been loaded.
#
# @return [Object, nil] whatever the logger returns, or +nil+ when
#   Legion::Logging is not defined
def setup
  return unless defined?(Legion::Logging)

  Legion::Logging.info 'Legion::Transport::Local initialized (in-memory mode)'
end

.subscribe(queue_name, &block) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
# File 'lib/legion/transport/local.rb', line 28

# Registers +block+ as a subscriber of +queue_name+ and replays to it every
# message already waiting in that queue; the replayed messages are removed
# from the queue.
#
# A block is mandatory: storing +nil+ as a subscriber would make every later
# publish on this key fail when it tries to call it.
#
# The backlog is taken and the subscriber registered atomically, but the
# replay itself runs *after* the mutex is released. The mutex is not
# reentrant, so replaying inside the critical section would make any block
# that publishes or subscribes on this transport fail with ThreadError.
# Trade-off: a message published by another thread during the replay may
# reach the block before the replay has finished.
#
# @param queue_name [Object] key of the queue to subscribe to
# @yieldparam msg [Object] each backlog message, then each later payload
# @raise [ArgumentError] when called without a block
# @return [Hash] +{ subscribed: true, queue: queue_name }+
def subscribe(queue_name, &block)
  raise ArgumentError, 'subscribe requires a block' unless block

  backlog = @mutex.synchronize do
    (@subscribers[queue_name] ||= []) << block
    pending = @queues[queue_name] || []
    @queues[queue_name] = []
    pending
  end

  begin
    backlog.each { |msg| block.call(msg) }
  rescue StandardError
    # A failed replay must not drop messages: put the backlog back ahead of
    # anything published in the meantime, then let the error propagate.
    @mutex.synchronize { @queues[queue_name] = backlog + (@queues[queue_name] || []) }
    raise
  end

  Legion::Logging.debug "Local subscribed queue=#{queue_name}" if defined?(Legion::Logging)
  { subscribed: true, queue: queue_name }
end