Class: Async::Service::Supervisor::Worker

Inherits:
Client
  • Object
Defined in:
lib/async/service/supervisor/worker.rb

Overview

A worker represents a long-running process that can be controlled by the supervisor.

The supervisor can ask a worker to perform diagnostic tasks, such as dumping memory, threads, and garbage collection profiles.

Constructor Details

#initialize(process_id: Process.pid, endpoint: Supervisor.endpoint, state: {}, utilization_schema: nil, utilization_registry: nil) ⇒ Worker

Initialize a new worker.



# File 'lib/async/service/supervisor/worker.rb', line 31

def initialize(process_id: Process.pid, endpoint: Supervisor.endpoint, state: {}, utilization_schema: nil, utilization_registry: nil)
	super(endpoint: endpoint)
	
	@id = nil
	@process_id = process_id
	@state = state
	
	@utilization_schema = utilization_schema
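	# `require` returns false when the feature is already loaded, so load it separately rather than chaining on its result:
	@utilization_registry = utilization_registry || begin
		require "async/utilization"
		Async::Utilization::Registry.new
	end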
end
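
The fallback for `utilization_registry` can be illustrated in isolation. The sketch below is a minimal stand-in, not the library's code: `LazyDefaultSketch` is a hypothetical class, and `StringIO` is used only as a placeholder for a lazily loaded dependency so the example runs without the gem. It shows a caller-supplied object being kept as-is and a fresh default being built otherwise. Because `require` returns `false` when a feature is already loaded, the sketch does not chain on its return value.

```ruby
# Hypothetical stand-in for the keyword-default pattern; not the library's Worker.
class LazyDefaultSketch
	attr :registry
	
	def initialize(registry: nil)
		# Keep the caller's object; otherwise load the dependency and build a default.
		@registry = registry || begin
			require "stringio" # Placeholder for a lazily loaded dependency.
			StringIO.new
		end
	end
end

custom = Object.new
LazyDefaultSketch.new(registry: custom).registry.equal?(custom) # The caller's object is kept.
LazyDefaultSketch.new.registry.class # A freshly built default.
require("stringio") # false here, because the feature is already loaded.
```

Passing a shared registry lets several components report into the same object, while omitting it keeps construction self-contained.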

Instance Attribute Details

#id ⇒ Object (readonly)

The ID assigned by the supervisor.

# File 'lib/async/service/supervisor/worker.rb', line 43

attr :id

#process_id ⇒ Object (readonly)

The process ID of the worker.

# File 'lib/async/service/supervisor/worker.rb', line 46

attr :process_id

#state ⇒ Object

State associated with this worker (e.g., service name).

# File 'lib/async/service/supervisor/worker.rb', line 49

attr_accessor :state

#utilization_registry ⇒ Object (readonly)

The utilization registry for this worker.

# File 'lib/async/service/supervisor/worker.rb', line 55

attr :utilization_registry

#utilization_schema ⇒ Object (readonly)

The utilization schema definition.

# File 'lib/async/service/supervisor/worker.rb', line 52

attr :utilization_schema

Class Method Details

.run(process_id: Process.pid, endpoint: Supervisor.endpoint) ⇒ Object

Run a worker with the given process ID.



# File 'lib/async/service/supervisor/worker.rb', line 20

def self.run(process_id: Process.pid, endpoint: Supervisor.endpoint)
	self.new(process_id: process_id, endpoint: endpoint).run
end

Instance Method Details

#make_controllerObject

Create the worker controller for this worker.

Override this method to provide a custom worker controller that exposes additional RPCs to the supervisor.



# File 'lib/async/service/supervisor/worker.rb', line 84

def make_controller
	WorkerController.new(self)
end
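
The override described above follows the usual factory-method shape. The sketch below uses hypothetical stand-ins (`Sketch::Worker`, `Sketch::WorkerController`, `CacheController`, `CacheWorker`, and `clear_cache` are all invented for illustration) so it runs without the gem. How the real `WorkerController` exposes operations to the supervisor is not shown on this page, so only the override itself is illustrated.

```ruby
# Hypothetical stand-ins so the sketch runs without the gem; the real classes
# live under Async::Service::Supervisor.
module Sketch
	class WorkerController
		def initialize(worker)
			@worker = worker
		end
	end
	
	class Worker
		# Default factory method, mirroring the shape of #make_controller.
		def make_controller
			WorkerController.new(self)
		end
	end
	
	# A custom controller exposing one extra (hypothetical) operation.
	class CacheController < WorkerController
		def clear_cache
			@worker.clear_cache
		end
	end
	
	class CacheWorker < Worker
		def initialize
			@cache = {greeting: "hello"}
		end
		
		# Empty the cache and report how many entries remain.
		def clear_cache
			@cache.clear
			@cache.size
		end
		
		# Override the factory method to return the custom controller.
		def make_controller
			CacheController.new(self)
		end
	end
end

controller = Sketch::CacheWorker.new.make_controller
controller.clear_cache
```

Keeping the extra operations on the controller rather than on the worker separates what the supervisor may call from the worker's internal behavior.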

#setup_utilization_observer(path, size, offset) ⇒ Object

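Set up the utilization observer for this worker.

Maps the shared memory file and configures the utilization registry to write metrics to it. Called by the supervisor (via WorkerController) to inform the worker of the shared memory file path and allocated offset. Returns the schema as an array so the supervisor can aggregate the metrics, or an empty array if no utilization schema is configured.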



# File 'lib/async/service/supervisor/worker.rb', line 68

def setup_utilization_observer(path, size, offset)
	return [] unless @utilization_schema
	
	schema = Async::Utilization::Schema.build(@utilization_schema)
	observer = Async::Utilization::Observer.open(schema, path, size, offset)
	@utilization_registry.observer = observer
	
	# Pass the schema back to the supervisor so it can be used to aggregate the metrics:
	observer.schema.to_a
end
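
The idea of giving each worker its own offset into one shared file can be sketched with plain file I/O. This is not how `Async::Utilization::Observer` is implemented: the sketch assumes a hypothetical layout of one unsigned 64-bit counter per worker, uses `IO#pwrite` and `IO#pread` instead of a memory mapping, and the helper names `write_slot` and `read_slot` are invented for illustration.

```ruby
require "tempfile"

SLOT_SIZE = 8 # Hypothetical layout: one unsigned 64-bit counter per worker.

# Write a counter into the slot that starts at the given byte offset.
def write_slot(file, offset, value)
	file.pwrite([value].pack("Q<"), offset)
end

# Read the counter back from the slot at the given byte offset.
def read_slot(file, offset)
	file.pread(SLOT_SIZE, offset).unpack1("Q<")
end

Tempfile.create("utilization-sketch") do |file|
	file.binmode
	
	# Two workers, each writing only to the offset it was assigned:
	write_slot(file, 0 * SLOT_SIZE, 3)
	write_slot(file, 1 * SLOT_SIZE, 7)
	
	# The aggregating side reads every slot and sums the values:
	total = read_slot(file, 0 * SLOT_SIZE) + read_slot(file, 1 * SLOT_SIZE)
	puts total # prints 10
end
```

Because each writer touches only its own slot, the writers in this sketch never overwrite each other's values, and the reader needs only the layout to interpret the bytes.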

#state=(value) ⇒ Object

Sets the state associated with this worker (e.g., service name).

# File 'lib/async/service/supervisor/worker.rb', line 49

attr_accessor :state
