Class: Rubyists::Leopard::NatsJetstreamConsumer
- Inherits: Object
  - Object
  - Rubyists::Leopard::NatsJetstreamConsumer
- Defined in: lib/leopard/nats_jetstream_consumer.rb
Overview
Coordinates JetStream pull subscriptions and dispatches fetched messages through Leopard.
Constant Summary
- PROTECTED_CONSUMER_KEYS = %i[durable_name filter_subject ack_policy].freeze
  Consumer configuration keys that Leopard owns and will not allow endpoint overrides to replace.
Instance Attribute Summary
- #subscriptions ⇒ Array<Object> (readonly)
  Active JetStream pull subscriptions.
- #threads ⇒ Object (readonly)
  Returns the value of attribute threads.
Instance Method Summary
- #build_subscription(endpoint) ⇒ Object (private)
  Ensures the durable consumer exists and creates a pull subscription for it.
- #consume_batch(subscription, endpoint) ⇒ void (private)
  Fetches one batch from JetStream and processes each message through Leopard.
- #consume_endpoint(subscription, endpoint) ⇒ void (private)
  Repeatedly fetches and processes batches for one endpoint while the consumer is running.
- #consumer_config(endpoint) ⇒ Hash (private)
  Builds the JetStream consumer configuration for an endpoint.
- #ensure_consumer(endpoint) ⇒ Object (private)
  Verifies that the durable consumer exists, creating it when missing.
- #fetch_messages(subscription, endpoint) ⇒ Array<Object> (private)
  Fetches a batch of messages for one endpoint.
- #initialize(jetstream:, endpoints:, logger:, process_message:, **dependencies) ⇒ void (constructor)
  Builds a pull-consumer coordinator for one Leopard worker.
- #log_loop_error(endpoint, error) ⇒ void (private)
  Logs an endpoint-level loop failure.
- #normalized_consumer_options(endpoint) ⇒ Hash (private)
  Normalizes optional consumer overrides into a hash.
- #safe_consumer_options(endpoint) ⇒ Hash (private)
  Removes Leopard-managed consumer keys from user overrides.
- #start ⇒ void
  Starts one pull-consumer loop per configured endpoint.
- #start_endpoint(endpoint) ⇒ void (private)
  Starts a consumer loop for one endpoint.
- #stop ⇒ void
  Stops all pull-consumer loops and waits for them to exit.
Constructor Details
#initialize(jetstream:, endpoints:, logger:, process_message:, **dependencies) ⇒ void
Builds a pull-consumer coordinator for one Leopard worker.
# File 'lib/leopard/nats_jetstream_consumer.rb', line 32
def initialize(jetstream:, endpoints:, logger:, process_message:, **dependencies)
  @jetstream = jetstream
  @endpoints = endpoints
  @logger = logger
  @process_message = process_message
  @callbacks = dependencies.fetch(:callback_builder, NatsJetstreamCallbacks).new(logger:)
  @thread_factory = dependencies.fetch(:thread_factory, Thread)
  @subscriptions = []
  @threads = []
  @running = false
end
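The optional `**dependencies` let a caller swap the thread class, which is mostly useful in tests. The following is a minimal sketch of that idea, not Leopard's own test setup: `InlineThread` and `resolve_thread_factory` are hypothetical names introduced here, and the only thing taken from the source is the `dependencies.fetch(:thread_factory, Thread)` lookup.

```ruby
# Hypothetical test double: runs the block immediately on the calling thread
# instead of spawning a real Thread.
class InlineThread
  def initialize(&block)
    @result = block.call
  end

  # Nothing to wait for; the block already ran in #initialize.
  def join
    self
  end
end

# Mirrors how the constructor resolves its optional collaborator.
def resolve_thread_factory(**dependencies)
  dependencies.fetch(:thread_factory, Thread)
end

default_factory = resolve_thread_factory
inline_factory = resolve_thread_factory(thread_factory: InlineThread)

ran = []
thread = inline_factory.new { ran << :consumed }
thread.join
```

Any object that responds to `new` with a block and returns something that responds to `join` should fit the same slot, since those are the only two calls the class makes on it.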
Instance Attribute Details
#subscriptions ⇒ Array<Object> (readonly)
Returns the active JetStream pull subscriptions.
# File 'lib/leopard/nats_jetstream_consumer.rb', line 19
def subscriptions
  @subscriptions
end
#threads ⇒ Object (readonly)
Returns the value of attribute threads.
# File 'lib/leopard/nats_jetstream_consumer.rb', line 19
attr_reader :subscriptions, :threads
Instance Method Details
#build_subscription(endpoint) ⇒ Object (private)
Ensures the durable consumer exists and creates a pull subscription for it.
# File 'lib/leopard/nats_jetstream_consumer.rb', line 79
def build_subscription(endpoint)
  ensure_consumer(endpoint)
  @jetstream.pull_subscribe(
    endpoint.subject,
    endpoint.durable,
    stream: endpoint.stream,
  )
end
#consume_batch(subscription, endpoint) ⇒ void (private)
This method returns an undefined value.
Fetches one batch from JetStream and processes each message through Leopard.
# File 'lib/leopard/nats_jetstream_consumer.rb', line 159
def consume_batch(subscription, endpoint)
  fetch_messages(subscription, endpoint).each do |raw_msg|
    @process_message.call(raw_msg, endpoint.handler, @callbacks.callbacks_for(endpoint))
  end
end
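The `process_message` collaborator is invoked with three arguments per message: the raw message, the endpoint's handler, and the callbacks built for that endpoint. A rough sketch of a compatible callable follows. The `Endpoint` struct, `FakeCallbacks`, and the shape of the callbacks hash are all assumptions made for illustration; the real `NatsJetstreamCallbacks` return value is not shown in this page.

```ruby
# Hypothetical endpoint shape; only the fields used here are modelled.
Endpoint = Struct.new(:name, :handler, keyword_init: true)

# Hypothetical stand-in for the callbacks builder. The hash it returns is an
# assumption, not the structure Leopard actually uses.
class FakeCallbacks
  def callbacks_for(endpoint)
    { endpoint: endpoint.name }
  end
end

processed = []

# Any callable taking (raw_msg, handler, callbacks) matches the call site.
process_message = lambda do |raw_msg, handler, callbacks|
  processed << [handler.call(raw_msg), callbacks[:endpoint]]
end

endpoint = Endpoint.new(name: 'orders', handler: ->(msg) { msg.upcase })
callbacks = FakeCallbacks.new

# Plays the role of the fetched batch.
%w[a b].each do |raw_msg|
  process_message.call(raw_msg, endpoint.handler, callbacks.callbacks_for(endpoint))
end
```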
#consume_endpoint(subscription, endpoint) ⇒ void (private)
This method returns an undefined value.
Repeatedly fetches and processes batches for one endpoint while the consumer is running.
# File 'lib/leopard/nats_jetstream_consumer.rb', line 140
def consume_endpoint(subscription, endpoint)
  while @running
    begin
      consume_batch(subscription, endpoint)
    rescue NATS::Timeout
      next if @running
    rescue StandardError => e
      log_loop_error(endpoint, e)
      break unless @running
    end
  end
end
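The loop treats a fetch timeout as an idle poll and any other `StandardError` as something to log before carrying on. The sketch below replays that control flow against scripted outcomes so the behaviour can be observed without a NATS server. `FakeTimeout` stands in for `NATS::Timeout`, and `LoopSketch` with its scripted `outcomes` list is a hypothetical harness, not part of Leopard.

```ruby
# Stand-in for NATS::Timeout so the sketch runs without the NATS client gem.
class FakeTimeout < StandardError; end

class LoopSketch
  attr_reader :errors, :batches

  def initialize(outcomes)
    @outcomes = outcomes # scripted results, one per loop iteration
    @errors = []
    @batches = []
    @running = true
  end

  # Same rescue structure as consume_endpoint.
  def run
    while @running
      begin
        consume_batch
      rescue FakeTimeout
        next if @running
      rescue StandardError => e
        @errors << e.message
        break unless @running
      end
    end
  end

  private

  def consume_batch
    outcome = @outcomes.shift
    if outcome.nil?
      @running = false # script exhausted: simulates a call to #stop
      return
    end
    raise outcome if outcome.is_a?(Exception)

    @batches << outcome
  end
end

loop_sketch = LoopSketch.new(
  [FakeTimeout.new('idle'), [:msg1], RuntimeError.new('boom'), [:msg2]],
)
loop_sketch.run
```

In this run the timeout is skipped silently, the `RuntimeError` is recorded once, and both batches are still processed, which matches the intent that one failing batch should not end the endpoint's loop while the consumer is running.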
#consumer_config(endpoint) ⇒ Hash (private)
Builds the JetStream consumer configuration for an endpoint.
# File 'lib/leopard/nats_jetstream_consumer.rb', line 104
def consumer_config(endpoint)
  base = {
    durable_name: endpoint.durable,
    filter_subject: endpoint.subject,
    ack_policy: 'explicit',
  }
  base.merge(safe_consumer_options(endpoint))
end
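To illustrate how the protected keys interact with endpoint overrides, here is a standalone sketch that reimplements the merge outside the class. The `Endpoint` struct and the sample values (`orders.created`, `orders-worker`, `max_deliver`) are assumptions chosen for the example; the filtering and merge logic follow the listings on this page, with the normalization step condensed into one line.

```ruby
PROTECTED_CONSUMER_KEYS = %i[durable_name filter_subject ack_policy].freeze

# Hypothetical endpoint shape; only the fields used here are modelled.
Endpoint = Struct.new(:subject, :durable, :consumer, keyword_init: true)

# Drops any override whose key, as a symbol, is Leopard-managed.
def safe_consumer_options(endpoint)
  (endpoint.consumer || {}).to_h.reject do |key, _value|
    PROTECTED_CONSUMER_KEYS.include?(key.to_sym)
  end
end

def consumer_config(endpoint)
  base = {
    durable_name: endpoint.durable,
    filter_subject: endpoint.subject,
    ack_policy: 'explicit',
  }
  base.merge(safe_consumer_options(endpoint))
end

endpoint = Endpoint.new(
  subject: 'orders.created',
  durable: 'orders-worker',
  # The string key 'ack_policy' is filtered too, because keys are compared
  # after #to_sym.
  consumer: { 'ack_policy' => 'none', max_deliver: 5 },
)

config = consumer_config(endpoint)
```

The attempted `ack_policy` override is discarded and `max_deliver` passes through, so `config` keeps `ack_policy: 'explicit'` alongside the extra option.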
#ensure_consumer(endpoint) ⇒ Object (private)
Verifies that the durable consumer exists, creating it when missing.
# File 'lib/leopard/nats_jetstream_consumer.rb', line 93
def ensure_consumer(endpoint)
  @jetstream.consumer_info(endpoint.stream, endpoint.durable)
rescue NATS::JetStream::Error::NotFound
  @jetstream.add_consumer(endpoint.stream, consumer_config(endpoint))
end
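The lookup-then-create pattern can be exercised against an in-memory fake. In this sketch `FakeJetstream` and `FakeNotFound` are hypothetical stand-ins for the JetStream context and `NATS::JetStream::Error::NotFound`, and `ensure_consumer` takes its collaborators as arguments so it can run outside the class.

```ruby
# Stand-in for NATS::JetStream::Error::NotFound.
class FakeNotFound < StandardError; end

# Hypothetical in-memory JetStream context exposing only the two calls used.
class FakeJetstream
  attr_reader :created

  def initialize
    @consumers = {}
    @created = []
  end

  def consumer_info(stream, durable)
    @consumers.fetch([stream, durable]) { raise FakeNotFound, "#{durable} not found" }
  end

  def add_consumer(stream, config)
    @created << config[:durable_name]
    @consumers[[stream, config[:durable_name]]] = config
  end
end

# Same shape as the method above: look up first, create only on NotFound.
def ensure_consumer(jetstream, stream, config)
  jetstream.consumer_info(stream, config[:durable_name])
rescue FakeNotFound
  jetstream.add_consumer(stream, config)
end

jetstream = FakeJetstream.new
config = { durable_name: 'orders-worker', filter_subject: 'orders.created', ack_policy: 'explicit' }

first = ensure_consumer(jetstream, 'ORDERS', config)  # missing, so it is created
second = ensure_consumer(jetstream, 'ORDERS', config) # found, so nothing is created
```

Calling it twice creates the consumer only once, which is the property that makes it safe to run on every worker start.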
#fetch_messages(subscription, endpoint) ⇒ Array<Object> (private)
Fetches a batch of messages for one endpoint.
# File 'lib/leopard/nats_jetstream_consumer.rb', line 171
def fetch_messages(subscription, endpoint)
  subscription.fetch(endpoint.batch, timeout: endpoint.fetch_timeout)
end
#log_loop_error(endpoint, error) ⇒ void (private)
This method returns an undefined value.
Logs an endpoint-level loop failure.
# File 'lib/leopard/nats_jetstream_consumer.rb', line 181
def log_loop_error(endpoint, error)
  @logger.error "JetStream endpoint #{endpoint.name} loop error: ", error
end
#normalized_consumer_options(endpoint) ⇒ Hash (private)
Normalizes optional consumer overrides into a hash.
# File 'lib/leopard/nats_jetstream_consumer.rb', line 118
def normalized_consumer_options(endpoint)
  return {} unless endpoint.consumer
  return endpoint.consumer.to_h if endpoint.consumer.respond_to?(:to_h)

  endpoint.consumer
end
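As a quick illustration of the normalization, the sketch below feeds the same logic three kinds of override: none, a plain hash, and a struct. `Endpoint` and `ConsumerOverrides` are hypothetical types introduced for the example; the page does not say which override types Leopard endpoints actually carry.

```ruby
# Hypothetical types; only #consumer matters to the method under discussion.
Endpoint = Struct.new(:consumer)
ConsumerOverrides = Struct.new(:max_deliver, :ack_wait, keyword_init: true)

def normalized_consumer_options(endpoint)
  return {} unless endpoint.consumer
  return endpoint.consumer.to_h if endpoint.consumer.respond_to?(:to_h)

  endpoint.consumer
end

from_nil = normalized_consumer_options(Endpoint.new(nil))
from_hash = normalized_consumer_options(Endpoint.new({ max_deliver: 3 }))
from_struct = normalized_consumer_options(
  Endpoint.new(ConsumerOverrides.new(max_deliver: 3, ack_wait: 30)),
)
```

All three results are hashes, so the later `reject` in `#safe_consumer_options` can treat them uniformly.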
#safe_consumer_options(endpoint) ⇒ Hash (private)
Removes Leopard-managed consumer keys from user overrides.
# File 'lib/leopard/nats_jetstream_consumer.rb', line 130
def safe_consumer_options(endpoint)
  normalized_consumer_options(endpoint).reject { |key, _value| PROTECTED_CONSUMER_KEYS.include?(key.to_sym) }
end
#start ⇒ void
This method returns an undefined value.
Starts one pull-consumer loop per configured endpoint.
# File 'lib/leopard/nats_jetstream_consumer.rb', line 47
def start
  @running = true
  @endpoints.each { |endpoint| start_endpoint(endpoint) }
end
#start_endpoint(endpoint) ⇒ void (private)
This method returns an undefined value.
Starts a consumer loop for one endpoint.
# File 'lib/leopard/nats_jetstream_consumer.rb', line 68
def start_endpoint(endpoint)
  subscription = build_subscription(endpoint)
  subscriptions << subscription
  threads << @thread_factory.new { consume_endpoint(subscription, endpoint) }
end
#stop ⇒ void
This method returns an undefined value.
Stops all pull-consumer loops and waits for them to exit.
# File 'lib/leopard/nats_jetstream_consumer.rb', line 55
def stop
  @running = false
  subscriptions.each(&:unsubscribe)
  threads.each(&:join)
end
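The start/stop pair relies on a shared `@running` flag plus `join`, a cooperative shutdown in which each loop exits on its own once it next checks the flag. The sketch below shows that pattern in isolation with plain threads. `WorkerSketch` is a hypothetical class, the `sleep` stands in for a blocking fetch, and the unsubscribe step is omitted because there are no real subscriptions here.

```ruby
class WorkerSketch
  attr_reader :threads, :ticks

  def initialize(count)
    @count = count
    @threads = []
    @ticks = Queue.new # thread-safe record of loop iterations
    @running = false
  end

  # One loop per worker, each polling the shared flag.
  def start
    @running = true
    @count.times do |index|
      @threads << Thread.new do
        while @running
          @ticks << index
          sleep 0.01 # stands in for a fetch that blocks up to its timeout
        end
      end
    end
  end

  # Flip the flag, then wait for every loop to notice and return.
  def stop
    @running = false
    @threads.each(&:join)
  end
end

worker = WorkerSketch.new(2)
worker.start
first_tick = worker.ticks.pop # blocks until at least one loop has iterated
worker.stop
```

One consequence of this design, assuming the real loops behave like the sketch, is that `stop` can take up to one fetch timeout per endpoint to return, since a loop only rechecks the flag after its current fetch finishes or times out.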