Class: Async::Service::Supervisor::Worker
- Defined in:
- lib/async/service/supervisor/worker.rb
Overview
A worker represents a long running process that can be controlled by the supervisor.
There are various tasks that can be executed by the worker, such as dumping memory, threads, and garbage collection profiles.
Instance Attribute Summary collapse
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#process_id ⇒ Object
readonly
Returns the value of attribute process_id.
-
#state ⇒ Object
Returns the value of attribute state.
- #The ID assigned by the supervisor.(IDassignedbythesupervisor.) ⇒ Object readonly
- #The process ID of the worker.(processIDoftheworker.) ⇒ Object readonly
- #The utilization registry for this worker.(utilizationregistry) ⇒ Object readonly
-
#utilization_registry ⇒ Object
readonly
Returns the value of attribute utilization_registry.
-
#utilization_schema ⇒ Object
readonly
Returns the value of attribute utilization_schema.
Class Method Summary collapse
-
.run(process_id: Process.pid, endpoint: Supervisor.endpoint) ⇒ Object
Run a worker with the given process ID.
Instance Method Summary collapse
-
#initialize(process_id: Process.pid, endpoint: Supervisor.endpoint, state: {}, utilization_schema: nil, utilization_registry: nil) ⇒ Worker
constructor
Initialize a new worker.
-
#make_controller ⇒ Object
Create the worker controller for this worker.
-
#setup_utilization_observer(path, size, offset) ⇒ Object
Setup utilization observer for this worker.
- #State associated with this worker (e.g., service name).=(associatedwiththisworker(e.g., service name)(value)) ⇒ Object
- #Utilization schema definition.=(schemadefinition. = (value)) ⇒ Object
Constructor Details
#initialize(process_id: Process.pid, endpoint: Supervisor.endpoint, state: {}, utilization_schema: nil, utilization_registry: nil) ⇒ Worker
Initialize a new worker.
31 32 33 34 35 36 37 38 39 40 |
# File 'lib/async/service/supervisor/worker.rb', line 31 def initialize(process_id: Process.pid, endpoint: Supervisor.endpoint, state: {}, utilization_schema: nil, utilization_registry: nil) super(endpoint: endpoint) @id = nil @process_id = process_id @state = state @utilization_schema = utilization_schema @utilization_registry = utilization_registry || require("async/utilization") && Async::Utilization::Registry.new end |
Instance Attribute Details
#id ⇒ Object (readonly)
Returns the value of attribute id.
43 44 45 |
# File 'lib/async/service/supervisor/worker.rb', line 43 def id @id end |
#process_id ⇒ Object (readonly)
Returns the value of attribute process_id.
46 47 48 |
# File 'lib/async/service/supervisor/worker.rb', line 46 def process_id @process_id end |
#state ⇒ Object
Returns the value of attribute state.
49 50 51 |
# File 'lib/async/service/supervisor/worker.rb', line 49 def state @state end |
#The ID assigned by the supervisor.(IDassignedbythesupervisor.) ⇒ Object (readonly)
43 |
# File 'lib/async/service/supervisor/worker.rb', line 43 attr :id |
#The process ID of the worker.(processIDoftheworker.) ⇒ Object (readonly)
46 |
# File 'lib/async/service/supervisor/worker.rb', line 46 attr :process_id |
#The utilization registry for this worker.(utilizationregistry) ⇒ Object (readonly)
55 |
# File 'lib/async/service/supervisor/worker.rb', line 55 attr :utilization_registry |
#utilization_registry ⇒ Object (readonly)
Returns the value of attribute utilization_registry.
55 56 57 |
# File 'lib/async/service/supervisor/worker.rb', line 55 def utilization_registry @utilization_registry end |
#utilization_schema ⇒ Object (readonly)
Returns the value of attribute utilization_schema.
52 53 54 |
# File 'lib/async/service/supervisor/worker.rb', line 52 def utilization_schema @utilization_schema end |
Class Method Details
.run(process_id: Process.pid, endpoint: Supervisor.endpoint) ⇒ Object
Run a worker with the given process ID.
20 21 22 |
# File 'lib/async/service/supervisor/worker.rb', line 20 def self.run(process_id: Process.pid, endpoint: Supervisor.endpoint) self.new(process_id: process_id, endpoint: endpoint).run end |
Instance Method Details
#make_controller ⇒ Object
Create the worker controller for this worker.
Override this method to provide a custom worker controller that exposes additional RPCs to the supervisor.
84 85 86 |
# File 'lib/async/service/supervisor/worker.rb', line 84 def make_controller WorkerController.new(self) end |
#setup_utilization_observer(path, size, offset) ⇒ Object
Setup utilization observer for this worker.
Maps the shared memory file and configures the utilization registry to write metrics to it. Called by the supervisor (via WorkerController) to inform the worker of the shared memory file path and allocated offset.
68 69 70 71 72 73 74 75 76 77 |
# File 'lib/async/service/supervisor/worker.rb', line 68 def setup_utilization_observer(path, size, offset) return [] unless @utilization_schema schema = Async::Utilization::Schema.build(@utilization_schema) observer = Async::Utilization::Observer.open(schema, path, size, offset) @utilization_registry.observer = observer # Pass the schema back to the supervisor so it can be used to aggregate the metrics: observer.schema.to_a end |
#State associated with this worker (e.g., service name).=(associatedwiththisworker(e.g., service name)(value)) ⇒ Object
49 |
# File 'lib/async/service/supervisor/worker.rb', line 49 attr_accessor :state |
#Utilization schema definition.=(schemadefinition. = (value)) ⇒ Object
52 |
# File 'lib/async/service/supervisor/worker.rb', line 52 attr :utilization_schema |