Class: SharedBroker::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/shared_broker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(adapter: nil, adapters: nil, routing: nil, circuit_breaker: nil, middlewares: nil) ⇒ Client

Returns a new instance of Client.



32
33
34
35
36
37
# File 'lib/shared_broker.rb', line 32

# Builds a client: adapter/routing setup is delegated to +setup_adapters+,
# then the circuit breaker and middleware pipeline are stored, falling back
# to defaults for whichever of them was not supplied.
#
# @param adapter [Object, nil] forwarded unchanged to +setup_adapters+
# @param adapters [Object, nil] forwarded unchanged to +setup_adapters+
# @param routing [Object, nil] forwarded unchanged to +setup_adapters+
# @param circuit_breaker [Object, nil] breaker to use; a fresh
#   +CircuitBreaker+ is created when nil (or false)
# @param middlewares [Array, nil] middlewares handed to +MiddlewarePipeline+;
#   defaults to a single +OpenTelemetryPropagation+ instance
def initialize(adapter: nil, adapters: nil, routing: nil, circuit_breaker: nil, middlewares: nil)
  setup_adapters(adapter: adapter, adapters: adapters, routing: routing)

  @circuit_breaker = circuit_breaker
  @circuit_breaker ||= CircuitBreaker.new

  @middleware_pipeline = MiddlewarePipeline.new(
    middlewares || [SharedBroker::Middlewares::OpenTelemetryPropagation.new]
  )
end

Instance Attribute Details

#adapters ⇒ Object (readonly)

Returns the value of attribute adapters.



30
31
32
# File 'lib/shared_broker.rb', line 30

# Read-only accessor for the +@adapters+ instance variable.
# NOTE(review): +@adapters+ is presumably populated by +setup_adapters+
# during construction — confirm against that helper.
#
# @return [Object] the current value of +@adapters+
def adapters = @adapters

#circuit_breaker ⇒ Object (readonly)

Returns the value of attribute circuit_breaker.



30
31
32
# File 'lib/shared_broker.rb', line 30

# Read-only accessor for the circuit breaker stored by the constructor.
#
# @return [Object] the current value of +@circuit_breaker+
def circuit_breaker = @circuit_breaker

#middleware_pipeline ⇒ Object (readonly)

Returns the value of attribute middleware_pipeline.



30
31
32
# File 'lib/shared_broker.rb', line 30

# Read-only accessor for the middleware pipeline stored by the constructor.
#
# @return [Object] the current value of +@middleware_pipeline+
def middleware_pipeline = @middleware_pipeline

#routing ⇒ Object (readonly)

Returns the value of attribute routing.



30
31
32
# File 'lib/shared_broker.rb', line 30

# Read-only accessor for the +@routing+ instance variable.
# NOTE(review): +@routing+ is presumably populated by +setup_adapters+
# during construction — confirm against that helper.
#
# @return [Object] the current value of +@routing+
def routing = @routing

Instance Method Details

#publish(topic, message, correlation_id: nil) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
# File 'lib/shared_broker.rb', line 39

# Publishes +message+ to +topic+ through the middleware pipeline.
#
# Inside the pipeline the message is validated first, then encrypted with
# +SharedBroker.encryption_key+, and finally handed to the adapter resolved
# for +topic+; only the adapter call runs inside the circuit breaker.
#
# @param topic [Object] topic to publish to; also used to resolve the adapter
# @param message [Object] payload, validated and encrypted before sending
# @param correlation_id [Object, nil] placed in the middleware context and
#   forwarded to the adapter
# @return [Object] whatever +@middleware_pipeline.execute+ returns
def publish(topic, message, correlation_id: nil)
  # Context handed to every middleware for this operation.
  context = { correlation_id: correlation_id, operation: :publish }

  @middleware_pipeline.execute(topic, message, context) do
    # Validate the plaintext before encrypting it.
    SharedBroker::Validation.validate!(topic, message)
    encrypted_msg = SharedBroker::Cipher.encrypt(message, SharedBroker.encryption_key)

    @circuit_breaker.run do
      resolve_adapter(topic).publish(topic, encrypted_msg, correlation_id: correlation_id)
    end
  end
end

#subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, max_concurrency: nil, backpressure_check: nil, backpressure_backoff: 1.0, &block) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/shared_broker.rb', line 51

# Subscribes to +topic+ on +queue_name+ and feeds each decrypted, validated
# message to +block+ through the middleware pipeline.
#
# One +Concurrency::Limiter+ is built per subscription and wraps the handling
# of every incoming message. Each raw message is decrypted with
# +SharedBroker.encryption_key+ and validated before the pipeline runs.
#
# @param topic [Object] topic to subscribe to; also used to resolve the adapter
# @param queue_name [Object] forwarded to the adapter and added to the
#   middleware context
# @param max_retries [Integer] forwarded to the adapter's +subscribe+
# @param backoff_base [Numeric] forwarded to the adapter's +subscribe+
# @param max_concurrency [Object, nil] forwarded to the limiter
# @param backpressure_check [Object, nil] forwarded to the limiter
# @param backpressure_backoff [Numeric] forwarded to the limiter
# @yieldparam decrypted_msg [Object] the decrypted message
# @return [Object] whatever the adapter's +subscribe+ returns
def subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, max_concurrency: nil, backpressure_check: nil, backpressure_backoff: 1.0, &block)
  limiter = SharedBroker::Concurrency::Limiter.new(
    max_concurrency: max_concurrency,
    backpressure_check: backpressure_check,
    backpressure_backoff: backpressure_backoff
  )

  resolve_adapter(topic).subscribe(topic, queue_name, max_retries: max_retries, backoff_base: backoff_base) do |raw_message|
    limiter.run do
      decrypted_msg = SharedBroker::Cipher.decrypt(raw_message, SharedBroker.encryption_key)
      SharedBroker::Validation.validate!(topic, decrypted_msg)

      # Context handed to every middleware for this delivery.
      # NOTE(review): assumes the decrypted message supports +[]+ with a
      # symbol key (Hash-like) — confirm against Cipher.decrypt.
      context = { correlation_id: decrypted_msg[:_correlation_id], operation: :subscribe, queue_name: queue_name }
      @middleware_pipeline.execute(topic, decrypted_msg, context) do
        block.call(decrypted_msg)
      end
    end
  end
end