Class: Async::Service::Supervisor::Server

Inherits:
Bus::Server
  • Object
show all
Defined in:
lib/async/service/supervisor/server.rb

Overview

The server represents the main supervisor process which is responsible for managing the lifecycle of other processes.

There are various tasks that can be executed by the server, such as restarting the process group, and querying the status of the processes. The server is also responsible for managing the lifecycle of the monitors, which can be used to monitor the status of the connected workers.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(monitors: [], endpoint: Supervisor.endpoint, **options) ⇒ Server

Initialize a new supervisor server.



21
22
23
24
25
26
27
# File 'lib/async/service/supervisor/server.rb', line 21

# Initialize a new supervisor server.
#
# @param monitors [Array] Monitors that are started by `run` and notified by `add` / `remove`.
# @param endpoint [Object] Endpoint passed as the first argument to the superclass constructor. Defaults to `Supervisor.endpoint`.
# @param options [Hash] Any remaining keyword options, forwarded unchanged to the superclass constructor.
def initialize(monitors: [], endpoint: Supervisor.endpoint, **options)
	super(endpoint, **options)
	
	# Counter backing `next_id`; starts at zero so the first allocated ID is 1.
	@next_id = 0
	
	# Registered controllers, keyed by controller ID (populated by `add`).
	@controllers = {}
	
	@monitors = monitors
end

Instance Attribute Details

#controllers ⇒ Object (readonly)

Returns the value of attribute controllers.



30
31
32
# File 'lib/async/service/supervisor/server.rb', line 30

# The controllers currently tracked by this server.
#
# @return [Hash] The tracking table filled by `add` and pruned by `remove`, keyed by controller ID.
def controllers
	@controllers
end

#monitors ⇒ Object (readonly)

Returns the value of attribute monitors.



29
30
31
# File 'lib/async/service/supervisor/server.rb', line 29

# The monitors attached to this server.
#
# @return [Object] The monitor collection that was supplied to the constructor.
def monitors
	@monitors
end

Instance Method Details

#add(controller) ⇒ Object

Add a controller to the server.

Validates that the controller has been properly registered with an ID and checks for ID collisions before adding it to tracking.



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/async/service/supervisor/server.rb', line 46

# Add a controller to the server.
#
# The controller must already carry an ID, and that ID must not be in use.
# Once tracked, every monitor is told about the new controller; a monitor
# that fails is logged and does not prevent the remaining monitors from
# being notified.
#
# @param controller [Object] The controller to track; must respond to `id`.
# @raise [RuntimeError] If the controller has no ID, or the ID is already registered.
def add(controller)
	id = controller.id
	
	raise RuntimeError, "Controller must be registered with an ID before being added!" unless id
	raise RuntimeError, "Controller already registered: id=#{id}" if @controllers.key?(id)
	
	@controllers[id] = controller
	
	# Registration with monitors is best-effort, one monitor at a time:
	@monitors.each do |monitor|
		begin
			monitor.register(controller)
		rescue => error
			Console.error(self, "Error while registering process!", monitor: monitor, exception: error)
		end
	end
end

#next_id ⇒ Object

Allocate the next unique sequential ID.



35
36
37
# File 'lib/async/service/supervisor/server.rb', line 35

# Allocate the next unique sequential ID.
#
# Advances the internal counter by one and hands back the new value, so a
# counter starting at zero produces 1, 2, 3, ...
#
# @return [Integer] The newly allocated ID.
def next_id
	@next_id = @next_id + 1
end

#remove(controller) ⇒ Object

Remove a controller from the server.

Removes the controller from tracking, then notifies all monitors.



70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/async/service/supervisor/server.rb', line 70

# Remove a controller from the server.
#
# Drops the controller from tracking and then notifies every monitor. A
# monitor that fails is logged and does not prevent the remaining monitors
# from being notified.
#
# The tracking entry is only evicted when it refers to this exact
# controller. `add` rejects a controller whose ID is already taken; when
# that rejected controller is later removed, the controller that
# legitimately owns the ID must stay registered.
#
# Monitors are notified regardless of whether the controller was tracked.
#
# @param controller [Object] The controller to remove; must respond to `id`.
def remove(controller)
	id = controller.id
	
	# Identity check (not ==) so a different controller sharing the ID is never evicted:
	if id && @controllers[id].equal?(controller)
		@controllers.delete(id)
	end
	
	# Notify monitors with the supervisor controller:
	@monitors.each do |monitor|
		monitor.remove(controller)
	rescue => error
		Console.error(self, "Error while removing process!", monitor: monitor, exception: error)
	end
end

#run ⇒ Object

Run the supervisor server.

Starts all monitors and accepts connections from workers.



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/async/service/supervisor/server.rb', line 88

# Run the supervisor server.
#
# Starts every monitor under a shared barrier, then accepts worker
# connections. Each accepted connection gets its own `SupervisorController`,
# bound to the connection under the name `:supervisor`, and is run until it
# finishes. When the connection ends, normally or with an error, its
# controller is removed from the server.
#
# A monitor that fails to start is logged and skipped. The barrier is
# stopped when this method exits.
def run
	Sync do
		barrier = Async::Barrier.new
		
		# Start all monitors:
		@monitors.each do |monitor|
			monitor.run(parent: barrier)
		rescue => error
			Console.error(self, "Error while starting monitor!", monitor: monitor, exception: error)
		end
		
		barrier.async do
			# Accept connections from workers:
			self.accept do |connection|
				# Create a supervisor controller for this connection:
				supervisor_controller = SupervisorController.new(self, connection)
				
				# Bind supervisor controller:
				connection.bind(:supervisor, supervisor_controller)
				
				# Run the connection:
				connection.run
			ensure
				# If the controller could not be constructed there is nothing to remove;
				# calling remove(nil) would raise and mask the original error.
				self.remove(supervisor_controller) if supervisor_controller
			end
		end
		
		barrier.wait
	ensure
		barrier&.stop
	end
end