Module: Rubyists::Leopard::NatsApiServer::ClassMethods
- Includes: MetricsServer
- Defined in: lib/leopard/nats_api_server.rb
Overview
Class-level DSL for defining Leopard endpoints, middleware, and worker startup.
Instance Method Summary
- #build_jetstream_endpoint(name, options, handler) ⇒ NatsJetstreamEndpoint (private)
  Builds a JetStream endpoint struct with Leopard defaults applied.
- #build_worker(nats_url, service_opts, workers, blocking) ⇒ void (private)
  Builds a worker instance and sets it up with the NATS server.
- #endpoint(name, subject: nil, queue: nil, group: nil, &handler) {|wrapper| ... } ⇒ void
  Define an endpoint for the NATS API server.
- #endpoints ⇒ Array<Endpoint>
  Returns the configured request/reply endpoints for the service class.
- #group(name, group: nil, queue: nil) ⇒ void
  Define a group for organizing endpoints.
- #groups ⇒ Hash{Symbol,String => Hash}
  Returns the configured endpoint groups for the service class.
- #jetstream_endpoint(name, **options, &handler) {|wrapper| ... } ⇒ void
  Define a JetStream pull consumer endpoint.
- #jetstream_endpoints ⇒ Array<NatsJetstreamEndpoint>
  Returns the configured JetStream endpoints for the service class.
- #middleware ⇒ Array<Array>
  Returns the configured middleware stack for the service class.
- #run(nats_url:, service_opts:, instances: 1, blocking: true) ⇒ Concurrent::FixedThreadPool, void
  Start the NATS API server.
- #shutdown(workers, pool) ⇒ Proc (private)
  Shuts down the NATS API server gracefully.
- #spawn_instances(url, opts, count, workers, blocking) ⇒ Concurrent::FixedThreadPool (private)
  Spawns multiple instances of the NATS API server.
- #trap_signals(workers, pool) ⇒ void (private)
  Sets up signal traps for graceful shutdown of the NATS API server.
- #use(klass, *args, &block) ⇒ void
  Use a middleware class for processing messages.
- #wake_main_thread_and_exit! ⇒ void (private)
  Wakes up the main thread to allow it to continue execution after the server is stopped.
Methods included from MetricsServer
#accumulate_worker_metrics, #close_client, #collect_prometheus_metrics, #handle_metrics_client, #metrics_template_path, #prometheus_metrics, #render_metrics_template, #start_metrics_server, #write_metrics_response
Instance Method Details
#build_jetstream_endpoint(name, options, handler) ⇒ NatsJetstreamEndpoint (private)
Builds a JetStream endpoint struct with Leopard defaults applied.
# File 'lib/leopard/nats_api_server.rb', line 239
def build_jetstream_endpoint(name, options, handler)
  NatsJetstreamEndpoint.new(
    name:,
    handler:,
    consumer: nil,
    batch: 1,
    fetch_timeout: 5,
    nak_delay: nil,
    **options,
  )
end
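The way the defaults interact with caller-supplied options can be sketched as follows. This is a minimal illustration, not Leopard's code: `JetstreamSketch` and `build_sketch` are hypothetical stand-ins for `NatsJetstreamEndpoint` and `build_jetstream_endpoint`, and the sketch only overrides the fields whose defaults appear in the listing above. Because `**options` comes last, any key it carries replaces the default written before it.

```ruby
# Hypothetical stand-in for NatsJetstreamEndpoint (not part of Leopard).
JetstreamSketch = Struct.new(
  :name, :handler, :consumer, :batch, :fetch_timeout, :nak_delay,
  keyword_init: true
)

# Hypothetical stand-in for build_jetstream_endpoint.
# Defaults are listed first; the trailing double splat lets options override them.
def build_sketch(name, options, handler)
  JetstreamSketch.new(
    name: name,
    handler: handler,
    consumer: nil,
    batch: 1,
    fetch_timeout: 5,
    nak_delay: nil,
    **options
  )
end

handler = ->(msg) { msg }
with_defaults = build_sketch(:orders, {}, handler)
overridden = build_sketch(:orders, { batch: 10, consumer: 'orders-worker' }, handler)
```

Here `with_defaults` keeps `batch: 1` and `fetch_timeout: 5`, while `overridden` carries the caller's `batch` and `consumer` and still inherits the remaining defaults.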
#build_worker(nats_url, service_opts, workers, blocking) ⇒ void (private)
This method returns an undefined value.
Builds a worker instance and sets it up with the NATS server.
# File 'lib/leopard/nats_api_server.rb', line 172
def build_worker(nats_url, service_opts, workers, blocking)
  worker = @instance_args ? new(**@instance_args) : new
  workers << worker
  args = { nats_url:, service_opts: }
  return worker.setup_worker!(**args) if blocking

  worker.setup_worker(**args)
end
#endpoint(name, subject: nil, queue: nil, group: nil, &handler) {|wrapper| ... } ⇒ void
This method returns an undefined value.
Define an endpoint for the NATS API server.
# File 'lib/leopard/nats_api_server.rb', line 72
def endpoint(name, subject: nil, queue: nil, group: nil, &handler)
  endpoints << Endpoint.new(name:, subject: subject || name, queue:, group:, handler:)
end
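A minimal sketch of how this DSL might be used at class level is shown below. `SketchEndpoint`, `EndpointSketch`, and `PingService` are hypothetical stand-ins so the example runs without Leopard or a NATS server; only the registration logic mirrors the listing above, in particular the fallback of `subject` to `name`.

```ruby
# Hypothetical stand-in for Leopard's Endpoint struct.
SketchEndpoint = Struct.new(:name, :subject, :queue, :group, :handler, keyword_init: true)

# Hypothetical stand-in for the class-level DSL.
module EndpointSketch
  def endpoints
    @endpoints ||= []
  end

  # Registers an endpoint; the subject defaults to the endpoint name.
  def endpoint(name, subject: nil, queue: nil, group: nil, &handler)
    endpoints << SketchEndpoint.new(
      name: name, subject: subject || name, queue: queue, group: group, handler: handler
    )
  end
end

class PingService
  extend EndpointSketch

  endpoint(:ping) { |msg| "pong: #{msg}" }
  endpoint(:sum, subject: 'math.sum', queue: 'workers') { |numbers| numbers.sum }
end

ping, sum = PingService.endpoints
```

The first endpoint is reachable on the subject `:ping` because no `subject:` was given; the second uses the explicit `'math.sum'` subject and a queue name.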
#endpoints ⇒ Array<Endpoint>
Returns the configured request/reply endpoints for the service class.
# File 'lib/leopard/nats_api_server.rb', line 46
def endpoints = @endpoints ||= []
#group(name, group: nil, queue: nil) ⇒ void
This method returns an undefined value.
Define a group for organizing endpoints.
# File 'lib/leopard/nats_api_server.rb', line 104
def group(name, group: nil, queue: nil)
  groups[name] = { name:, parent: group, queue: }
end
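The nesting that the `parent` key allows can be sketched like this. `GroupSketch` and `GroupedService` are hypothetical stand-ins, and `group_path` is purely illustrative: the source does not show how Leopard resolves nested groups into subject prefixes, so the dotted-path traversal is an assumption.

```ruby
# Hypothetical stand-in for the class-level group DSL.
module GroupSketch
  def groups
    @groups ||= {}
  end

  # Stores the group under its name, remembering the optional parent group.
  def group(name, group: nil, queue: nil)
    groups[name] = { name: name, parent: group, queue: queue }
  end
end

class GroupedService
  extend GroupSketch

  group :math, queue: 'math_workers'
  group :add, group: :math
end

# Illustrative helper (an assumption, not Leopard API): follows parent links
# from a group up to its root and joins the names with dots.
def group_path(groups, name)
  path = []
  while name
    path.unshift(name)
    name = groups.fetch(name)[:parent]
  end
  path.join('.')
end

nested_path = group_path(GroupedService.groups, :add)
```

With the two declarations above, `nested_path` is the string `math.add`, and the `:add` entry records `:math` as its parent.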
#groups ⇒ Hash{Symbol,String => Hash}
Returns the configured endpoint groups for the service class.
# File 'lib/leopard/nats_api_server.rb', line 54
def groups = @groups ||= {}
#jetstream_endpoint(name, **options, &handler) {|wrapper| ... } ⇒ void
This method returns an undefined value.
Define a JetStream pull consumer endpoint.
# File 'lib/leopard/nats_api_server.rb', line 93
def jetstream_endpoint(name, **options, &handler)
  jetstream_endpoints << build_jetstream_endpoint(name, options, handler)
end
#jetstream_endpoints ⇒ Array<NatsJetstreamEndpoint>
Returns the configured JetStream endpoints for the service class.
# File 'lib/leopard/nats_api_server.rb', line 50
def jetstream_endpoints = @jetstream_endpoints ||= []
#middleware ⇒ Array<Array>
Returns the configured middleware stack for the service class.
# File 'lib/leopard/nats_api_server.rb', line 58
def middleware = @middleware ||= []
#run(nats_url:, service_opts:, instances: 1, blocking: true) ⇒ Concurrent::FixedThreadPool, void
Start the NATS API server. This method connects to the NATS server and spawns multiple instances of the API server.
# File 'lib/leopard/nats_api_server.rb', line 128
def run(nats_url:, service_opts:, instances: 1, blocking: true)
  logger.info 'Booting NATS API server...'
  workers = Concurrent::Array.new
  pool = spawn_instances(nats_url, service_opts, instances, workers, blocking)
  logger.info 'Setting up signal trap...'
  trap_signals(workers, pool)
  start_metrics_server(workers) if ENV['LEOPARD_METRICS_PORT']
  return pool unless blocking

  sleep
end
#shutdown(workers, pool) ⇒ Proc (private)
Shuts down the NATS API server gracefully.
# File 'lib/leopard/nats_api_server.rb', line 187
def shutdown(workers, pool)
  lambda do
    logger.warn 'Draining worker subscriptions...'
    workers.each(&:stop)
    logger.warn 'All workers stopped, shutting down pool...'
    pool.shutdown
    logger.warn 'Pool is shut down, waiting for termination!'
    pool.wait_for_termination
    logger.warn 'Bye bye!'
    wake_main_thread_and_exit!
  end
end
#spawn_instances(url, opts, count, workers, blocking) ⇒ Concurrent::FixedThreadPool (private)
Spawns multiple instances of the NATS API server.
# File 'lib/leopard/nats_api_server.rb', line 152
def spawn_instances(url, opts, count, workers, blocking)
  pool = Concurrent::FixedThreadPool.new(count)
  @instance_args = opts.delete(:instance_args) || nil
  logger.info "Building #{count} workers with options: #{opts.inspect}, instance_args: #{@instance_args}"
  raise ArgumentError, 'instance_args must be a Hash' if @instance_args && !@instance_args.is_a?(Hash)

  count.times do
    pool.post { build_worker(url, opts, workers, blocking) }
  end
  pool
end
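The handling of `:instance_args` in the listing above can be isolated in a small sketch. `split_instance_args` is a hypothetical helper, not a Leopard method; it reproduces only the extraction and validation steps and leaves out the thread pool. One consequence worth noticing is that `delete` mutates the hash the caller passed in.

```ruby
# Hypothetical helper mirroring the :instance_args handling in spawn_instances.
# Removes :instance_args from opts (mutating it) and validates its type.
def split_instance_args(opts)
  instance_args = opts.delete(:instance_args)
  raise ArgumentError, 'instance_args must be a Hash' if instance_args && !instance_args.is_a?(Hash)

  instance_args
end

service_opts = { name: 'example', version: '0.1.0', instance_args: { greeting: 'hi' } }
instance_args = split_instance_args(service_opts)

# A non-Hash value is rejected before any worker would be built.
rejected = begin
  split_instance_args({ instance_args: 'not a hash' })
  false
rescue ArgumentError
  true
end
```

After the call, `service_opts` no longer contains `:instance_args`, and the extracted hash is what `build_worker` would later splat into `new`.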
#trap_signals(workers, pool) ⇒ void (private)
This method returns an undefined value.
Sets up signal traps for graceful shutdown of the NATS API server.
# File 'lib/leopard/nats_api_server.rb', line 206
def trap_signals(workers, pool)
  return if @trapped

  %w[INT TERM QUIT].each do |sig|
    trap(sig) do
      logger.warn "Received #{sig} signal, shutting down..."
      Thread.new { shutdown(workers, pool).call }
    end
  end
  @trapped = true
end
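The pattern of handing shutdown work to a fresh thread from inside a trap block can be sketched as below. Ruby restricts what may run in a trap context (locking a `Mutex` there raises `ThreadError`, for example), which is a plausible reason for the `Thread.new` in the listing above, though the source does not state its motivation. The sketch uses `USR2` rather than `INT`, `TERM`, or `QUIT` so that it does not terminate the process, and it assumes a Unix-like platform where that signal exists.

```ruby
require 'timeout'

events = Queue.new

# Keep the handler small: it only spawns a thread that does the real work.
Signal.trap('USR2') do
  Thread.new { events << :shutdown_started }
end

# Deliver the signal to this process, then wait for the spawned thread to report.
Process.kill('USR2', Process.pid)
received = Timeout.timeout(5) { events.pop }

# Restore the default disposition so the sketch leaves no handler behind.
Signal.trap('USR2', 'DEFAULT')
```

The main thread blocks on `events.pop` until the thread started by the handler pushes its marker, mirroring how the real handler defers the drain-and-stop sequence.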
#use(klass, *args, &block) ⇒ void
This method returns an undefined value.
Use a middleware class for processing messages.
# File 'lib/leopard/nats_api_server.rb', line 115
def use(klass, *args, &block)
  middleware << [klass, args, block]
end
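Each `use` call stores a `[klass, args, block]` triple. The sketch below shows one way such a stack could be folded into a callable chain. The Rack-style composition (each middleware receives the next app as its first constructor argument) is an assumption, since the source only shows registration, and `MiddlewareSketch`, `Upcase`, `Suffix`, and `StackedService` are hypothetical names.

```ruby
# Hypothetical stand-in for the class-level middleware DSL.
module MiddlewareSketch
  def middleware
    @middleware ||= []
  end

  # Records the middleware class with its constructor arguments and block.
  def use(klass, *args, &block)
    middleware << [klass, args, block]
  end
end

# Hypothetical middleware: upcases whatever the inner app returns.
class Upcase
  def initialize(app)
    @app = app
  end

  def call(msg)
    @app.call(msg).upcase
  end
end

# Hypothetical middleware: appends a suffix given at registration time.
class Suffix
  def initialize(app, suffix)
    @app = app
    @suffix = suffix
  end

  def call(msg)
    @app.call(msg) + @suffix
  end
end

class StackedService
  extend MiddlewareSketch

  use Upcase
  use Suffix, '!'
end

# Assumed composition: wrap from the inside out so the first registered
# middleware ends up outermost.
handler = ->(msg) { "hello #{msg}" }
app = StackedService.middleware.reverse.inject(handler) do |inner, (klass, args, block)|
  klass.new(inner, *args, &block)
end

result = app.call('nats')
```

Under this composition, `Suffix` runs closest to the handler and `Upcase` wraps the final result, so the call yields `HELLO NATS!`.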
#wake_main_thread_and_exit! ⇒ void (private)
This method returns an undefined value.
Wakes up the main thread so that it can continue execution after the server is stopped. This is useful when the server is running in blocking mode. If the main thread is not blocked, this method simply exits.
# File 'lib/leopard/nats_api_server.rb', line 223
def wake_main_thread_and_exit!
  Thread.main.wakeup
  exit 0
rescue ThreadError
  exit 0
rescue StandardError
  exit 1
end
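The `sleep` and `wakeup` pairing that this method relies on can be sketched without exiting the process. The example below uses an ordinary thread in place of `Thread.main`, which is an adaptation for illustration; it also shows that `wakeup` raises `ThreadError` when the target thread is already dead, one case the `rescue ThreadError` branch above would catch.

```ruby
# A thread that parks indefinitely, standing in for the blocked main thread.
sleeper = Thread.new do
  sleep
  :woken
end

# Wait until the thread has actually gone to sleep before waking it.
Thread.pass until sleeper.status == 'sleep'
sleeper.wakeup
result = sleeper.value

# Waking a finished thread raises ThreadError.
finished = Thread.new {}
finished.join
raised = begin
  finished.wakeup
  false
rescue ThreadError
  true
end
```

The sleeping thread resumes after `wakeup` and returns its value, while the attempt to wake the finished thread takes the `ThreadError` path.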