Class: SharedBroker::Adapters::InMemory
- Defined in:
- lib/shared_broker/adapters/in_memory.rb
Instance Method Summary collapse
-
#initialize ⇒ InMemory
constructor
A new instance of InMemory.
- #publish(topic, message, correlation_id: nil) ⇒ Object
- #published_messages(topic) ⇒ Object
- #subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) ⇒ Object
Constructor Details
#initialize ⇒ InMemory
Returns a new instance of InMemory.
9 10 11 12 |
# File 'lib/shared_broker/adapters/in_memory.rb', line 9 def initialize @storage = Hash.new { |h, k| h[k] = [] } @subscribers = Hash.new { |h, k| h[k] = [] } end |
Instance Method Details
#publish(topic, message, correlation_id: nil) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/shared_broker/adapters/in_memory.rb', line 14 def publish(topic, , correlation_id: nil) = .merge(_correlation_id: correlation_id) @storage[topic] << @subscribers[topic].each do |sub| attempts = 0 begin sub[:block].call() rescue => e attempts += 1 if attempts <= sub[:max_retries] # Sleep briefly or not at all in memory to keep tests fast sleep(0.001 * sub[:backoff_base]**attempts) retry else dlq_topic = "#{sub[:queue_name]}.dlq" dlq_msg = .merge( _x_original_routing_key: topic, _x_failed_at: Time.now.utc.iso8601, _x_exception_class: e.class.name, _x_exception_message: e. ) @storage[dlq_topic] << dlq_msg end end end end |
#published_messages(topic) ⇒ Object
46 47 48 |
# File 'lib/shared_broker/adapters/in_memory.rb', line 46 def (topic) @storage[topic] end |
#subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) ⇒ Object
42 43 44 |
# File 'lib/shared_broker/adapters/in_memory.rb', line 42 def subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) @subscribers[topic] << { queue_name: queue_name, max_retries: max_retries, backoff_base: backoff_base, block: block } end |