Class: SharedBroker::Adapters::Redis
- Defined in:
- lib/shared_broker/adapters/redis.rb
Instance Method Summary collapse
-
#initialize(redis_url:) ⇒ Redis
constructor
A new instance of Redis.
- #publish(topic, message, correlation_id: nil) ⇒ Object
- #subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) ⇒ Object
Constructor Details
#initialize(redis_url:) ⇒ Redis
Returns a new instance of Redis.
10 11 12 13 14 15 16 17 |
# File 'lib/shared_broker/adapters/redis.rb', line 10

# Builds the adapter around a Redis client created for the given URL.
#
# The "redis" library is required lazily, at construction time. If that
# require fails, the LoadError is re-raised unless a top-level ::Redis
# constant is already defined, in which case that constant is used as-is.
#
# @param redis_url [Object] value handed to ::Redis.new as its `url:` option
# @raise [LoadError] if "redis" cannot be loaded and ::Redis is not defined
def initialize(redis_url:)
  begin
    require "redis"
  rescue LoadError => load_error
    # A failed require is tolerated only when ::Redis already exists.
    raise load_error if defined?(::Redis).nil?
  end

  @redis = ::Redis.new(url: redis_url)
end
Instance Method Details
#publish(topic, message, correlation_id: nil) ⇒ Object
19 20 21 22 23 24 25 26 |
# File 'lib/shared_broker/adapters/redis.rb', line 19

# Publishes a Hash message to a Redis channel as a JSON string.
#
# The correlation id is merged into a copy of the message under the
# `_correlation_id` key before serialization; the key is present (with a
# nil value) even when no correlation id is given. The caller's Hash is
# not mutated.
#
# @param topic [Object] channel passed through to the Redis client's publish
# @param message [Hash] payload to serialize
# @param correlation_id [Object, nil] value stored under `_correlation_id`
# @return [Object] whatever the Redis client's publish call returns
# @raise [ArgumentError] if message is not a Hash
def publish(topic, message, correlation_id: nil)
  unless message.is_a?(Hash)
    raise ArgumentError, "Expected message to be a Hash, got #{message.class} with value #{message.inspect}"
  end

  payload = message.merge(_correlation_id: correlation_id)
  @redis.publish(topic, payload.to_json)
end
#subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/shared_broker/adapters/redis.rb', line 28

# Subscribes to a Redis channel on a background thread and hands each
# message, parsed from JSON with symbolized keys, to the given block.
#
# When the block raises a StandardError the same message is retried up to
# +max_retries+ times, sleeping backoff_base**attempt seconds before each
# retry. Once retries are exhausted the raw JSON and the last error are
# passed to publish_to_dlq. A payload that is not valid JSON is passed to
# publish_to_dlq immediately, since retrying cannot make it parse.
#
# NOTE(review): the subscription runs on the same @redis client that
# #publish uses. redis-rb appears to restrict a subscribed connection to
# subscription commands, so a dedicated connection may be needed here;
# confirm against the client version in use.
#
# @param topic [Object] channel passed to the Redis client's subscribe
# @param queue_name [Object] forwarded unchanged to publish_to_dlq
# @param max_retries [Integer] retries allowed after the first failed attempt
# @param backoff_base [Numeric] base of the exponential backoff, in seconds
# @yieldparam data [Object] the parsed JSON message
# @return [Thread] the thread running the subscription loop
# @raise [ArgumentError] if no block is given
def subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block)
  # Without a handler every message would only be retried and dead-lettered.
  raise ArgumentError, "subscribe requires a block" unless block

  Thread.new do
    @redis.subscribe(topic) do |on|
      on.message do |_channel, msg_json|
        begin
          data = JSON.parse(msg_json, symbolize_names: true)
        rescue JSON::ParserError => e
          # One malformed payload must not end the subscription loop.
          publish_to_dlq(topic, queue_name, msg_json, e)
          next
        end

        attempts = 0
        begin
          block.call(data)
        rescue => e
          attempts += 1
          if attempts <= max_retries
            sleep(backoff_base**attempts)
            retry
          else
            publish_to_dlq(topic, queue_name, msg_json, e)
          end
        end
      end
    end
  end
end