Class: Async::Service::Supervisor::SupervisorController

Inherits:
Bus::Controller
  • Object
show all
Defined in:
lib/async/service/supervisor/supervisor_controller.rb

Overview

Controller for supervisor operations.

Handles registration of workers, worker lookup, restarting process groups, and status queries.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, connection) ⇒ SupervisorController

Initialize the supervisor controller with the server and connection.



16
17
18
19
20
21
22
23
24
# File 'lib/async/service/supervisor/supervisor_controller.rb', line 16

# Set up a controller bound to the given server and connection.
#
# The worker-specific fields (id, process id, worker proxy) start out as nil
# and the state starts as an empty hash; #register fills them in later.
def initialize(server, connection)
	@server, @connection = server, connection
	
	# Nothing is registered yet.
	@id = @process_id = @worker = nil
	@state = {}
end

Instance Attribute Details

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



30
31
32
# File 'lib/async/service/supervisor/supervisor_controller.rb', line 30

# Reader for the connection this controller was constructed with.
def connection
  return @connection
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



33
34
35
# File 'lib/async/service/supervisor/supervisor_controller.rb', line 33

# Reader for the ID assigned to this worker (nil until #register has run).
def id
  return @id
end

#process_id ⇒ Object (readonly)

Returns the value of attribute process_id.



36
37
38
# File 'lib/async/service/supervisor/supervisor_controller.rb', line 36

# Reader for the worker's process ID (nil until #register has run).
def process_id
  return @process_id
end

#server ⇒ Object (readonly)

Returns the value of attribute server.



27
28
29
# File 'lib/async/service/supervisor/supervisor_controller.rb', line 27

# Reader for the server instance this controller was constructed with.
def server
  return @server
end

#state ⇒ Object

Returns the value of attribute state.



42
43
44
# File 'lib/async/service/supervisor/supervisor_controller.rb', line 42

# Reader for the state hash associated with this worker connection.
def state
  return @state
end

#id ⇒ Object (readonly) — The ID assigned to this worker.



33
# File 'lib/async/service/supervisor/supervisor_controller.rb', line 33

# Read-only accessor for the ID assigned to this worker.
attr :id

#process_id ⇒ Object (readonly) — The process ID of the worker.



36
# File 'lib/async/service/supervisor/supervisor_controller.rb', line 36

# Read-only accessor for the process ID of the worker.
attr :process_id

#worker ⇒ Object (readonly) — The proxy to the worker controller.



39
# File 'lib/async/service/supervisor/supervisor_controller.rb', line 39

# Read-only accessor for the proxy to the worker controller.
attr :worker

#server ⇒ Object (readonly) — The server instance.



27
# File 'lib/async/service/supervisor/supervisor_controller.rb', line 27

# Read-only accessor for the server instance.
attr :server

#worker ⇒ Object (readonly)

Returns the value of attribute worker.



39
40
41
# File 'lib/async/service/supervisor/supervisor_controller.rb', line 39

# Reader for the worker controller proxy (nil until #register has run).
def worker
  return @worker
end

Instance Method Details

#[](id) ⇒ Object

Get a worker controller proxy by connection ID.

Returns a proxy to the worker controller that can be used to invoke operations directly on the worker. The proxy uses multi-hop forwarding to route calls through the supervisor to the worker.

Examples:

Accessing a worker

supervisor = connection[:supervisor]
worker = supervisor[id]
worker.memory_dump(path: "/tmp/dump.json")


80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/async/service/supervisor/supervisor_controller.rb', line 80

# Look up the worker controller proxy registered under the given connection ID.
#
# The lookup goes through the server's controller table: the entry for `id`
# is fetched, and its worker proxy is returned.
#
# Raises ArgumentError when `id` is missing, when no controller is registered
# under `id`, or when that controller has no worker proxy.
def [](id)
	raise ArgumentError, "Missing 'id' parameter" unless id
	
	controller = @server.controllers[id]
	raise ArgumentError, "Connection not found: #{id}" unless controller
	
	proxy = controller.worker
	raise ArgumentError, "Worker controller not found for connection: #{id}" unless proxy
	
	return proxy
end

#keys ⇒ Object

List all registered worker IDs.



103
104
105
# File 'lib/async/service/supervisor/supervisor_controller.rb', line 103

# Return the keys of the server's controller table, i.e. the registered IDs.
def keys
	registry = @server.controllers
	return registry.keys
end

#register(worker, process_id:, state: {}) ⇒ Object

Register a worker connection with the supervisor.

Allocates a unique sequential ID, stores the worker controller proxy, and notifies all monitors of the new connection.

Raises:

  • (RuntimeError)


53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/async/service/supervisor/supervisor_controller.rb', line 53

# Register a worker connection with the supervisor.
#
# Takes the next ID from the server, records the worker proxy and its process
# ID, merges the supplied state into this controller's state, and then adds
# this controller to the server.
#
# Returns the newly assigned ID.
# Raises RuntimeError if this controller already has an ID.
def register(worker, process_id:, state: {})
	if @id
		raise RuntimeError, "Already registered"
	end
	
	# Ask the server for the ID first, so nothing else is touched if that fails.
	@id = @server.next_id
	@worker = worker
	@process_id = process_id
	@state.merge!(state)
	
	# Hand this controller to the server only once it is fully populated.
	@server.add(self)
	
	return @id
end

#restart(signal: :INT) ⇒ Object

Restart the current process group, usually including the supervisor and any other processes.



110
111
112
113
# File 'lib/async/service/supervisor/supervisor_controller.rb', line 110

# Restart by signalling the parent process.
#
# `signal` is the signal to send (defaults to :INT).
def restart(signal: :INT)
	# The signal goes to our parent; the intent is to take down the whole
	# process group, including *this* process.
	parent_pid = ::Process.ppid
	::Process.kill(signal, parent_pid)
end

#state=(value) ⇒ Object — State associated with this worker connection (e.g., service name).



42
# File 'lib/async/service/supervisor/supervisor_controller.rb', line 42

# Read/write accessor for the state associated with this worker connection (e.g., service name).
attr_accessor :state

#status ⇒ Object

Query the status of the supervisor and all connected workers.

Returns an array of status information from each monitor. Each monitor provides its own status representation.



121
122
123
124
125
126
127
128
129
# File 'lib/async/service/supervisor/supervisor_controller.rb', line 121

# Collect a status entry from every monitor on the server.
#
# A monitor whose #status raises contributes the raised error object in place
# of its status, so one failing monitor does not abort the whole query.
# Entries that are nil are dropped from the result.
def status
	entries = @server.monitors.map do |monitor|
		monitor.status
	rescue => error
		error
	end
	
	return entries.compact
end

#connection ⇒ Object (readonly) — The connection instance.



30
# File 'lib/async/service/supervisor/supervisor_controller.rb', line 30

# Read-only accessor for the connection instance.
attr :connection