Class: SharedBroker::Adapters::InMemory

Inherits:
Base
  • Object
show all
Defined in:
lib/shared_broker/adapters/in_memory.rb

Instance Method Summary collapse

Methods inherited from Base

#publish_batch

Constructor Details

#initializeInMemory

Returns a new instance of InMemory.



9
10
11
12
# File 'lib/shared_broker/adapters/in_memory.rb', line 9

def initialize
  @storage = Hash.new { |h, k| h[k] = [] }
  @subscribers = Hash.new { |h, k| h[k] = [] }
end

Instance Method Details

#publish(topic, message, correlation_id: nil) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/shared_broker/adapters/in_memory.rb', line 14

def publish(topic, message, correlation_id: nil)
   = message.merge(_correlation_id: correlation_id)
  @storage[topic] << 
  
  @subscribers[topic].each do |sub|
    attempts = 0
    begin
      sub[:block].call()
    rescue SharedBroker::ShutdownError => e
      raise e
    rescue => e
      attempts += 1
      if attempts <= sub[:max_retries]
        # Sleep briefly or not at all in memory to keep tests fast
        sleep(0.001 * sub[:backoff_base]**attempts)
        retry
      else
        dlq_topic = "#{sub[:queue_name]}.dlq"
        dlq_msg = .merge(
          _x_original_routing_key: topic,
          _x_failed_at: Time.now.utc.iso8601,
          _x_exception_class: e.class.name,
          _x_exception_message: e.message
        )
        @storage[dlq_topic] << dlq_msg
      end
    end
  end
end

#published_messages(topic) ⇒ Object



48
49
50
# File 'lib/shared_broker/adapters/in_memory.rb', line 48

def published_messages(topic)
  @storage[topic]
end

#redrive_dlq(dlq_name, original_topic, limit: nil) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/shared_broker/adapters/in_memory.rb', line 52

def redrive_dlq(dlq_name, original_topic, limit: nil)
  dlq_messages = @storage[dlq_name]
  return if dlq_messages.empty?

  to_redrive = limit ? dlq_messages.first(limit) : dlq_messages.dup
  to_redrive.each do |msg|
    cleaned_msg = msg.dup
    cleaned_msg.delete(:_x_original_routing_key)
    cleaned_msg.delete(:_x_failed_at)
    cleaned_msg.delete(:_x_exception_class)
    cleaned_msg.delete(:_x_exception_message)

    publish(original_topic, cleaned_msg, correlation_id: cleaned_msg[:_correlation_id])
    @storage[dlq_name].delete(msg)
  end
end

#subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) ⇒ Object



44
45
46
# File 'lib/shared_broker/adapters/in_memory.rb', line 44

def subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block)
  @subscribers[topic] << { queue_name: queue_name, max_retries: max_retries, backoff_base: backoff_base, block: block }
end