Class: SharedBroker::Adapters::InMemory
- Defined in:
- lib/shared_broker/adapters/in_memory.rb
Instance Method Summary
- #initialize ⇒ InMemory (constructor) — A new instance of InMemory.
- #publish(topic, message, correlation_id: nil) ⇒ Object
- #published_messages(topic) ⇒ Object
- #redrive_dlq(dlq_name, original_topic, limit: nil) ⇒ Object
- #subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) ⇒ Object
Methods inherited from Base
Constructor Details
#initialize ⇒ InMemory
Returns a new instance of InMemory.
9 10 11 12 |
# File 'lib/shared_broker/adapters/in_memory.rb', line 9

# Sets up the two in-memory tables the adapter works from.
#
# Both hashes are built with a default block, so reading a key that has not
# been seen yet creates and stores a fresh empty array for that key.
def initialize
  # name (topic or dead-letter queue) => array of stored messages
  @storage = Hash.new { |h, k| h[k] = [] }
  # topic => array of subscriber entries
  @subscribers = Hash.new { |h, k| h[k] = [] }
end
Instance Method Details
#publish(topic, message, correlation_id: nil) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/shared_broker/adapters/in_memory.rb', line 14

# Stores a message under +topic+ and hands it, inline, to every subscriber
# registered for that topic.
#
# The stored/delivered message is a copy of +message+ with +:_correlation_id+
# set to +correlation_id+ (overwriting any value already present).
#
# When a handler raises, the call is retried up to that subscriber's
# +:max_retries+ with a short, growing sleep between attempts. Once retries
# are exhausted, a copy of the message carrying failure metadata is appended
# to the "<queue_name>.dlq" entry of the store.
#
# @param topic [Object] key the message is stored and routed under
# @param message [Hash] payload; must respond to +merge+
# @param correlation_id [Object, nil] value recorded as +:_correlation_id+
# @return [Array<Hash>] the subscriber entries for +topic+ (result of +each+)
# @raise [SharedBroker::ShutdownError] passed through untouched, never
#   retried or dead-lettered
def publish(topic, message, correlation_id: nil)
  message = message.merge(_correlation_id: correlation_id)
  @storage[topic] << message

  @subscribers[topic].each do |sub|
    attempts = 0
    begin
      sub[:block].call(message)
    rescue SharedBroker::ShutdownError
      # Shutdown must reach the caller; re-raise the same exception as-is.
      raise
    rescue => e
      attempts += 1
      if attempts <= sub[:max_retries]
        # Sleep briefly or not at all in memory to keep tests fast
        sleep(0.001 * sub[:backoff_base]**attempts)
        retry
      else
        dlq_topic = "#{sub[:queue_name]}.dlq"
        dlq_msg = message.merge(
          _x_original_routing_key: topic,
          _x_failed_at: Time.now.utc.iso8601,
          _x_exception_class: e.class.name,
          _x_exception_message: e.message
        )
        @storage[dlq_topic] << dlq_msg
      end
    end
  end
end
#published_messages(topic) ⇒ Object
48 49 50 |
# File 'lib/shared_broker/adapters/in_memory.rb', line 48

# Returns the messages recorded under +topic+.
#
# The return value is the stored array itself, not a copy. Because the store
# creates entries on first read, an unseen topic yields an empty array.
#
# @param topic [Object] key to look up in the store
# @return [Array] messages stored under +topic+
def published_messages(topic)
  @storage[topic]
end
#redrive_dlq(dlq_name, original_topic, limit: nil) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/shared_broker/adapters/in_memory.rb', line 52

# Republishes messages held in a dead-letter queue back to +original_topic+.
#
# Works on a snapshot of the queue taken up front, so anything appended to
# the queue while redriving (for example a message that fails again) is left
# for a later call. Each message is republished without its +:_x_*+ failure
# metadata and then removed from the queue.
#
# @param dlq_name [Object] store key of the dead-letter queue
# @param original_topic [Object] topic to republish to
# @param limit [Integer, nil] maximum number of messages to redrive; all when nil
# @return [Array<Hash>, nil] the messages that were redriven, or nil when the
#   queue was empty
def redrive_dlq(dlq_name, original_topic, limit: nil)
  messages = @storage[dlq_name]
  return if messages.empty?

  to_redrive = limit ? messages.first(limit) : messages.dup
  to_redrive.each do |msg|
    cleaned_msg = msg.dup
    %i[
      _x_original_routing_key
      _x_failed_at
      _x_exception_class
      _x_exception_message
    ].each { |key| cleaned_msg.delete(key) }

    publish(original_topic, cleaned_msg, correlation_id: cleaned_msg[:_correlation_id])

    # Remove exactly the entry that was redriven. Array#delete would drop
    # every entry that is == to it, losing equal duplicates and messages that
    # failed again with identical metadata.
    index = messages.index { |queued| queued.equal?(msg) }
    messages.delete_at(index) if index
  end
end
#subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) ⇒ Object
44 45 46 |
# File 'lib/shared_broker/adapters/in_memory.rb', line 44

# Registers a handler block for +topic+.
#
# The entry is appended to the subscriber list for +topic+ as a hash with the
# keys +:queue_name+, +:max_retries+, +:backoff_base+ and +:block+.
#
# @param topic [Object] topic to listen on
# @param queue_name [Object] name recorded for this subscriber
# @param max_retries [Integer] retry budget recorded for this subscriber
# @param backoff_base [Numeric] backoff base recorded for this subscriber
# @yield handler stored under +:block+
# @return [Array<Hash>] the subscriber entries for +topic+, including the new one
# @raise [ArgumentError] when no block is given
def subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block)
  # Fail at registration time; a nil handler would otherwise only surface
  # later, when something tries to call it.
  raise ArgumentError, "a handler block is required" unless block

  @subscribers[topic] << {
    queue_name: queue_name,
    max_retries: max_retries,
    backoff_base: backoff_base,
    block: block
  }
end