Module: Rubyists::Leopard::NatsApiServer::ClassMethods

Includes:
MetricsServer
Defined in:
lib/leopard/nats_api_server.rb

Overview

Class-level DSL for defining Leopard endpoints, middleware, and worker startup.

Instance Method Summary

Methods included from MetricsServer

#accumulate_worker_metrics, #close_client, #collect_prometheus_metrics, #handle_metrics_client, #metrics_template_path, #prometheus_metrics, #render_metrics_template, #start_metrics_server, #write_metrics_response

Instance Method Details

#build_jetstream_endpoint(name, options, handler) ⇒ NatsJetstreamEndpoint (private)

Builds a JetStream endpoint struct with Leopard defaults applied.

Parameters:

  • name (String, Symbol)

    Endpoint name.

  • options (Hash)

    JetStream endpoint options.

  • handler (Proc)

    Endpoint handler block.

Returns:

  • (NatsJetstreamEndpoint)

    The endpoint struct with Leopard defaults applied.


# File 'lib/leopard/nats_api_server.rb', line 239

def build_jetstream_endpoint(name, options, handler)
  NatsJetstreamEndpoint.new(
    name:,
    handler:,
    consumer: nil,
    batch: 1,
    fetch_timeout: 5,
    nak_delay: nil,
    **options,
  )
end
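
Because `**options` is splatted after the default keywords, caller-supplied values take precedence over the defaults. The following is a minimal sketch of that merge order, not Leopard's implementation: `SketchEndpoint` and `build_sketch_endpoint` are hypothetical stand-ins, and the real `NatsJetstreamEndpoint` may have different members.

```ruby
# Hypothetical stand-in for NatsJetstreamEndpoint; the real struct may differ.
SketchEndpoint = Struct.new(
  :name, :handler, :stream, :subject, :durable,
  :consumer, :batch, :fetch_timeout, :nak_delay,
  keyword_init: true
)

# Same shape as the listing above: defaults first, caller options last.
def build_sketch_endpoint(name, options, handler)
  SketchEndpoint.new(
    name: name,
    handler: handler,
    consumer: nil,
    batch: 1,
    fetch_timeout: 5,
    nak_delay: nil,
    **options # later keys win, so caller options override the defaults
  )
end

ep = build_sketch_endpoint(:orders, { stream: 'ORDERS', batch: 10 }, ->(msg) { msg })
puts ep.batch         # value supplied by the caller
puts ep.fetch_timeout # default kept because the caller did not override it
```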

#build_worker(nats_url, service_opts, workers, blocking) ⇒ void (private)

This method returns an undefined value.

Builds a worker instance and sets it up with the NATS server.

Parameters:

  • nats_url (String)

    The URL of the NATS server.

  • service_opts (Hash)

    Options for the NATS service.

  • workers (Array)

    The array to store worker instances.

  • blocking (Boolean)

    If true, blocks the current thread until the worker is set up.



# File 'lib/leopard/nats_api_server.rb', line 172

def build_worker(nats_url, service_opts, workers, blocking)
  worker = @instance_args ? new(**@instance_args) : new
  workers << worker
  args = { nats_url:, service_opts: }
  return worker.setup_worker!(**args) if blocking

  worker.setup_worker(**args)
end
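
The first line of the listing chooses between `new(**@instance_args)` and a bare `new`. A small sketch of that branch follows; `SketchWorker` and `instantiate` are hypothetical names used only for illustration, and the NATS setup done by `setup_worker` and `setup_worker!` is left out.

```ruby
# Hypothetical worker class with one optional keyword argument.
class SketchWorker
  attr_reader :tenant

  def initialize(tenant: 'default')
    @tenant = tenant
  end
end

# Mirrors the `@instance_args ? new(**@instance_args) : new` branch.
def instantiate(klass, instance_args)
  instance_args ? klass.new(**instance_args) : klass.new
end

with_args = instantiate(SketchWorker, { tenant: 'acme' })
without_args = instantiate(SketchWorker, nil)
puts with_args.tenant
puts without_args.tenant
```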

#endpoint(name, subject: nil, queue: nil, group: nil, &handler) {|wrapper| ... } ⇒ void

This method returns an undefined value.

Define an endpoint for the NATS API server.

Parameters:

  • name (String)

    The name of the endpoint.

  • subject (String, nil) (defaults to: nil)

    The NATS subject to listen on. Defaults to the endpoint name.

  • queue (String, nil) (defaults to: nil)

    The NATS queue group to use. Defaults to nil.

  • group (String, nil) (defaults to: nil)

    The group this endpoint belongs to. Defaults to nil.

  • handler (Proc)

    The block that will handle incoming messages.

Yields:

  • (wrapper)

    Handles the wrapped request message.

Yield Parameters:

Yield Returns:

  • (Dry::Monads::Result)

    The handler result.



# File 'lib/leopard/nats_api_server.rb', line 72

def endpoint(name, subject: nil, queue: nil, group: nil, &handler)
  endpoints << Endpoint.new(name:, subject: subject || name, queue:, group:, handler:)
end
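
To illustrate how declarations accumulate and how `subject` falls back to the endpoint name, here is a self-contained sketch. `DemoEndpoint`, `DemoDsl`, and `GreeterService` are hypothetical stand-ins that mirror the listing without requiring Leopard, and the handlers return plain strings for brevity, whereas real handlers are documented to return a `Dry::Monads::Result`.

```ruby
# Hypothetical stand-in for Leopard's Endpoint struct.
DemoEndpoint = Struct.new(:name, :subject, :queue, :group, :handler, keyword_init: true)

# Minimal copy of the DSL shape shown in the listing.
module DemoDsl
  def endpoints
    @endpoints ||= []
  end

  def endpoint(name, subject: nil, queue: nil, group: nil, &handler)
    endpoints << DemoEndpoint.new(
      name: name, subject: subject || name, queue: queue, group: group, handler: handler
    )
  end
end

class GreeterService
  extend DemoDsl

  # No subject given, so the subject falls back to the name.
  endpoint(:hello) { |wrapper| "hello #{wrapper}" }
  # An explicit subject and queue group.
  endpoint(:bye, subject: 'greeter.bye', queue: 'greeters') { |wrapper| "bye #{wrapper}" }
end

GreeterService.endpoints.each do |ep|
  puts "#{ep.name} listens on #{ep.subject}"
end
```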

#endpoints ⇒ Array<Endpoint>

Returns the configured request/reply endpoints for the service class.

Returns:

  • (Array<Endpoint>)

    Declared request/reply endpoints.



# File 'lib/leopard/nats_api_server.rb', line 46

def endpoints = @endpoints ||= []

#group(name, group: nil, queue: nil) ⇒ void

This method returns an undefined value.

Define a group for organizing endpoints.

Parameters:

  • name (String)

    The name of the group.

  • group (String, nil) (defaults to: nil)

    The parent group this group belongs to. Defaults to nil.

  • queue (String, nil) (defaults to: nil)

    The NATS queue group to use for this group. Defaults to nil.



# File 'lib/leopard/nats_api_server.rb', line 104

def group(name, group: nil, queue: nil)
  groups[name] = { name:, parent: group, queue: }
end
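
Each group is stored as a plain hash keyed by its name, with the `group:` argument recorded under `:parent`. The sketch below shows that storage. How Leopard resolves the parent chain when it registers groups is not shown in this listing, so the `group_path` helper, like `GroupDsl` and `BillingService`, is purely hypothetical.

```ruby
module GroupDsl
  def groups
    @groups ||= {}
  end

  # Same storage shape as the listing: the `group:` argument becomes :parent.
  def group(name, group: nil, queue: nil)
    groups[name] = { name: name, parent: group, queue: queue }
  end

  # Hypothetical helper: follow :parent links from a group up to its root.
  def group_path(name)
    path = []
    while name
      definition = groups.fetch(name)
      path.unshift(definition[:name])
      name = definition[:parent]
    end
    path
  end
end

class BillingService
  extend GroupDsl

  group :api
  group :v1, group: :api, queue: 'v1-workers'
end

p BillingService.groups[:v1]
p BillingService.group_path(:v1)
```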

#groups ⇒ Hash{Symbol,String => Hash}

Returns the configured endpoint groups for the service class.

Returns:

  • (Hash{Symbol,String => Hash})

    Declared group definitions.



# File 'lib/leopard/nats_api_server.rb', line 54

def groups = @groups ||= {}

#jetstream_endpoint(name, **options, &handler) {|wrapper| ... } ⇒ void

This method returns an undefined value.

Define a JetStream pull consumer endpoint.

Parameters:

  • name (String)

    The name of the endpoint.

  • options (Hash)

    JetStream endpoint configuration.

  • handler (Proc)

    The block that will handle incoming messages.

Options Hash (**options):

  • :stream (String)

    The JetStream stream name.

  • :subject (String)

    The JetStream subject filter.

  • :durable (String)

    The durable consumer name.

  • :consumer (Hash, NATS::JetStream::API::ConsumerConfig, nil)

    Optional consumer config.

  • :batch (Integer) — default: 1

    Number of messages to fetch per pull request.

  • :fetch_timeout (Numeric) — default: 5

    Maximum time to wait for fetched messages.

  • :nak_delay (Numeric, nil)

    Optional delayed redelivery value for `nak`.

Yields:

  • (wrapper)

    Handles the wrapped JetStream message.

Yield Parameters:

Yield Returns:

  • (Dry::Monads::Result)

    The handler result.



# File 'lib/leopard/nats_api_server.rb', line 93

def jetstream_endpoint(name, **options, &handler)
  jetstream_endpoints << build_jetstream_endpoint(name, options, handler)
end

#jetstream_endpoints ⇒ Array<NatsJetstreamEndpoint>

Returns the configured JetStream endpoints for the service class.

Returns:

  • (Array<NatsJetstreamEndpoint>)

    Declared JetStream pull-consumer endpoints.


# File 'lib/leopard/nats_api_server.rb', line 50

def jetstream_endpoints = @jetstream_endpoints ||= []

#middleware ⇒ Array<Array>

Returns the configured middleware stack for the service class.

Returns:

  • (Array<Array>)

    Middleware declarations in registration order.



# File 'lib/leopard/nats_api_server.rb', line 58

def middleware = @middleware ||= []

#run(nats_url:, service_opts:, instances: 1, blocking: true) ⇒ Concurrent::FixedThreadPool, void

Start the NATS API server. This method connects to the NATS server and spawns the requested number of worker instances.

Parameters:

  • nats_url (String)

    The URL of the NATS server to connect to.

  • service_opts (Hash)

    Options for the NATS service.

  • instances (Integer) (defaults to: 1)

    The number of instances to spawn. Defaults to 1.

  • blocking (Boolean) (defaults to: true)

    If false, does not block current thread after starting the server. Defaults to true.

Returns:

  • (Concurrent::FixedThreadPool, void)

    The worker pool when blocking is false. When blocking is true, the call sleeps indefinitely and does not return.



# File 'lib/leopard/nats_api_server.rb', line 128

def run(nats_url:, service_opts:, instances: 1, blocking: true)
  logger.info 'Booting NATS API server...'
  workers = Concurrent::Array.new
  pool = spawn_instances(nats_url, service_opts, instances, workers, blocking)
  logger.info 'Setting up signal trap...'
  trap_signals(workers, pool)
  start_metrics_server(workers) if ENV['LEOPARD_METRICS_PORT']
  return pool unless blocking

  sleep
end

#shutdown(workers, pool) ⇒ Proc (private)

Shuts down the NATS API server gracefully.

Parameters:

  • workers (Array)

    The array of worker instances to stop.

  • pool (Concurrent::FixedThreadPool)

    The thread pool managing the worker threads.

Returns:

  • (Proc)

    A lambda that performs the shutdown operations.



# File 'lib/leopard/nats_api_server.rb', line 187

def shutdown(workers, pool)
  lambda do
    logger.warn 'Draining worker subscriptions...'
    workers.each(&:stop)
    logger.warn 'All workers stopped, shutting down pool...'
    pool.shutdown
    logger.warn 'Pool is shut down, waiting for termination!'
    pool.wait_for_termination
    logger.warn 'Bye bye!'
    wake_main_thread_and_exit!
  end
end
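
Note that `shutdown` only builds the lambda; nothing is stopped until the caller invokes it. The sketch below illustrates this and the order of operations, using hypothetical `FakeWorker` and `FakePool` stand-ins that record each call. Logging and the final process exit are left out so the example can run to completion.

```ruby
# Hypothetical stand-ins that record the order of calls.
class FakeWorker
  def initialize(id, events)
    @id = id
    @events = events
  end

  def stop
    @events << "worker #{@id} stopped"
  end
end

class FakePool
  def initialize(events)
    @events = events
  end

  def shutdown
    @events << 'pool shutdown'
  end

  def wait_for_termination
    @events << 'pool terminated'
  end
end

# Same shape as the listing, minus logging and the process exit.
def build_shutdown(workers, pool)
  lambda do
    workers.each(&:stop)
    pool.shutdown
    pool.wait_for_termination
  end
end

events = []
workers = [FakeWorker.new(1, events), FakeWorker.new(2, events)]
shutdown_proc = build_shutdown(workers, FakePool.new(events))
events_before_call = events.dup # building the lambda has no side effects
shutdown_proc.call
puts events
```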

#spawn_instances(url, opts, count, workers, blocking) ⇒ Concurrent::FixedThreadPool (private)

Spawns multiple instances of the NATS API server.

Parameters:

  • url (String)

    The URL of the NATS server.

  • opts (Hash)

    Options for the NATS service.

  • count (Integer)

    The number of instances to spawn.

  • workers (Array)

    The array to store worker instances.

  • blocking (Boolean)

    If false, does not block current thread after starting the server.

Returns:

  • (Concurrent::FixedThreadPool)

    The thread pool managing the worker threads.

Raises:

  • (ArgumentError)

    If `instance_args` was provided but is not a hash.



# File 'lib/leopard/nats_api_server.rb', line 152

def spawn_instances(url, opts, count, workers, blocking)
  pool = Concurrent::FixedThreadPool.new(count)
  @instance_args = opts.delete(:instance_args) || nil
  logger.info "Building #{count} workers with options: #{opts.inspect}, instance_args: #{@instance_args}"
  raise ArgumentError, 'instance_args must be a Hash' if @instance_args && !@instance_args.is_a?(Hash)

  count.times do
    pool.post { build_worker(url, opts, workers, blocking) }
  end
  pool
end
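
One detail in the listing is easy to miss: `opts.delete(:instance_args)` removes the key from the hash the caller passed in. The sketch below isolates the extraction and validation steps. `extract_instance_args` is a hypothetical helper, and the `name` and `version` keys are illustrative service options rather than a statement of what Leopard requires.

```ruby
# Hypothetical helper isolating the extraction and validation from the listing.
def extract_instance_args(opts)
  instance_args = opts.delete(:instance_args) # mutates the caller's hash
  raise ArgumentError, 'instance_args must be a Hash' if instance_args && !instance_args.is_a?(Hash)

  instance_args
end

service_opts = { name: 'greeter', version: '1.0.0', instance_args: { tenant: 'acme' } }
instance_args = extract_instance_args(service_opts)
p instance_args
p service_opts # :instance_args is no longer present

# A non-Hash value is rejected.
error_message = begin
  extract_instance_args({ instance_args: 'oops' })
  nil
rescue ArgumentError => e
  e.message
end
puts error_message
```

Callers that reuse the same options hash across several runs may want to pass a copy, since the key is gone after the first call.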

#trap_signals(workers, pool) ⇒ void (private)

This method returns an undefined value.

Sets up signal traps for graceful shutdown of the NATS API server.

Parameters:

  • workers (Array)

    The array of worker instances to stop on signal.

  • pool (Concurrent::FixedThreadPool)

    The thread pool managing the worker threads.



# File 'lib/leopard/nats_api_server.rb', line 206

def trap_signals(workers, pool)
  return if @trapped

  %w[INT TERM QUIT].each do |sig|
    trap(sig) do
      logger.warn "Received #{sig} signal, shutting down..."
      Thread.new { shutdown(workers, pool).call }
    end
  end
  @trapped = true
end
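
Two patterns in this listing are worth isolating: the `@trapped` guard makes installation idempotent, and the handler hands its work to a new thread because Ruby restricts what may run in trap context (for example, `Mutex#lock` raises `ThreadError` there). The sketch below shows both on a POSIX system. `SignalSketch` is a hypothetical class, and `USR2` is used instead of `INT`, `TERM`, or `QUIT` so the example does not terminate the process.

```ruby
# Hypothetical sketch of an idempotent signal-trap installer.
class SignalSketch
  attr_reader :installs

  def initialize
    @trapped = false
    @installs = 0
  end

  # Installs handlers once; later calls are no-ops, like the @trapped guard.
  def trap_signals(signals, &on_signal)
    return if @trapped

    signals.each do |sig|
      Signal.trap(sig) do
        # Trap context is restricted, so hand the real work to a fresh thread.
        Thread.new { on_signal.call(sig) }
      end
      @installs += 1
    end
    @trapped = true
  end
end

received = Queue.new
sketch = SignalSketch.new
sketch.trap_signals(%w[USR2]) { |sig| received << sig }
sketch.trap_signals(%w[USR2]) { |sig| received << "duplicate #{sig}" } # ignored by the guard

Process.kill('USR2', Process.pid) # deliver the signal to this process
delivered = received.pop          # waits for the handler thread to report
Signal.trap('USR2', 'DEFAULT')    # restore the default disposition
puts delivered
```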

#use(klass, *args, &block) ⇒ void

This method returns an undefined value.

Use a middleware class for processing messages.

Parameters:

  • klass (Class)

    The middleware class to use.

  • args (Array)

    Optional arguments to pass to the middleware class.

  • block (Proc)

    Optional block to pass to the middleware class.



# File 'lib/leopard/nats_api_server.rb', line 115

def use(klass, *args, &block)
  middleware << [klass, args, block]
end
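
Each `use` call appends a `[klass, args, block]` triple in registration order. The sketch below shows that storage and then one way such a list could be folded into a call chain. The Rack-style composition (`klass.new(app, *args, &block)`) is an assumption made for illustration, since Leopard's actual stack builder is not part of this listing, and `MiddlewareDsl`, `Tagger`, and `EchoService` are hypothetical names.

```ruby
module MiddlewareDsl
  def middleware
    @middleware ||= []
  end

  # Same storage shape as the listing.
  def use(klass, *args, &block)
    middleware << [klass, args, block]
  end
end

# Hypothetical middleware: prefixes the message, then calls the next layer.
class Tagger
  def initialize(app, tag)
    @app = app
    @tag = tag
  end

  def call(message)
    @app.call("#{@tag}:#{message}")
  end
end

class EchoService
  extend MiddlewareDsl

  use Tagger, 'outer'
  use Tagger, 'inner'
end

# Assumed Rack-style composition: the first registered middleware ends up outermost.
handler = ->(message) { message }
stack = EchoService.middleware.reverse.inject(handler) do |app, (klass, args, block)|
  klass.new(app, *args, &block)
end

result = stack.call('ping')
puts result # => "inner:outer:ping"
```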

#wake_main_thread_and_exit! ⇒ void (private)

This method returns an undefined value.

Wakes the main thread, which is asleep when the server runs in blocking mode, and then exits the process. If the main thread is not blocked, this method just exits.



# File 'lib/leopard/nats_api_server.rb', line 223

def wake_main_thread_and_exit!
  Thread.main.wakeup
  exit 0
rescue ThreadError
  exit 0
rescue StandardError
  exit 1
end
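
The wake-up mechanics can be tried in isolation. In the sketch below, a secondary thread stands in for the main thread parked in `sleep`, so that the example can finish without calling `exit`. It also shows the `ThreadError` that `Thread#wakeup` raises for a dead thread, which is the case the first `rescue` clause in the listing covers. The variable names are illustrative.

```ruby
# Stand-in for the main thread parked in a blocking run.
sleeper = Thread.new do
  sleep # sleeps until another thread calls #wakeup
  :woken
end

sleep 0.01 until sleeper.status == 'sleep' # wait until the thread is parked
sleeper.wakeup
result = sleeper.value # joins the thread and returns its block value

# A dead thread cannot be woken; Thread#wakeup raises ThreadError.
finished = Thread.new {}
finished.join
wakeup_error = begin
  finished.wakeup
  nil
rescue ThreadError => e
  e.class
end

p result
p wakeup_error
```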