Class: SharedBroker::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/shared_broker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(adapter: nil, adapters: nil, routing: nil, circuit_breaker: nil, middlewares: nil, key_provider: nil) ⇒ Client

Returns a new instance of Client.



65
66
67
68
69
70
71
72
73
74
# File 'lib/shared_broker.rb', line 65

def initialize(adapter: nil, adapters: nil, routing: nil, circuit_breaker: nil, middlewares: nil, key_provider: nil)
  setup_adapters(adapter: adapter, adapters: adapters, routing: routing)
  @circuit_breaker = circuit_breaker || CircuitBreaker.new
  resolved_middlewares = middlewares || [SharedBroker::Middlewares::OpenTelemetryPropagation.new]
  @middleware_pipeline = MiddlewarePipeline.new(resolved_middlewares)
  @key_provider = key_provider
  @running_threads = []
  @running_threads_mutex = Mutex.new
  SharedBroker.register_client(self)
end

Instance Attribute Details

#adaptersObject (readonly)

Returns the value of attribute adapters.



63
64
65
# File 'lib/shared_broker.rb', line 63

def adapters
  @adapters
end

#circuit_breakerObject (readonly)

Returns the value of attribute circuit_breaker.



63
64
65
# File 'lib/shared_broker.rb', line 63

def circuit_breaker
  @circuit_breaker
end

#middleware_pipelineObject (readonly)

Returns the value of attribute middleware_pipeline.



63
64
65
# File 'lib/shared_broker.rb', line 63

def middleware_pipeline
  @middleware_pipeline
end

#routingObject (readonly)

Returns the value of attribute routing.



63
64
65
# File 'lib/shared_broker.rb', line 63

def routing
  @routing
end

Instance Method Details

#active_threadsObject



82
83
84
85
86
# File 'lib/shared_broker.rb', line 82

def active_threads
  @running_threads_mutex.synchronize do
    @running_threads.select(&:alive?)
  end
end

#publish(topic, message, correlation_id: nil) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
# File 'lib/shared_broker.rb', line 88

def publish(topic, message, correlation_id: nil)
   = { correlation_id: correlation_id, operation: :publish }
  @middleware_pipeline.execute(topic, message, ) do
    SharedBroker::Validation.validate!(topic, message)
    encrypted_msg = SharedBroker::Cipher.encrypt(message, active_key_provider, topic: topic)

    @circuit_breaker.run do
      resolve_adapter(topic).publish(topic, encrypted_msg, correlation_id: correlation_id)
    end
  end
end

#publish_batch(topic, messages, correlation_id: nil) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/shared_broker.rb', line 100

def publish_batch(topic, messages, correlation_id: nil)
  unless messages.is_a?(Array)
    raise ArgumentError, "Expected messages to be an Array, got #{messages.class} with value #{messages.inspect}"
  end

  processed_messages = messages.map do |message|
    SharedBroker::Validation.validate!(topic, message)
    SharedBroker::Cipher.encrypt(message, active_key_provider, topic: topic)
  end

   = { correlation_id: correlation_id, operation: :publish_batch }
  @middleware_pipeline.execute(topic, processed_messages, ) do
    @circuit_breaker.run do
      resolve_adapter(topic).publish_batch(topic, processed_messages, correlation_id: correlation_id)
    end
  end
end

#register_thread(thread) ⇒ Object



76
77
78
79
80
# File 'lib/shared_broker.rb', line 76

def register_thread(thread)
  @running_threads_mutex.synchronize do
    @running_threads << thread
  end
end

#subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, max_concurrency: nil, backpressure_check: nil, backpressure_backoff: 1.0, &block) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/shared_broker.rb', line 118

def subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, max_concurrency: nil, backpressure_check: nil, backpressure_backoff: 1.0, &block)
  limiter = SharedBroker::Concurrency::Limiter.new(
    max_concurrency: max_concurrency,
    backpressure_check: backpressure_check,
    backpressure_backoff: backpressure_backoff
  )

  res = resolve_adapter(topic).subscribe(topic, queue_name, max_retries: max_retries, backoff_base: backoff_base) do |raw_message|
    raise SharedBroker::ShutdownError, "Shutdown requested" if SharedBroker.shutdown_requested

    limiter.run do
      raise SharedBroker::ShutdownError, "Shutdown requested" if SharedBroker.shutdown_requested

      decrypted_msg = SharedBroker::Cipher.decrypt(raw_message, active_key_provider, topic: topic)
      SharedBroker::Validation.validate!(topic, decrypted_msg)

       = { correlation_id: decrypted_msg[:_correlation_id], operation: :subscribe, queue_name: queue_name }
      @middleware_pipeline.execute(topic, decrypted_msg, ) do
        block.call(decrypted_msg)
      end
    end
  end

  register_thread(res) if res.is_a?(Thread)
  res
end