Module: Rubyists::Leopard::NatsApiServer::WorkerLifecycle

Defined in:
lib/leopard/nats_api_server.rb

Overview

Instance-side worker boot and shutdown helpers.

Instance Method Summary

Instance Method Details

#add_endpoints(endpoints, group_map) ⇒ void (private)

This method returns an undefined value.

Adds endpoints to the NATS service.

Parameters:

  • endpoints (Array<Hash>)

    The list of endpoints to add.

  • group_map (Hash)

    A map of group names to their created group objects.

Raises:

  • (ArgumentError)

    If an endpoint references an undefined group.



# File 'lib/leopard/nats_api_server.rb', line 427

def add_endpoints(endpoints, group_map)
  endpoints.each do |ep|
    grp = ep.group
    parent = grp ? group_map[grp] : @service
    raise ArgumentError, "Group #{grp} not defined" if grp && parent.nil?

    build_endpoint(parent, ep)
  end
end
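
The parent lookup above can be tried without a NATS connection. The following is a minimal sketch, assuming a hypothetical `Endpoint` struct and a hypothetical `resolve_parent` helper (neither is part of Leopard); plain symbols stand in for the service and group objects.

```ruby
# Hypothetical stand-in for an endpoint definition; only `group` matters here.
Endpoint = Struct.new(:name, :group)

# Mirrors the lookup in add_endpoints: ungrouped endpoints attach to the
# service, grouped endpoints attach to the group created earlier.
def resolve_parent(endpoint, group_map, service)
  grp = endpoint.group
  parent = grp ? group_map[grp] : service
  raise ArgumentError, "Group #{grp} not defined" if grp && parent.nil?

  parent
end

service = :service                      # placeholder for @service
group_map = { 'math' => :math_group }   # placeholder for the add_groups result

ungrouped = resolve_parent(Endpoint.new('ping', nil), group_map, service)
grouped   = resolve_parent(Endpoint.new('add', 'math'), group_map, service)

# An endpoint that names a group missing from the map is rejected.
error_message =
  begin
    resolve_parent(Endpoint.new('oops', 'missing'), group_map, service)
    nil
  rescue ArgumentError => e
    e.message
  end
```

In this sketch `ungrouped` is the service placeholder, `grouped` is the group placeholder, and `error_message` holds the text of the raised `ArgumentError`.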

#add_groups(gps) ⇒ Hash (private)

Adds groups to the NATS service.

Parameters:

  • gps (Hash)

    The groups to add, where keys are group names and values are group definitions.

Returns:

  • (Hash)

    A map of group names to their created group objects.



# File 'lib/leopard/nats_api_server.rb', line 396

def add_groups(gps)
  created = {}
  gps.each_key { |name| build_group(gps, created, name) }
  created
end

#build_group(defs, cache, name) ⇒ NATS::Group (private)

Builds a group in the NATS service.

Parameters:

  • defs (Hash)

    The group definitions, where keys are group names and values are group definitions.

  • cache (Hash)

    A cache to store already created groups.

  • name (String)

    The name of the group to build.

Returns:

  • (NATS::Group)

    The created group object.

Raises:

  • (ArgumentError)

    If the requested group was never defined.



# File 'lib/leopard/nats_api_server.rb', line 410

def build_group(defs, cache, name)
  return cache[name] if cache.key?(name)

  gdef = defs[name]
  raise ArgumentError, "Group #{name} not defined" unless gdef

  parent = gdef[:parent] ? build_group(defs, cache, gdef[:parent]) : @service
  cache[name] = parent.groups.add(gdef[:name], queue: gdef[:queue])
end
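
The recursion above creates a group's parent before the group itself and reuses cached groups on later calls. The sketch below illustrates that ordering under stated assumptions: `FakeNode` and `FakeGroups` are hypothetical in-memory stand-ins for the NATS service and group objects, and the service is passed as an argument instead of being read from `@service`.

```ruby
# Hypothetical collection that records every group it creates.
class FakeGroups
  def initialize(owner_path, log)
    @owner_path = owner_path
    @log = log
  end

  # Same call shape as the excerpt: a name plus a queue keyword.
  def add(name, queue: nil)
    path = [@owner_path, name].compact.join('.')
    @log << path
    FakeNode.new(path, @log)
  end
end

# Hypothetical stand-in used for both the service (path nil) and groups.
class FakeNode
  attr_reader :path, :groups

  def initialize(path, log)
    @path = path
    @groups = FakeGroups.new(path, log)
  end
end

# Same shape as build_group, with the service passed in explicitly.
def build_group(defs, cache, name, service)
  return cache[name] if cache.key?(name)

  gdef = defs[name]
  raise ArgumentError, "Group #{name} not defined" unless gdef

  parent = gdef[:parent] ? build_group(defs, cache, gdef[:parent], service) : service
  cache[name] = parent.groups.add(gdef[:name], queue: gdef[:queue])
end

creation_log = []
service = FakeNode.new(nil, creation_log)

# The child is listed first on purpose; its parent is still created first.
defs = {
  'v1_math' => { name: 'math', parent: 'v1', queue: nil },
  'v1' => { name: 'v1', parent: nil, queue: nil },
}

created = {}
defs.each_key { |name| build_group(defs, created, name, service) }
```

After the loop, `creation_log` lists the parent path before the child path, and the second iteration finds `'v1'` already cached, so nothing is created twice.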

#build_service_opts(service_opts:) ⇒ Hash (private)

Builds the service options for the NATS service.

Parameters:

  • service_opts (Hash)

    Options for the NATS service.

Returns:

  • (Hash)

    The complete service options including name and version.



# File 'lib/leopard/nats_api_server.rb', line 384

def build_service_opts(service_opts:)
  {
    name: self.class.name.split('::').join('.'),
    version: '0.1.0',
  }.merge(service_opts)
end
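
Because the defaults are merged with the caller's hash, caller-supplied keys win over the derived name and the default version. A minimal sketch of that behavior, assuming a hypothetical `Demo::EchoWorker` class that copies the method body from the excerpt:

```ruby
module Demo
  # Hypothetical worker class; only the option-building logic is reproduced.
  class EchoWorker
    # Same defaults-then-merge shape as build_service_opts.
    def build_service_opts(service_opts:)
      {
        name: self.class.name.split('::').join('.'),
        version: '0.1.0',
      }.merge(service_opts)
    end
  end
end

worker = Demo::EchoWorker.new

# With no overrides, the name is the class name with '::' replaced by '.'.
defaults = worker.build_service_opts(service_opts: {})

# Caller-supplied keys replace defaults and extra keys pass through.
overridden = worker.build_service_opts(
  service_opts: { version: '2.3.0', description: 'Echo service' }
)
```

Here `defaults[:name]` is `'Demo.EchoWorker'`, while `overridden[:version]` is `'2.3.0'` and the extra `:description` key is preserved.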

#connect_client(nats_url) ⇒ Object (private)

Opens the NATS client connection for this worker.

Parameters:

  • nats_url (String)

    The URL of the NATS server.

Returns:

  • (Object)

    The connected NATS client.



# File 'lib/leopard/nats_api_server.rb', line 312

def connect_client(nats_url)
  @client = NATS.connect(nats_url)
end

#initialize_service(service_opts) ⇒ Object (private)

Registers the NATS service for this worker.

Parameters:

  • service_opts (Hash)

    Options for the NATS service.

Returns:

  • (Object)

    The created NATS service.



# File 'lib/leopard/nats_api_server.rb', line 321

def initialize_service(service_opts)
  @service = @client.services.add(build_service_opts(service_opts:))
end

#initialize_worker_state ⇒ Thread (private)

Captures the current thread for later wakeup during shutdown.

Returns:

  • (Thread)

    The current worker thread.



# File 'lib/leopard/nats_api_server.rb', line 303

def initialize_worker_state
  @thread = Thread.current
end

#jetstream_consumer_class ⇒ Class (private)

Returns the JetStream consumer coordinator class for this worker.

Returns:

  • (Class)

    The JetStream consumer implementation class.



# File 'lib/leopard/nats_api_server.rb', line 368

def jetstream_consumer_class
  NatsJetstreamConsumer
end

#logger ⇒ Object

Returns the logger configured for the NATS API server.

Returns:

  • (Object)

    The configured logger.



# File 'lib/leopard/nats_api_server.rb', line 257

def logger = self.class.logger

#setup_worker(nats_url: 'nats://localhost:4222', service_opts: {}) ⇒ void

This method returns an undefined value.

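Sets up a worker thread for the NATS API server. This method captures the current thread, connects to the NATS server, registers the service with its groups and endpoints, and starts the JetStream consumer when JetStream endpoints are defined.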

Parameters:

  • nats_url (String) (defaults to: 'nats://localhost:4222')

    The URL of the NATS server.

  • service_opts (Hash) (defaults to: {})

    Options for the NATS service.



# File 'lib/leopard/nats_api_server.rb', line 266

def setup_worker(nats_url: 'nats://localhost:4222', service_opts: {})
  initialize_worker_state
  connect_client(nats_url)
  initialize_service(service_opts)
  add_endpoints(self.class.endpoints.dup, add_groups(self.class.groups.dup))
  start_jetstream_consumer(self.class.jetstream_endpoints.dup)
end

#setup_worker!(nats_url: 'nats://localhost:4222', service_opts: {}) ⇒ void

This method returns an undefined value.

Sets up a worker thread for the NATS API server and blocks the current thread.

Parameters:

  • nats_url (String) (defaults to: 'nats://localhost:4222')

    The URL of the NATS server.

  • service_opts (Hash) (defaults to: {})

    Options for the NATS service.



# File 'lib/leopard/nats_api_server.rb', line 281

def setup_worker!(nats_url: 'nats://localhost:4222', service_opts: {})
  setup_worker(nats_url:, service_opts:)
  sleep
end

#start_jetstream_consumer(endpoints) ⇒ void (private)

This method returns an undefined value.

Starts the JetStream consumer coordinator when JetStream endpoints are present.

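Parameters:

  • endpoints

    The JetStream endpoint definitions to consume. No consumer is started when this collection is empty.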



# File 'lib/leopard/nats_api_server.rb', line 330

def start_jetstream_consumer(endpoints)
  return if endpoints.empty?

  @jetstream_consumer = jetstream_consumer_class.new(
    jetstream: @client.jetstream,
    endpoints:,
    logger:,
    process_message: method(:process_transport_message),
    thread_factory:,
  )
  @jetstream_consumer.start
end
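
Since the consumer class and the thread factory are looked up through methods, they can in principle be swapped for doubles. The following is a minimal sketch of that idea, not Leopard's own test setup: `RecordingConsumer` and `SketchWorker` are hypothetical, the JetStream context is passed in as a plain value, and only some of the keywords from the excerpt are reproduced.

```ruby
# Hypothetical consumer double that records its arguments instead of
# talking to JetStream.
class RecordingConsumer
  attr_reader :options, :started

  def initialize(**options)
    @options = options
    @started = false
  end

  def start
    @started = true
  end
end

# Hypothetical worker that mirrors the empty-list guard and the keyword
# wiring of start_jetstream_consumer.
class SketchWorker
  attr_reader :jetstream_consumer

  def initialize(jetstream)
    @jetstream = jetstream
  end

  def start_jetstream_consumer(endpoints)
    return if endpoints.empty?

    @jetstream_consumer = jetstream_consumer_class.new(
      jetstream: @jetstream,
      endpoints: endpoints,
      thread_factory: thread_factory
    )
    @jetstream_consumer.start
  end

  private

  # Hook methods: the sketch returns the double and the core Thread class.
  def jetstream_consumer_class
    RecordingConsumer
  end

  def thread_factory
    Thread
  end
end

# No endpoints: the guard returns early and no consumer is built.
idle = SketchWorker.new(:jetstream_context)
idle.start_jetstream_consumer([])

# One endpoint: the consumer is built with the injected collaborators.
busy = SketchWorker.new(:jetstream_context)
busy.start_jetstream_consumer([:orders])
```

In the sketch, `idle.jetstream_consumer` stays `nil`, while `busy.jetstream_consumer` has been started and received `Thread` as its thread factory.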

#stop ⇒ void

This method returns an undefined value.

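Stops the NATS API server worker. This stops the JetStream consumer if one was started, stops the service and closes the client connection, then wakes the worker thread. A ThreadError raised along the way is rescued and discarded.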



# File 'lib/leopard/nats_api_server.rb', line 289

def stop
  @running = false
  stop_jetstream
  stop_service
  wake_worker
rescue ThreadError
  nil
end
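
The safe-navigation calls and the `rescue ThreadError` clause suggest that `stop` is meant to be harmless on a worker that never started or whose thread has already exited. A minimal sketch of those two cases, assuming a hypothetical `ShutdownSketch` class that keeps only the thread-related part of the method:

```ruby
# Hypothetical class reproducing the wakeup and rescue shape of #stop.
class ShutdownSketch
  def initialize(thread: nil)
    @thread = thread
  end

  def stop
    @running = false
    @thread&.wakeup # skipped entirely when no thread was captured
  rescue ThreadError
    nil # Thread#wakeup raises ThreadError for a dead thread
  end
end

# Case 1: setup never ran, so there is no thread to wake.
never_started_result = ShutdownSketch.new.stop

# Case 2: the captured thread has already finished.
finished = Thread.new { :done }
finished.join
dead_thread_result = ShutdownSketch.new(thread: finished).stop
```

Both calls return `nil` without raising.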

#stop_jetstream ⇒ void (private)

This method returns an undefined value.

Stops the JetStream consumer coordinator if one was started.



# File 'lib/leopard/nats_api_server.rb', line 346

def stop_jetstream
  @jetstream_consumer&.stop
end

#stop_service ⇒ void (private)

This method returns an undefined value.

Stops the registered NATS service and closes the client connection.



# File 'lib/leopard/nats_api_server.rb', line 353

def stop_service
  @service&.stop
  @client&.close
end

#thread_factory ⇒ Class (private)

Returns the thread factory used for JetStream consumer loops.

Returns:

  • (Class)

    The thread factory class.



# File 'lib/leopard/nats_api_server.rb', line 375

def thread_factory
  Thread
end

#wake_worker ⇒ Thread? (private)

Wakes the worker thread if it is blocked.

Returns:

  • (Thread, nil)

    The awakened worker thread, if present.



# File 'lib/leopard/nats_api_server.rb', line 361

def wake_worker
  @thread&.wakeup
end
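
The wakeup pairs with the argument-less `sleep` in `#setup_worker!`: a thread parked in `sleep` resumes once another thread calls `wakeup` on it. The sketch below shows that mechanism with core Ruby only; the thread body and variable names are illustrative and not taken from Leopard.

```ruby
# A thread that blocks indefinitely, the way setup_worker! does after setup.
worker = Thread.new do
  sleep # returns only after another thread calls #wakeup (or #run)
  :shut_down
end

# Wait until the worker is actually parked, so the wakeup is not sent early.
sleep 0.01 until worker.status == 'sleep'

# Thread#wakeup marks the sleeping thread as runnable and returns the thread.
woken = worker.wakeup

# The block continues past `sleep` and the thread finishes with its last value.
result = worker.value
```

The polling loop exists only to keep the sketch deterministic; calling `wakeup` before the thread reaches `sleep` would leave it blocked.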