Class: SharedBroker::Adapters::InMemory

Inherits:
Base
  • Object
show all
Defined in:
lib/shared_broker/adapters/in_memory.rb

Instance Method Summary collapse

Constructor Details

#initializeInMemory

Returns a new instance of InMemory.



9
10
11
12
# File 'lib/shared_broker/adapters/in_memory.rb', line 9

def initialize
  @storage = Hash.new { |h, k| h[k] = [] }
  @subscribers = Hash.new { |h, k| h[k] = [] }
end

Instance Method Details

#publish(topic, message, correlation_id: nil) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/shared_broker/adapters/in_memory.rb', line 14

def publish(topic, message, correlation_id: nil)
   = message.merge(_correlation_id: correlation_id)
  @storage[topic] << 
  
  @subscribers[topic].each do |sub|
    attempts = 0
    begin
      sub[:block].call()
    rescue => e
      attempts += 1
      if attempts <= sub[:max_retries]
        # Sleep briefly or not at all in memory to keep tests fast
        sleep(0.001 * sub[:backoff_base]**attempts)
        retry
      else
        dlq_topic = "#{sub[:queue_name]}.dlq"
        dlq_msg = .merge(
          _x_original_routing_key: topic,
          _x_failed_at: Time.now.utc.iso8601,
          _x_exception_class: e.class.name,
          _x_exception_message: e.message
        )
        @storage[dlq_topic] << dlq_msg
      end
    end
  end
end

#published_messages(topic) ⇒ Object



46
47
48
# File 'lib/shared_broker/adapters/in_memory.rb', line 46

def published_messages(topic)
  @storage[topic]
end

#subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) ⇒ Object



42
43
44
# File 'lib/shared_broker/adapters/in_memory.rb', line 42

def subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block)
  @subscribers[topic] << { queue_name: queue_name, max_retries: max_retries, backoff_base: backoff_base, block: block }
end