Class: SharedBroker::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/shared_broker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(adapter: nil, adapters: nil, routing: nil, circuit_breaker: nil, middlewares: nil) ⇒ Client

Returns a new instance of Client.



30
31
32
33
34
35
# File 'lib/shared_broker.rb', line 30

def initialize(adapter: nil, adapters: nil, routing: nil, circuit_breaker: nil, middlewares: nil)
  setup_adapters(adapter: adapter, adapters: adapters, routing: routing)
  @circuit_breaker = circuit_breaker || CircuitBreaker.new
  resolved_middlewares = middlewares || [SharedBroker::Middlewares::OpenTelemetryPropagation.new]
  @middleware_pipeline = MiddlewarePipeline.new(resolved_middlewares)
end

Instance Attribute Details

#adaptersObject (readonly)

Returns the value of attribute adapters.



28
29
30
# File 'lib/shared_broker.rb', line 28

def adapters
  @adapters
end

#circuit_breakerObject (readonly)

Returns the value of attribute circuit_breaker.



28
29
30
# File 'lib/shared_broker.rb', line 28

def circuit_breaker
  @circuit_breaker
end

#middleware_pipelineObject (readonly)

Returns the value of attribute middleware_pipeline.



28
29
30
# File 'lib/shared_broker.rb', line 28

def middleware_pipeline
  @middleware_pipeline
end

#routingObject (readonly)

Returns the value of attribute routing.



28
29
30
# File 'lib/shared_broker.rb', line 28

def routing
  @routing
end

Instance Method Details

#publish(topic, message, correlation_id: nil) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
# File 'lib/shared_broker.rb', line 37

def publish(topic, message, correlation_id: nil)
   = { correlation_id: correlation_id, operation: :publish }
  @middleware_pipeline.execute(topic, message, ) do
    SharedBroker::Validation.validate!(topic, message)
    encrypted_msg = SharedBroker::Cipher.encrypt(message, SharedBroker.encryption_key)

    @circuit_breaker.run do
      resolve_adapter(topic).publish(topic, encrypted_msg, correlation_id: correlation_id)
    end
  end
end

#subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
# File 'lib/shared_broker.rb', line 49

def subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block)
  resolve_adapter(topic).subscribe(topic, queue_name, max_retries: max_retries, backoff_base: backoff_base) do |raw_message|
    decrypted_msg = SharedBroker::Cipher.decrypt(raw_message, SharedBroker.encryption_key)
    SharedBroker::Validation.validate!(topic, decrypted_msg)

     = { correlation_id: decrypted_msg[:_correlation_id], operation: :subscribe, queue_name: queue_name }
    @middleware_pipeline.execute(topic, decrypted_msg, ) do
      block.call(decrypted_msg)
    end
  end
end