Class: SharedBroker::Adapters::Redis

Inherits:
Base
  • Object
show all
Defined in:
lib/shared_broker/adapters/redis.rb

Instance Method Summary collapse

Constructor Details

#initialize(redis_url:) ⇒ Redis

Returns a new instance of Redis.



10
11
12
13
14
15
16
17
# File 'lib/shared_broker/adapters/redis.rb', line 10

# Builds the adapter around a Redis client configured with +redis_url+.
#
# The redis library is required lazily at construction time. A LoadError from
# that require is tolerated only when a top-level ::Redis constant is already
# defined; otherwise the same LoadError is re-raised to the caller.
#
# @param redis_url value passed to ::Redis.new as the :url option
# @raise [LoadError] when "redis" cannot be required and ::Redis is undefined
def initialize(redis_url:)
  begin
    require "redis"
  rescue LoadError => load_error
    # Something else may already have defined ::Redis; only give up if not.
    raise load_error if defined?(::Redis).nil?
  end

  @redis = ::Redis.new(url: redis_url)
end

Instance Method Details

#publish(topic, message, correlation_id: nil) ⇒ Object



19
20
21
22
23
24
25
26
# File 'lib/shared_broker/adapters/redis.rb', line 19

# Publishes +message+ to +topic+ as a JSON document.
#
# The message is copied and extended with a :_correlation_id entry holding
# +correlation_id+ (nil when none is given) before being serialized.
#
# @param topic channel name handed to the Redis client's publish call
# @param message [Hash] payload to send
# @param correlation_id value stored under :_correlation_id in the payload
# @return [Object] whatever the Redis client's publish call returns
# @raise [ArgumentError] when +message+ is not a Hash
def publish(topic, message, correlation_id: nil)
  # Guard clause: only Hash payloads can carry the correlation id entry.
  raise ArgumentError, "Expected message to be a Hash, got #{message.class} with value #{message.inspect}" unless message.is_a?(Hash)

  envelope = message.merge({ _correlation_id: correlation_id })
  serialized = envelope.to_json
  @redis.publish(topic, serialized)
end

#subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/shared_broker/adapters/redis.rb', line 28

# Listens for messages on +topic+ in a background thread and hands each decoded
# payload to +block+.
#
# Every message is parsed as JSON with symbolized keys. When +block+ raises a
# StandardError the call is retried up to +max_retries+ times, sleeping
# backoff_base**attempt seconds before each retry; once the retries are used up
# the raw JSON string and the last error are passed to publish_to_dlq. A
# message that is not valid JSON is passed to publish_to_dlq right away,
# without retries, so a single malformed payload cannot terminate the listener.
#
# @param topic channel to subscribe to
# @param queue_name forwarded unchanged to publish_to_dlq
# @param max_retries [Integer] number of retries after the first failed attempt
# @param backoff_base [Numeric] base of the exponential delay between retries
# @yieldparam data the parsed JSON message (hash keys symbolized)
# @return [Thread] the thread running the subscription loop
def subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block)
  Thread.new do
    # Subscribe on a duplicate of the client rather than on @redis itself:
    # redis-rb ties a client to its active subscription, which would otherwise
    # get in the way of #publish (and of further #subscribe calls) on @redis.
    listener = @redis.dup
    listener.subscribe(topic) do |on|
      on.message do |_channel, msg_json|
        begin
          data = JSON.parse(msg_json, symbolize_names: true)
        rescue JSON::ParserError => e
          # Retrying cannot fix an unparseable payload; dead-letter it and
          # keep listening for the next message.
          publish_to_dlq(topic, queue_name, msg_json, e)
          next
        end

        attempts = 0
        begin
          block.call(data)
        rescue => e
          attempts += 1
          if attempts <= max_retries
            sleep(backoff_base**attempts)
            retry
          else
            publish_to_dlq(topic, queue_name, msg_json, e)
          end
        end
      end
    end
  end
end