Module: Legion::Transport::Local

Extended by:
Logging::Helper
Defined in:
lib/legion/transport/local.rb

Class Method Summary collapse

Class Method Details

.active? ⇒ Boolean

Returns:

  • (Boolean)


56
57
58
# File 'lib/legion/transport/local.rb', line 56

# Reports whether this transport is active.
#
# The answer is a hard-coded constant: no state is consulted.
#
# @return [Boolean] always +true+
def active?
  true
end

.publish(_exchange_name, routing_key, payload, **) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/legion/transport/local.rb', line 19

# Appends +payload+ to the in-memory queue for +routing_key+ and hands it to
# every callback currently registered under that key.
#
# The payload is kept in the queue even when callbacks received it.
#
# Callbacks are invoked *after* the internal mutex is released. Ruby's Mutex
# is not reentrant, so calling them while holding the lock made any callback
# that re-entered a mutex-guarded method (for example to publish a follow-up
# message) fail with ThreadError. As a consequence, callbacks triggered by
# concurrent publishers are not serialized by this method.
#
# @param _exchange_name [Object] unused by this method
# @param routing_key [Object] key the payload is queued and dispatched under
# @param payload [Object] the message to enqueue and deliver
# @return [Hash] +{ published: true, routing_key: routing_key }+
def publish(_exchange_name, routing_key, payload, **)
  callbacks = @mutex.synchronize do
    (@queues[routing_key] ||= []) << payload
    # Snapshot so a callback that (un)registers subscribers cannot disturb
    # the iteration below.
    (@subscribers[routing_key] || []).dup
  end

  callbacks.each { |callback| callback.call(payload) }

  log.debug "Local published routing_key=#{routing_key}"
  { published: true, routing_key: routing_key }
end

.queue_depth(queue_name) ⇒ Object



44
45
46
# File 'lib/legion/transport/local.rb', line 44

# Counts the messages currently held for +queue_name+.
#
# The lookup happens while holding the internal mutex. A queue that has
# never been created counts as empty.
#
# @param queue_name [Object] key of the queue to inspect
# @return [Integer] number of stored messages, 0 when the queue is unknown
def queue_depth(queue_name)
  @mutex.synchronize do
    pending = @queues[queue_name]
    pending ? pending.size : 0
  end
end

.reset! ⇒ Object



48
49
50
51
52
53
54
# File 'lib/legion/transport/local.rb', line 48

# Empties every queue and drops every registered subscriber.
#
# The log line is written first; both collections are then cleared in a
# single critical section.
#
# @return [Object] the result of clearing the subscriber collection
def reset!
  log.info('Legion::Transport::Local shut down (queues cleared)')

  @mutex.synchronize do
    # Clear in the same order as before; `last` keeps the subscriber
    # collection as the return value.
    [@queues, @subscribers].each(&:clear).last
  end
end

.setup ⇒ Object



15
16
17
# File 'lib/legion/transport/local.rb', line 15

# Announces, at info level, that the in-memory transport is in use.
#
# Nothing else happens here: no state is created or modified.
#
# @return [Object] whatever the logger's +info+ call returns
def setup
  log.info('Legion::Transport::Local initialized (in-memory mode)')
end

.subscribe(queue_name, &block) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/legion/transport/local.rb', line 32

# Registers +block+ as a subscriber of +queue_name+, replays every message
# already waiting in that queue to it, then empties the queue.
#
# The backlog is replayed *after* the internal mutex is released. Ruby's
# Mutex is not reentrant, so replaying while holding the lock made any block
# that re-entered a mutex-guarded method fail with ThreadError. The block is
# registered before the replay starts, so a message published from another
# thread during the replay may reach the block ahead of older backlog
# entries.
#
# If the block raises during the replay, the queue is left untouched and the
# block stays registered.
#
# @param queue_name [Object] key of the queue to subscribe to
# @yieldparam msg [Object] each backlog message, in queue order
# @raise [ArgumentError] when no block is given; registering +nil+ would
#   make every later delivery to this queue fail
# @return [Hash] +{ subscribed: true, queue: queue_name }+
def subscribe(queue_name, &block)
  raise ArgumentError, 'subscribe requires a block' unless block

  backlog = @mutex.synchronize do
    (@subscribers[queue_name] ||= []) << block
    # Copy so the replay is unaffected by messages appended meanwhile.
    (@queues[queue_name] || []).dup
  end

  backlog.each { |msg| block.call(msg) }
  @mutex.synchronize { @queues[queue_name] = [] }

  log.info "Local subscribed queue=#{queue_name}"
  { subscribed: true, queue: queue_name }
end