Class: SharedBroker::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/shared_broker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(adapter:, circuit_breaker: nil, middlewares: nil) ⇒ Client

Returns a new instance of Client.



27
28
29
30
31
32
33
34
35
36
37
# File 'lib/shared_broker.rb', line 27

# Sets up a client on top of a broker adapter.
#
# @param adapter [#publish, #subscribe] transport object; must respond to both methods
# @param circuit_breaker [Object, nil] used as given when truthy; otherwise a
#   new CircuitBreaker is created
# @param middlewares [Array, nil] handed to MiddlewarePipeline when truthy; otherwise
#   a single OpenTelemetryPropagation middleware is used
# @raise [ArgumentError] if adapter does not respond to :publish or :subscribe
def initialize(adapter:, circuit_breaker: nil, middlewares: nil)
  # Duck-type check, evaluated in the order :publish, then :subscribe.
  if %i[publish subscribe].any? { |required| !adapter.respond_to?(required) }
    raise ArgumentError, "Expected adapter to respond to :publish and :subscribe, got #{adapter.class} with value #{adapter.inspect}"
  end

  @adapter = adapter
  @circuit_breaker = circuit_breaker || CircuitBreaker.new

  # The default middleware is only instantiated when the caller supplied none.
  pipeline_input = middlewares
  pipeline_input ||= [SharedBroker::Middlewares::OpenTelemetryPropagation.new]
  @middleware_pipeline = MiddlewarePipeline.new(pipeline_input)
end

Instance Attribute Details

#circuit_breaker ⇒ Object (readonly)

Returns the value of attribute circuit_breaker.



25
26
27
# File 'lib/shared_broker.rb', line 25

# Read-only accessor: returns the object currently stored in @circuit_breaker.
def circuit_breaker
  @circuit_breaker
end

#middleware_pipeline ⇒ Object (readonly)

Returns the value of attribute middleware_pipeline.



25
26
27
# File 'lib/shared_broker.rb', line 25

# Read-only accessor: returns the object currently stored in @middleware_pipeline.
def middleware_pipeline
  @middleware_pipeline
end

Instance Method Details

#publish(topic, message, correlation_id: nil) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
# File 'lib/shared_broker.rb', line 39

# Publishes +message+ to +topic+ by running it through the middleware pipeline.
#
# The pipeline receives the original (unencrypted) message plus a context hash.
# Inside the pipeline the message is validated, encrypted with
# SharedBroker.encryption_key, and handed to the adapter under the circuit breaker.
#
# @param topic [Object] topic identifier passed to validation, the pipeline and the adapter
# @param message [Object] payload; validated, then encrypted before it reaches the adapter
# @param correlation_id [Object, nil] placed in the middleware context and forwarded
#   to the adapter as a keyword argument
# @return [Object] whatever the middleware pipeline's +execute+ returns
def publish(topic, message, correlation_id: nil)
  # Context for the middlewares; the local's name was missing in the original
  # listing, which left both the assignment and the argument empty.
  context = { correlation_id: correlation_id, operation: :publish }

  @middleware_pipeline.execute(topic, message, context) do
    SharedBroker::Validation.validate!(topic, message)
    encrypted_msg = SharedBroker::Cipher.encrypt(message, SharedBroker.encryption_key)

    # Only the adapter call itself is guarded by the circuit breaker.
    @circuit_breaker.run do
      @adapter.publish(topic, encrypted_msg, correlation_id: correlation_id)
    end
  end
end

#subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
# File 'lib/shared_broker.rb', line 51

# Subscribes to +topic+ on +queue_name+ via the adapter and routes every
# delivered message to the given block.
#
# For each raw message the adapter yields, the message is decrypted with
# SharedBroker.encryption_key and validated; the block is then invoked inside
# the middleware pipeline with the decrypted message.
#
# @param topic [Object] topic identifier passed to the adapter, validation and the pipeline
# @param queue_name [Object] passed to the adapter and included in the middleware context
# @param max_retries [Integer] forwarded unchanged to the adapter
# @param backoff_base [Integer] forwarded unchanged to the adapter
# @yield [decrypted_msg] called once per message, after decryption and validation
# @return [Object] whatever the adapter's +subscribe+ returns
# @raise [ArgumentError] if no block is given
def subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block)
  # Fail at subscription time rather than with NoMethodError on the first message.
  raise ArgumentError, "subscribe requires a block" unless block

  @adapter.subscribe(topic, queue_name, max_retries: max_retries, backoff_base: backoff_base) do |raw_message|
    decrypted_msg = SharedBroker::Cipher.decrypt(raw_message, SharedBroker.encryption_key)
    SharedBroker::Validation.validate!(topic, decrypted_msg)

    # Context for the middlewares; the local's name was missing in the original listing.
    # NOTE(review): assumes the decrypted message supports #[] with a Symbol key — confirm.
    context = { correlation_id: decrypted_msg[:_correlation_id], operation: :subscribe, queue_name: queue_name }
    @middleware_pipeline.execute(topic, decrypted_msg, context) do
      block.call(decrypted_msg)
    end
  end
end