Class: Falcon::Service::Supervisor

Inherits:
Async::Service::Generic
  • Object
show all
Defined in:
lib/falcon/service/supervisor.rb

Overview

Implements a host supervisor which can restart the host services and provide various metrics about the running processes.

Instance Method Summary collapse

Constructor Details

#initializeSupervisor

Initialize the supervisor using the given environment.



19
20
21
22
23
# File 'lib/falcon/service/supervisor.rb', line 19

def initialize(...)
	super
	
	@bound_endpoint = nil
end

Instance Method Details

#do_metrics(message) ⇒ Object

Capture process metrics relating to the process group that the supervisor belongs to.



44
45
46
# File 'lib/falcon/service/supervisor.rb', line 44

def do_metrics(message)
	Process::Metrics::General.capture(pid: Process.ppid, ppid: Process.ppid)
end

#do_restart(message) ⇒ Object

Restart the process group that the supervisor belongs to.



32
33
34
35
36
37
38
39
40
41
# File 'lib/falcon/service/supervisor.rb', line 32

def do_restart(message)
	# Tell the parent of this process group to spin up a new process group/container.
	# Wait for that to start accepting new connections.
	# Stop accepting connections.
	# Wait for existing connnections to drain.
	# Terminate this process group.
	signal = message[:signal] || :INT
	
	Process.kill(signal, Process.ppid)
end

#endpointObject

The endpoint which the supervisor will bind to. Typically a unix pipe in the same directory as the host.



27
28
29
# File 'lib/falcon/service/supervisor.rb', line 27

def endpoint
	@evaluator.endpoint
end

#handle(message) ⇒ Object

Handle an incoming request.



50
51
52
53
54
55
56
57
# File 'lib/falcon/service/supervisor.rb', line 50

def handle(message)
	case message[:please]
	when 'restart'
		self.do_restart(message)
	when 'metrics'
		self.do_metrics(message)
	end
end

#setup(container) ⇒ Object

Start the supervisor process which accepts connections from the bound endpoint and processes JSON formatted messages.



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/falcon/service/supervisor.rb', line 72

def setup(container)
	container.run(name: self.name, restart: true, count: 1) do |instance|
		Async do
			@bound_endpoint.accept do |peer|
				stream = Async::IO::Stream.new(peer)
				
				while message = stream.gets("\0")
					response = handle(JSON.parse(message, symbolize_names: true))
					stream.puts(response.to_json, separator: "\0")
				end
			end
			
			instance.ready!
		end
	end
	
	super
end

#startObject

Bind the supervisor to the specified endpoint.



60
61
62
63
64
65
66
67
68
# File 'lib/falcon/service/supervisor.rb', line 60

def start
	Console.logger.info(self) {"Binding to #{self.endpoint}..."}
	
	@bound_endpoint = Async::Reactor.run do
		Async::IO::SharedEndpoint.bound(self.endpoint)
	end.wait
	
	super
end

#stopObject

Release the bound endpoint.



92
93
94
95
96
97
# File 'lib/falcon/service/supervisor.rb', line 92

def stop
	@bound_endpoint&.close
	@bound_endpoint = nil
	
	super
end