Class: Rubyists::Leopard::NatsJetstreamConsumer

Inherits:
Object
  • Object
Defined in:
lib/leopard/nats_jetstream_consumer.rb

Overview

Coordinates JetStream pull subscriptions and dispatches fetched messages through Leopard.

Constant Summary

PROTECTED_CONSUMER_KEYS =

Consumer configuration keys Leopard owns and will not allow endpoint overrides to replace.

%i[durable_name filter_subject ack_policy].freeze

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(jetstream:, endpoints:, logger:, process_message:, **dependencies) ⇒ void

Builds a pull-consumer coordinator for one Leopard worker.

Parameters:

  • jetstream (Object)

    JetStream client used to manage consumers and subscriptions.

  • endpoints (Array<NatsJetstreamEndpoint>)

    JetStream endpoint definitions for this worker.

  • logger (#error)

    Logger used for loop failures.

  • process_message (#call)

    Callable that processes a raw JetStream message through Leopard.

  • dependencies (Hash{Symbol => Object})

    Optional collaborators for callback and thread creation.

Options Hash (**dependencies):

  • :callback_builder (Class) — default: NatsJetstreamCallbacks

    Builder for transport callbacks.

  • :thread_factory (Class) — default: Thread

    Thread-like factory used to spawn consumer loops.



# File 'lib/leopard/nats_jetstream_consumer.rb', line 32

def initialize(jetstream:, endpoints:, logger:, process_message:, **dependencies)
  @jetstream = jetstream
  @endpoints = endpoints
  @logger = logger
  @process_message = process_message
  @callbacks = dependencies.fetch(:callback_builder, NatsJetstreamCallbacks).new(logger:)
  @thread_factory = dependencies.fetch(:thread_factory, Thread)
  @subscriptions = []
  @threads = []
  @running = false
end
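
The `**dependencies` hash makes the collaborators swappable, which is mostly useful in tests. The sketch below copies only the `fetch`-with-default lookup from the constructor. `InlineThread` and `MiniCoordinator` are hypothetical stand-ins invented for illustration and are not part of Leopard.

```ruby
# Hypothetical thread-like factory: runs the block immediately on the
# calling thread and exposes #join so callers can treat it like a Thread.
class InlineThread
  def initialize(&block)
    block.call
  end

  def join
    self
  end
end

# Hypothetical stand-in that reproduces only the dependency lookup.
class MiniCoordinator
  attr_reader :thread_factory

  def initialize(**dependencies)
    # Falls back to Thread when no :thread_factory is supplied.
    @thread_factory = dependencies.fetch(:thread_factory, Thread)
  end
end

default_factory = MiniCoordinator.new.thread_factory
inline_factory  = MiniCoordinator.new(thread_factory: InlineThread).thread_factory

ran = []
inline_factory.new { ran << :loop_body }.join
```

Injecting a synchronous factory like this keeps a test deterministic, because the loop body has already finished by the time `new` returns.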

Instance Attribute Details

#subscriptions ⇒ Array<Object> (readonly)

Returns active JetStream pull subscriptions.

Returns:

  • (Array<Object>)

    Active JetStream pull subscriptions.



# File 'lib/leopard/nats_jetstream_consumer.rb', line 19

def subscriptions
  @subscriptions
end

#threads ⇒ Object (readonly)

Returns the value of attribute threads.



# File 'lib/leopard/nats_jetstream_consumer.rb', line 19

attr_reader :subscriptions, :threads

Instance Method Details

#build_subscription(endpoint) ⇒ Object (private)

Ensures the durable consumer exists and creates a pull subscription for it.

Parameters:

  • endpoint (NatsJetstreamEndpoint)

    The endpoint definition to subscribe to.

Returns:

  • (Object)

    The JetStream pull subscription.



# File 'lib/leopard/nats_jetstream_consumer.rb', line 79

def build_subscription(endpoint)
  ensure_consumer(endpoint)
  @jetstream.pull_subscribe(
    endpoint.subject,
    endpoint.durable,
    stream: endpoint.stream,
  )
end

#consume_batch(subscription, endpoint) ⇒ void (private)

This method returns an undefined value.

Fetches one batch from JetStream and processes each message through Leopard.

Parameters:

  • subscription (Object)

    Pull subscription for the endpoint.

  • endpoint (NatsJetstreamEndpoint)

    The endpoint configuration being consumed.



# File 'lib/leopard/nats_jetstream_consumer.rb', line 159

def consume_batch(subscription, endpoint)
  fetch_messages(subscription, endpoint).each do |raw_msg|
    @process_message.call(raw_msg, endpoint.handler, @callbacks.callbacks_for(endpoint))
  end
end

#consume_endpoint(subscription, endpoint) ⇒ void (private)

This method returns an undefined value.

Repeatedly fetches and processes batches for one endpoint while the consumer is running.

Parameters:

  • subscription (Object)

    Pull subscription for the endpoint.

  • endpoint (NatsJetstreamEndpoint)

    The endpoint configuration being consumed.



# File 'lib/leopard/nats_jetstream_consumer.rb', line 140

def consume_endpoint(subscription, endpoint)
  while @running
    begin
      consume_batch(subscription, endpoint)
    rescue NATS::Timeout
      next if @running
    rescue StandardError => e
      log_loop_error(endpoint, e)
      break unless @running
    end
  end
end
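
The rescue structure above treats a fetch timeout as a normal empty poll and any other error as something to log before continuing. The following sketch replays that control flow against a scripted sequence of outcomes. `FakeTimeout` (standing in for `NATS::Timeout`) and `LoopSketch` are hypothetical names used only for illustration.

```ruby
# Hypothetical timeout error standing in for NATS::Timeout.
class FakeTimeout < StandardError; end

# Hypothetical driver reproducing the rescue structure of #consume_endpoint.
class LoopSketch
  attr_reader :events

  def initialize(outcomes)
    @outcomes = outcomes # scripted result for each fetch attempt
    @events = []
    @running = true
  end

  def run
    while @running
      begin
        step
      rescue FakeTimeout
        # An empty fetch window is expected; poll again while still running.
        next if @running
      rescue StandardError => e
        # Any other failure is recorded and the loop continues unless stopped.
        @events << "error: #{e.message}"
        break unless @running
      end
    end
  end

  private

  def step
    outcome = @outcomes.shift
    case outcome
    when :timeout then raise FakeTimeout
    when :boom    then raise 'boom'
    when :stop    then @running = false
    else @events << "batch: #{outcome}"
    end
  end
end

sketch = LoopSketch.new([:timeout, 'a', :boom, 'b', :stop])
sketch.run
```

In this run the timeout leaves no trace, the `boom` failure is recorded once, and batches on either side of it are still processed.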

#consumer_config(endpoint) ⇒ Hash (private)

Builds the JetStream consumer configuration for an endpoint.

Parameters:

  • endpoint (NatsJetstreamEndpoint)

    The endpoint whose consumer configuration is built.

Returns:

  • (Hash)

    Consumer configuration accepted by `add_consumer`.



# File 'lib/leopard/nats_jetstream_consumer.rb', line 104

def consumer_config(endpoint)
  base = {
    durable_name: endpoint.durable,
    filter_subject: endpoint.subject,
    ack_policy: 'explicit',
  }
  base.merge(safe_consumer_options(endpoint))
end

#ensure_consumer(endpoint) ⇒ Object (private)

Verifies that the durable consumer exists, creating it when missing.

Parameters:

  • endpoint (NatsJetstreamEndpoint)

    The endpoint whose durable consumer is checked.

Returns:

  • (Object)

    Consumer metadata from `consumer_info` or `add_consumer`.



# File 'lib/leopard/nats_jetstream_consumer.rb', line 93

def ensure_consumer(endpoint)
  @jetstream.consumer_info(endpoint.stream, endpoint.durable)
rescue NATS::JetStream::Error::NotFound
  @jetstream.add_consumer(endpoint.stream, consumer_config(endpoint))
end
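
This is a look-up-then-create pattern: the consumer is only added when the lookup raises a not-found error. A minimal sketch of that behavior follows, assuming a fake client. `FakeNotFound` (standing in for `NATS::JetStream::Error::NotFound`) and `FakeJetstream` are hypothetical and implement only the two calls used here.

```ruby
# Hypothetical error standing in for NATS::JetStream::Error::NotFound.
class FakeNotFound < StandardError; end

# Hypothetical in-memory client with just #consumer_info and #add_consumer.
class FakeJetstream
  attr_reader :created

  def initialize
    @consumers = {}
    @created = []
  end

  def consumer_info(stream, durable)
    @consumers.fetch([stream, durable]) { raise FakeNotFound }
  end

  def add_consumer(stream, config)
    @created << config[:durable_name]
    @consumers[[stream, config[:durable_name]]] = config
  end
end

# Same shape as #ensure_consumer, with the collaborators passed in explicitly.
def ensure_consumer(jetstream, stream, config)
  jetstream.consumer_info(stream, config[:durable_name])
rescue FakeNotFound
  jetstream.add_consumer(stream, config)
end

js = FakeJetstream.new
config = { durable_name: 'orders-worker', filter_subject: 'orders.>', ack_policy: 'explicit' }
first  = ensure_consumer(js, 'ORDERS', config) # lookup misses, so the consumer is created
second = ensure_consumer(js, 'ORDERS', config) # lookup succeeds, nothing new is created
```

Calling it twice creates the consumer once, which is what makes it safe to run on every worker start.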

#fetch_messages(subscription, endpoint) ⇒ Array<Object> (private)

Fetches a batch of messages for one endpoint.

Parameters:

  • subscription (Object)

    Pull subscription for the endpoint.

  • endpoint (NatsJetstreamEndpoint)

    The endpoint configuration being consumed.

Returns:

  • (Array<Object>)

    Raw JetStream messages returned by the subscription.



# File 'lib/leopard/nats_jetstream_consumer.rb', line 171

def fetch_messages(subscription, endpoint)
  subscription.fetch(endpoint.batch, timeout: endpoint.fetch_timeout)
end

#log_loop_error(endpoint, error) ⇒ void (private)

This method returns an undefined value.

Logs an endpoint-level loop failure.

Parameters:

  • endpoint (NatsJetstreamEndpoint)

    The endpoint whose loop failed.

  • error (StandardError)

    The raised exception.



# File 'lib/leopard/nats_jetstream_consumer.rb', line 181

def log_loop_error(endpoint, error)
  @logger.error "JetStream endpoint #{endpoint.name} loop error: ", error
end

#normalized_consumer_options(endpoint) ⇒ Hash (private)

Normalizes optional consumer overrides into a hash.

Parameters:

  • endpoint (NatsJetstreamEndpoint)

    The endpoint whose consumer overrides are normalized.

Returns:

  • (Hash)

    Consumer overrides, or an empty hash when none were provided.



# File 'lib/leopard/nats_jetstream_consumer.rb', line 118

def normalized_consumer_options(endpoint)
  return {} unless endpoint.consumer
  return endpoint.consumer.to_h if endpoint.consumer.respond_to?(:to_h)

  endpoint.consumer
end
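
The three branches correspond to three input shapes: no overrides, an object that converts with `to_h`, and anything else passed through unchanged. The sketch below exercises the first two with hypothetical `Endpoint` and `ConsumerOverrides` structs. The real `NatsJetstreamEndpoint` is not shown on this page, so its shape here is an assumption.

```ruby
# Hypothetical endpoint shape: only the #consumer reader matters here.
Endpoint = Struct.new(:consumer)
# Hypothetical overrides object that responds to #to_h.
ConsumerOverrides = Struct.new(:max_deliver, :ack_wait, keyword_init: true)

# Same body as #normalized_consumer_options.
def normalized_consumer_options(endpoint)
  return {} unless endpoint.consumer
  return endpoint.consumer.to_h if endpoint.consumer.respond_to?(:to_h)

  endpoint.consumer
end

none        = normalized_consumer_options(Endpoint.new(nil))
from_hash   = normalized_consumer_options(Endpoint.new({ max_deliver: 5 }))
from_struct = normalized_consumer_options(
  Endpoint.new(ConsumerOverrides.new(max_deliver: 3, ack_wait: 30))
)
```

The `nil` guard comes first for a reason: `nil` also responds to `to_h`, so the order of the two early returns does not change the result but makes the empty case explicit.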

#safe_consumer_options(endpoint) ⇒ Hash (private)

Removes Leopard-managed consumer keys from user overrides.

Parameters:

  • endpoint (NatsJetstreamEndpoint)

    The endpoint whose consumer overrides are filtered.

Returns:

  • (Hash)

    Consumer overrides excluding protected keys required by Leopard.



# File 'lib/leopard/nats_jetstream_consumer.rb', line 130

def safe_consumer_options(endpoint)
  normalized_consumer_options(endpoint).reject { |key, _value| PROTECTED_CONSUMER_KEYS.include?(key.to_sym) }
end
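
Because each key is converted with `to_sym` before the check, protected keys are dropped whether the caller wrote them as strings or symbols. A small sketch of the filter combined with the merge from `#consumer_config`, using made-up override values and a plain hash in place of an endpoint:

```ruby
PROTECTED_CONSUMER_KEYS = %i[durable_name filter_subject ack_policy].freeze

# Same filtering expression as #safe_consumer_options, applied to a plain hash.
def safe_consumer_options(overrides)
  overrides.reject { |key, _value| PROTECTED_CONSUMER_KEYS.include?(key.to_sym) }
end

# Values Leopard sets itself (example values, not taken from the source).
base = { durable_name: 'orders-worker', filter_subject: 'orders.>', ack_policy: 'explicit' }

# Overrides that try to replace two protected keys, one of them as a string.
overrides = { 'ack_policy' => 'none', durable_name: 'hijack', max_deliver: 5 }

config = base.merge(safe_consumer_options(overrides))
```

Only `max_deliver` survives the filter, so the merged configuration keeps the base values for all three protected keys.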

#start ⇒ void

This method returns an undefined value.

Starts one pull-consumer loop per configured endpoint.



# File 'lib/leopard/nats_jetstream_consumer.rb', line 47

def start
  @running = true
  @endpoints.each { |endpoint| start_endpoint(endpoint) }
end

#start_endpoint(endpoint) ⇒ void (private)

This method returns an undefined value.

Starts a consumer loop for one endpoint.

Parameters:

  • endpoint (NatsJetstreamEndpoint)

    The endpoint to start consuming.



# File 'lib/leopard/nats_jetstream_consumer.rb', line 68

def start_endpoint(endpoint)
  subscription = build_subscription(endpoint)
  subscriptions << subscription
  threads << @thread_factory.new { consume_endpoint(subscription, endpoint) }
end

#stop ⇒ void

This method returns an undefined value.

Stops all pull-consumer loops and waits for them to exit.



# File 'lib/leopard/nats_jetstream_consumer.rb', line 55

def stop
  @running = false
  subscriptions.each(&:unsubscribe)
  threads.each(&:join)
end
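
The start/stop pair relies on a shared flag: each loop checks it between batches, and `stop` flips it before joining the threads. The sketch below shows that lifecycle with real threads. `LifecycleSketch` is hypothetical, and the `sleep` merely stands in for a fetch that blocks until its timeout.

```ruby
# Hypothetical worker set: each thread polls until the shared flag flips.
class LifecycleSketch
  attr_reader :threads, :ticks

  def initialize(loops:)
    @loops = loops
    @threads = []
    @ticks = Queue.new # thread-safe record of loop iterations
    @running = false
  end

  def start
    @running = true
    @loops.times do |index|
      @threads << Thread.new do
        while @running
          @ticks << index
          sleep 0.01 # stands in for a blocking fetch with a timeout
        end
      end
    end
  end

  def stop
    @running = false
    @threads.each(&:join) # returns once every loop has seen the flag
  end
end

sketch = LifecycleSketch.new(loops: 2)
sketch.start
sleep 0.05
sketch.stop
```

Under this model a loop only notices the flag after its current blocking call returns, so in the real class `stop` may plausibly wait up to one fetch timeout per endpoint before `join` completes.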