Module: Rubyists::Leopard::NatsApiServer::WorkerLifecycle
- Defined in:
- lib/leopard/nats_api_server.rb
Overview
Instance-side worker boot and shutdown helpers.
Instance Method Summary collapse
-
#add_endpoints(endpoints, group_map) ⇒ void
private
Adds endpoints to the NATS service.
-
#add_groups(gps) ⇒ Hash
private
Adds groups to the NATS service.
-
#build_group(defs, cache, name) ⇒ NATS::Group
private
Builds a group in the NATS service.
-
#build_service_opts(service_opts:) ⇒ Hash
private
Builds the service options for the NATS service.
-
#connect_client(nats_url) ⇒ Object
private
Opens the NATS client connection for this worker.
-
#initialize_service(service_opts) ⇒ Object
private
Registers the NATS service for this worker.
-
#initialize_worker_state ⇒ Thread
private
Captures the current thread for later wakeup during shutdown.
-
#jetstream_consumer_class ⇒ Class
private
Returns the JetStream consumer coordinator class for this worker.
-
#logger ⇒ Object
Returns the logger configured for the NATS API server.
-
#setup_worker(nats_url: 'nats://localhost:4222', service_opts: {}) ⇒ void
Sets up a worker thread for the NATS API server.
-
#setup_worker!(nats_url: 'nats://localhost:4222', service_opts: {}) ⇒ void
Sets up a worker thread for the NATS API server and blocks the current thread.
-
#start_jetstream_consumer(endpoints) ⇒ void
private
Starts the JetStream consumer coordinator when JetStream endpoints are present.
-
#stop ⇒ void
Stops the NATS API server worker.
-
#stop_jetstream ⇒ void
private
Stops the JetStream consumer coordinator if one was started.
-
#stop_service ⇒ void
private
Stops the registered NATS service and closes the client connection.
-
#thread_factory ⇒ Class
private
Returns the thread factory used for JetStream consumer loops.
-
#wake_worker ⇒ Thread?
private
Wakes the worker thread if it is blocked.
Instance Method Details
#add_endpoints(endpoints, group_map) ⇒ void (private)
This method returns an undefined value.
Adds endpoints to the NATS service.
427 428 429 430 431 432 433 434 435 |
# File 'lib/leopard/nats_api_server.rb', line 427 def add_endpoints(endpoints, group_map) endpoints.each do |ep| grp = ep.group parent = grp ? group_map[grp] : @service raise ArgumentError, "Group #{grp} not defined" if grp && parent.nil? build_endpoint(parent, ep) end end |
#add_groups(gps) ⇒ Hash (private)
Adds groups to the NATS service.
396 397 398 399 400 |
# File 'lib/leopard/nats_api_server.rb', line 396 def add_groups(gps) created = {} gps.each_key { |name| build_group(gps, created, name) } created end |
#build_group(defs, cache, name) ⇒ NATS::Group (private)
Builds a group in the NATS service.
410 411 412 413 414 415 416 417 418 |
# File 'lib/leopard/nats_api_server.rb', line 410 def build_group(defs, cache, name) return cache[name] if cache.key?(name) gdef = defs[name] raise ArgumentError, "Group #{name} not defined" unless gdef parent = gdef[:parent] ? build_group(defs, cache, gdef[:parent]) : @service cache[name] = parent.groups.add(gdef[:name], queue: gdef[:queue]) end |
#build_service_opts(service_opts:) ⇒ Hash (private)
Builds the service options for the NATS service.
384 385 386 387 388 389 |
# File 'lib/leopard/nats_api_server.rb', line 384 def build_service_opts(service_opts:) { name: self.class.name.split('::').join('.'), version: '0.1.0', }.merge(service_opts) end |
#connect_client(nats_url) ⇒ Object (private)
Opens the NATS client connection for this worker.
312 313 314 |
# File 'lib/leopard/nats_api_server.rb', line 312 def connect_client(nats_url) @client = NATS.connect(nats_url) end |
#initialize_service(service_opts) ⇒ Object (private)
Registers the NATS service for this worker.
321 322 323 |
# File 'lib/leopard/nats_api_server.rb', line 321 def initialize_service(service_opts) @service = @client.services.add(build_service_opts(service_opts:)) end |
#initialize_worker_state ⇒ Thread (private)
Captures the current thread for later wakeup during shutdown.
303 304 305 |
# File 'lib/leopard/nats_api_server.rb', line 303 def initialize_worker_state @thread = Thread.current end |
#jetstream_consumer_class ⇒ Class (private)
Returns the JetStream consumer coordinator class for this worker.
368 369 370 |
# File 'lib/leopard/nats_api_server.rb', line 368 def jetstream_consumer_class NatsJetstreamConsumer end |
#logger ⇒ Object
Returns the logger configured for the NATS API server.
257 |
# File 'lib/leopard/nats_api_server.rb', line 257 def logger = self.class.logger |
#setup_worker(nats_url: 'nats://localhost:4222', service_opts: {}) ⇒ void
This method returns an undefined value.
Sets up a worker thread for the NATS API server. This method connects to the NATS server, adds the service, groups, and endpoints,
266 267 268 269 270 271 272 |
# File 'lib/leopard/nats_api_server.rb', line 266 def setup_worker(nats_url: 'nats://localhost:4222', service_opts: {}) initialize_worker_state connect_client(nats_url) initialize_service(service_opts) add_endpoints(self.class.endpoints.dup, add_groups(self.class.groups.dup)) start_jetstream_consumer(self.class.jetstream_endpoints.dup) end |
#setup_worker!(nats_url: 'nats://localhost:4222', service_opts: {}) ⇒ void
This method returns an undefined value.
Sets up a worker thread for the NATS API server and blocks the current thread.
281 282 283 284 |
# File 'lib/leopard/nats_api_server.rb', line 281 def setup_worker!(nats_url: 'nats://localhost:4222', service_opts: {}) setup_worker(nats_url:, service_opts:) sleep end |
#start_jetstream_consumer(endpoints) ⇒ void (private)
This method returns an undefined value.
Starts the JetStream consumer coordinator when JetStream endpoints are present.
330 331 332 333 334 335 336 337 338 339 340 341 |
# File 'lib/leopard/nats_api_server.rb', line 330 def start_jetstream_consumer(endpoints) return if endpoints.empty? @jetstream_consumer = jetstream_consumer_class.new( jetstream: @client.jetstream, endpoints:, logger:, process_message: method(:process_transport_message), thread_factory:, ) @jetstream_consumer.start end |
#stop ⇒ void
This method returns an undefined value.
Stops the NATS API server worker.
289 290 291 292 293 294 295 296 |
# File 'lib/leopard/nats_api_server.rb', line 289 def stop @running = false stop_jetstream stop_service wake_worker rescue ThreadError nil end |
#stop_jetstream ⇒ void (private)
This method returns an undefined value.
Stops the JetStream consumer coordinator if one was started.
346 347 348 |
# File 'lib/leopard/nats_api_server.rb', line 346 def stop_jetstream @jetstream_consumer&.stop end |
#stop_service ⇒ void (private)
This method returns an undefined value.
Stops the registered NATS service and closes the client connection.
353 354 355 356 |
# File 'lib/leopard/nats_api_server.rb', line 353 def stop_service @service&.stop @client&.close end |
#thread_factory ⇒ Class (private)
Returns the thread factory used for JetStream consumer loops.
375 376 377 |
# File 'lib/leopard/nats_api_server.rb', line 375 def thread_factory Thread end |
#wake_worker ⇒ Thread? (private)
Wakes the worker thread if it is blocked.
361 362 363 |
# File 'lib/leopard/nats_api_server.rb', line 361 def wake_worker @thread&.wakeup end |