Class: Async::Service::Supervisor::Server
- Inherits:
- Bus::Server
- Hierarchy: Async::Service::Supervisor::Server < Bus::Server < Object
- Defined in:
- lib/async/service/supervisor/server.rb
Overview
The server represents the main supervisor process which is responsible for managing the lifecycle of other processes.
The server can execute tasks such as restarting the process group and querying the status of the processes. It also manages the lifecycle of the monitors, which track the status of the connected workers.
Instance Attribute Summary
-
#controllers ⇒ Object
readonly
Returns the value of attribute controllers.
-
#monitors ⇒ Object
readonly
Returns the value of attribute monitors.
Instance Method Summary
-
#add(controller) ⇒ Object
Add a controller to the server.
-
#initialize(monitors: [], endpoint: Supervisor.endpoint, **options) ⇒ Server
constructor
Initialize a new supervisor server.
-
#next_id ⇒ Object
Allocate the next unique sequential ID.
-
#remove(controller) ⇒ Object
Remove a worker connection from the supervisor.
-
#run ⇒ Object
Run the supervisor server.
Constructor Details
#initialize(monitors: [], endpoint: Supervisor.endpoint, **options) ⇒ Server
Initialize a new supervisor server.
    # File 'lib/async/service/supervisor/server.rb', line 21
    def initialize(monitors: [], endpoint: Supervisor.endpoint, **)
      super(endpoint, **)
      @monitors = monitors
      @controllers = {}
      @next_id = 0
    end
Instance Attribute Details
#controllers ⇒ Object (readonly)
Returns the value of attribute controllers.
    # File 'lib/async/service/supervisor/server.rb', line 30
    def controllers
      @controllers
    end
#monitors ⇒ Object (readonly)
Returns the value of attribute monitors.
    # File 'lib/async/service/supervisor/server.rb', line 29
    def monitors
      @monitors
    end
Instance Method Details
#add(controller) ⇒ Object
Add a controller to the server.
Validates that the controller has been properly registered with an ID and checks for ID collisions before adding it to tracking.
    # File 'lib/async/service/supervisor/server.rb', line 46
    def add(controller)
      unless id = controller.id
        raise RuntimeError, "Controller must be registered with an ID before being added!"
      end

      if @controllers.key?(id)
        raise RuntimeError, "Controller already registered: id=#{id}"
      end

      @controllers[id] = controller

      # Notify monitors with the supervisor controller:
      @monitors.each do |monitor|
        monitor.register(controller)
      rescue => error
        Console.error(self, "Error while registering process!", monitor: monitor, exception: error)
      end
    end
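The behaviour of #add can be tried out without a running supervisor. The following is a minimal sketch in plain Ruby, not the library's own code: `FakeController`, `RecordingMonitor`, `FailingMonitor` and `Registry` are hypothetical stand-ins, and errors are collected in an array where the real server logs them with Console.error. It illustrates two properties of the listing above: a duplicate ID is rejected before any monitor is notified, and one failing monitor does not prevent the remaining monitors from being notified.

```ruby
# Hypothetical stand-ins; only the control flow mirrors Server#add.
FakeController = Struct.new(:id)

class RecordingMonitor
  attr_reader :registered

  def initialize
    @registered = []
  end

  def register(controller)
    @registered << controller.id
  end
end

class FailingMonitor
  def register(controller)
    raise "monitor is broken"
  end
end

class Registry
  attr_reader :controllers, :errors

  def initialize(monitors)
    @monitors = monitors
    @controllers = {}
    @errors = []
  end

  def add(controller)
    id = controller.id
    raise "Controller must be registered with an ID before being added!" unless id
    raise "Controller already registered: id=#{id}" if @controllers.key?(id)

    @controllers[id] = controller

    @monitors.each do |monitor|
      monitor.register(controller)
    rescue => error
      # The real server logs here; this sketch just collects the error.
      @errors << error
    end
  end
end

recorder = RecordingMonitor.new
registry = Registry.new([FailingMonitor.new, recorder])
registry.add(FakeController.new(1))

# Adding the same ID again raises before any monitor is notified.
duplicate_rejected =
  begin
    registry.add(FakeController.new(1))
    false
  rescue RuntimeError
    true
  end
```

Because the rescue clause is attached to the block passed to `each`, it applies per monitor, so the iteration continues after a failure.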
#next_id ⇒ Object
Allocate the next unique sequential ID.
    # File 'lib/async/service/supervisor/server.rb', line 35
    def next_id
      @next_id += 1
    end
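Since the constructor sets @next_id to 0 and #next_id increments before returning, the first ID handed out is 1. A minimal sketch of that counter in isolation, using a hypothetical `IdAllocator` class that is not part of the library:

```ruby
# Hypothetical stand-in that reproduces only the ID counter from Server.
class IdAllocator
  def initialize
    @next_id = 0
  end

  # Same body as Server#next_id: increment, then return the new value.
  def next_id
    @next_id += 1
  end
end

allocator = IdAllocator.new
ids = Array.new(3) { allocator.next_id }
# ids is [1, 2, 3]
```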
#remove(controller) ⇒ Object
Remove a worker connection from the supervisor.
Notifies all monitors and removes the connection from tracking.
    # File 'lib/async/service/supervisor/server.rb', line 70
    def remove(controller)
      if id = controller.id
        @controllers.delete(id)
      end

      # Notify monitors with the supervisor controller:
      @monitors.each do |monitor|
        monitor.remove(controller)
      rescue => error
        Console.error(self, "Error while removing process!", monitor: monitor, exception: error)
      end
    end
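One consequence of the listing above is that #remove notifies the monitors even when the controller has no ID, and that removing the same controller twice leaves tracking unchanged. The sketch below reproduces that control flow with hypothetical stand-ins (`StubController`, `RemovalLog`, `Tracker`). It is an illustration in plain Ruby, not the library's implementation, and it writes to standard error where the real server uses Console.error.

```ruby
# Hypothetical stand-ins; only the control flow mirrors Server#remove.
StubController = Struct.new(:id)

class RemovalLog
  attr_reader :removed

  def initialize
    @removed = []
  end

  def remove(controller)
    @removed << controller
  end
end

class Tracker
  attr_reader :controllers

  def initialize(monitors)
    @monitors = monitors
    @controllers = {}
  end

  # Simplified registration, without the checks performed by Server#add.
  def track(controller)
    @controllers[controller.id] = controller
  end

  def remove(controller)
    if id = controller.id
      @controllers.delete(id)
    end

    @monitors.each do |monitor|
      monitor.remove(controller)
    rescue => error
      warn "Error while removing process: #{error.message}"
    end
  end
end

log = RemovalLog.new
tracker = Tracker.new([log])
registered = StubController.new(7)
unregistered = StubController.new(nil)

tracker.track(registered)
tracker.remove(registered)
tracker.remove(registered)   # Hash#delete on a missing key is a no-op
tracker.remove(unregistered) # no ID, but monitors are still notified
```

Monitors written against this interface may therefore want to tolerate repeated or unexpected remove calls.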
#run ⇒ Object
Run the supervisor server.
Starts all monitors and accepts connections from workers.
    # File 'lib/async/service/supervisor/server.rb', line 88
    def run
      Sync do |task|
        barrier = Async::Barrier.new

        # Start all monitors:
        @monitors.each do |monitor|
          monitor.run(parent: barrier)
        rescue => error
          Console.error(self, "Error while starting monitor!", monitor: monitor, exception: error)
        end

        barrier.async do
          # Accept connections from workers:
          self.accept do |connection|
            # Create a supervisor controller for this connection:
            supervisor_controller = SupervisorController.new(self, connection)

            # Bind supervisor controller:
            connection.bind(:supervisor, supervisor_controller)

            # Run the connection:
            connection.run
          ensure
            self.remove(supervisor_controller)
          end
        end

        barrier.wait
      ensure
        barrier&.stop
      end
    end
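The accept block in #run relies on a block-level ensure clause so that each connection's controller is removed whether the connection ends normally or with an error. The sketch below shows that pattern in plain Ruby, without the async gem. `accept_each` is a hypothetical stand-in for the accept loop, and the connections are simple lambdas, so it shows only the cleanup guarantee, not the library's concurrency.

```ruby
# Hypothetical stand-in for an accept loop: yields each fake connection.
def accept_each(connections)
  connections.each do |connection|
    yield connection
  rescue => error
    # In this sketch a failing connection must not stop the loop.
    warn "connection failed: #{error.message}"
  end
end

removed = []
connections = [
  -> { :ok },                                  # ends normally
  -> { raise IOError, "worker disconnected" }, # ends with an error
]

accept_each(connections) do |connection|
  controller = "controller-#{removed.size + 1}"
  connection.call
ensure
  # Runs on both the normal and the error path, like self.remove in #run.
  removed << controller
end
```

After both connections have been handled, `removed` contains an entry for each of them, including the one that raised.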