Class: ProcessBot::Process
- Inherits:
-
Object
- Object
- ProcessBot::Process
show all
- Extended by:
- Forwardable
- Defined in:
- lib/process_bot/process.rb
Overview
rubocop:disable Metrics/ClassLength
Defined Under Namespace
Classes: Handlers, Runner, RunnerInstance
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(options) ⇒ Process
Returns a new instance of Process.
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
# File 'lib/process_bot/process.rb', line 18
# Builds a process supervisor from the given options.
#
# Sets up the stop flag, the control-command bookkeeping (a monitor plus an
# in-flight counter), the runner event queue and the tracked runner list, then
# subscribes this object to the `:on_process_started` and `:on_socket_opened`
# events exposed by `options.events`.
#
# @param options [Object] must respond to `events` and `options`
def initialize(options)
  @options = options
  @stopped = false
  @accept_control_commands = true
  @control_command_monitor = Monitor.new
  @control_commands_in_flight = 0
  @runner_events = Queue.new
  @runner_instances = []
  @runner_monitor = Monitor.new

  # Each subscription must be its own statement; two calls on a single line
  # without a separator do not parse.
  options.events.connect(:on_process_started, &method(:on_process_started))
  options.events.connect(:on_socket_opened, &method(:on_socket_opened))

  logger.logs("ProcessBot 1 - Options: #{options.options}")
end
|
Instance Attribute Details
#control_command_monitor ⇒ Object
Returns the value of attribute control_command_monitor.
16
17
18
|
# File 'lib/process_bot/process.rb', line 16
# Reader for @control_command_monitor, which #initialize sets to a new Monitor.
#
# @return [Object] the value stored in @control_command_monitor
def control_command_monitor
@control_command_monitor
end
|
#current_pid ⇒ Object
Returns the value of attribute current_pid.
16
17
18
|
# File 'lib/process_bot/process.rb', line 16
# Reader for @current_pid, which #on_process_started assigns from its `pid:`
# argument.
#
# @return [Object, nil] the value stored in @current_pid
def current_pid
@current_pid
end
|
#current_process_title ⇒ Object
Returns the value of attribute current_process_title.
16
17
18
|
# File 'lib/process_bot/process.rb', line 16
# Reader for @current_process_title, which #update_process_title assigns.
#
# @return [String, nil] the value stored in @current_process_title
def current_process_title
@current_process_title
end
|
#options ⇒ Object
Returns the value of attribute options.
16
17
18
|
# File 'lib/process_bot/process.rb', line 16
# Reader for @options, the object handed to #initialize.
#
# @return [Object] the value stored in @options
def options
@options
end
|
#port ⇒ Object
Returns the value of attribute port.
16
17
18
|
# File 'lib/process_bot/process.rb', line 16
# Reader for @port, which #on_socket_opened assigns from its `port:` argument.
#
# @return [Object, nil] the value stored in @port
def port
@port
end
|
#stopped ⇒ Object
Returns the value of attribute stopped.
16
17
18
|
# File 'lib/process_bot/process.rb', line 16
# Reader for @stopped: false after #initialize, set to true by #set_stopped
# and #stop.
#
# @return [Boolean] the value stored in @stopped
def stopped
@stopped
end
|
Instance Method Details
#accept_control_commands? ⇒ Boolean
203
204
205
|
# File 'lib/process_bot/process.rb', line 203
# Reports the @accept_control_commands flag: true after #initialize, set to
# false by #stop_accepting_control_commands.
#
# @return [Boolean] the value stored in @accept_control_commands
def accept_control_commands?
@accept_control_commands
end
|
#active_runner ⇒ Object
154
155
156
|
# File 'lib/process_bot/process.rb', line 154
# Returns the runner of the current runner instance, if there is one.
#
# @return [Object, nil] `current_runner_instance.runner`, or nil when there is
#   no current runner instance
def active_runner
  instance = current_runner_instance
  return if instance.nil?

  instance.runner
end
|
#active_runner! ⇒ Object
158
159
160
161
162
163
|
# File 'lib/process_bot/process.rb', line 158
# Like #active_runner, but fails loudly when nothing is running.
#
# @return [Object] the truthy value returned by #active_runner
# @raise [RuntimeError] when #active_runner returns nil/false
def active_runner!
  active_runner ||
    raise("Unable to stop custom process because no active runner is available. Ensure the custom command runs in foreground.")
end
|
#build_runner ⇒ Object
222
223
224
225
226
227
228
229
230
|
# File 'lib/process_bot/process.rb', line 222
# Creates a new ProcessBot::Process::Runner wired to the current handler.
#
# The runner receives the handler's start command, the handler name and
# instance, this object's logger and its options.
#
# @return [ProcessBot::Process::Runner]
def build_runner
  handler = handler_instance
  runner_arguments = {
    command: handler.start_command,
    handler_name: handler_name,
    handler_instance: handler,
    logger: logger,
    options: options
  }

  ProcessBot::Process::Runner.new(**runner_arguments)
end
|
#clear_current_runner(runner_instance) ⇒ Object
292
293
294
295
296
297
|
# File 'lib/process_bot/process.rb', line 292
# Forgets the current runner, but only when the given instance is the current
# one; any other instance leaves the state untouched.
#
# @param runner_instance [Object, nil] the instance that finished
# @return [nil]
def clear_current_runner(runner_instance)
  if runner_instance == current_runner_instance
    @current_runner_instance = nil
    @runner = nil
  end
end
|
#client ⇒ Object
47
48
49
|
# File 'lib/process_bot/process.rb', line 47
# Lazily builds and caches the ProcessBot::ClientSocket for these options.
#
# @return [ProcessBot::ClientSocket] the same instance on every call
def client
  return @client if @client

  @client = ProcessBot::ClientSocket.new(options: options)
end
|
#control_commands_in_flight ⇒ Object
216
217
218
219
220
|
# File 'lib/process_bot/process.rb', line 216
# Reads the in-flight control command counter while holding
# control_command_monitor.
#
# @return [Integer] the current value of @control_commands_in_flight
def control_commands_in_flight
  control_command_monitor.synchronize { @control_commands_in_flight }
end
|
#execute! ⇒ Object
34
35
36
37
38
39
40
41
42
43
44
45
|
# File 'lib/process_bot/process.rb', line 34
# Dispatches on the `:command` option.
#
# "start" logs and runs #start; "graceful", "graceful_no_wait", "restart" and
# "stop" are forwarded through #send_control_command.
#
# @return [Object] whatever the dispatched method returns
# @raise [KeyError] when options has no :command
# @raise [RuntimeError] for any other command
def execute!
  command = options.fetch(:command)

  case command
  when "start"
    logger.logs "Starting process"
    start
  when "graceful", "graceful_no_wait", "restart", "stop"
    send_control_command(command)
  else
    raise "Unknown command: #{command}"
  end
end
|
#force_stop_process_bot_if_configured(matching_processes) ⇒ Object
342
343
344
345
346
347
348
349
350
351
352
353
354
|
# File 'lib/process_bot/process.rb', line 342
# Sends TERM to every process in the given `ps` lines, but only when the
# :force_stop_on_no_response option is truthy.
#
# Each line is expected to start with the PID; lines whose first field is not
# purely digits are skipped. A PID that no longer exists is logged, not raised.
#
# @param matching_processes [Array<String>] `ps` output lines
# @return [Array<String>, nil] the given lines, or nil when force stop is off
def force_stop_process_bot_if_configured(matching_processes)
  return unless truthy_option?(:force_stop_on_no_response)

  matching_processes.each do |ps_line|
    pid, = ps_line.strip.split(/\s+/, 2)
    next if pid.nil? || !pid.match?(/\A\d+\z/)

    begin
      logger.logs "Force-stopping unresponsive process_bot PID #{pid}"
      Process.kill("TERM", Integer(pid, 10))
    rescue Errno::ESRCH
      logger.logs "Process bot PID #{pid} already gone during force stop"
    end
  end
end
|
#handle_missing_control_command_response(command) ⇒ Object
308
309
310
311
312
313
314
|
# File 'lib/process_bot/process.rb', line 308
# Reacts to a control command that got no answer. Only "stop" is handled: the
# matching process_bot processes are looked up, logged, and force-stopped if
# so configured.
#
# @param command [String] the control command that was sent
# @return [Object, nil] nil for any command other than "stop"; otherwise the
#   result of #force_stop_process_bot_if_configured
def handle_missing_control_command_response(command)
  return if command != "stop"

  candidates = matching_process_bot_processes
  log_missing_control_response_diagnostics(candidates)
  force_stop_process_bot_if_configured(candidates)
end
|
#handle_runner_event(runner_event) ⇒ Object
245
246
247
248
249
250
251
|
# File 'lib/process_bot/process.rb', line 245
# Processes one event popped from the runner event queue.
#
# The order matters: the instance is untracked and any error logged first,
# then the current-runner state is cleared so that the restart check sees
# whether a replacement runner is needed.
#
# @param runner_event [Hash] must contain :runner_instance
# @return [Object] the result of #restart_runner_if_needed
# @raise [KeyError] when :runner_instance is missing
def handle_runner_event(runner_event)
  finished_instance = runner_event.fetch(:runner_instance)

  remove_runner_instance(finished_instance)
  log_runner_event_error(runner_event)
  clear_current_runner(finished_instance)
  restart_runner_if_needed(finished_instance)
end
|
#handler_class ⇒ Object
51
52
53
54
55
56
|
# File 'lib/process_bot/process.rb', line 51
# Resolves (once) the handler class for #handler_name.
#
# Requires `process/handlers/<handler_name>` relative to this file and looks
# up the camel-cased name under ProcessBot::Process::Handlers.
#
# @return [Class] the cached handler class
def handler_class
  return @handler_class if @handler_class

  require_relative "process/handlers/#{handler_name}"
  class_name = StringCases.snake_to_camel(handler_name)
  @handler_class = ProcessBot::Process::Handlers.const_get(class_name)
end
|
#handler_instance ⇒ Object
58
59
60
|
# File 'lib/process_bot/process.rb', line 58
# Lazily instantiates the handler, passing this process object to it.
#
# @return [Object] the same handler instance on every call
def handler_instance
  return @handler_instance if @handler_instance

  @handler_instance = handler_class.new(self)
end
|
#handler_name ⇒ Object
62
63
64
|
# File 'lib/process_bot/process.rb', line 62
# Returns the `:handler` option, cached after the first lookup.
#
# @return [Object] the value of options.fetch(:handler)
# @raise [KeyError] when options has no :handler
def handler_name
  @handler_name || (@handler_name = options.fetch(:handler))
end
|
#log_missing_control_response_diagnostics(matching_processes) ⇒ Object
316
317
318
319
|
# File 'lib/process_bot/process.rb', line 316
# Logs two diagnostic entries for an unanswered control command: the
# application/id being looked for, and the matching `ps` lines.
#
# @param matching_processes [Array<String>] `ps` lines to report
# @return [Object] the result of the second logger.logs call
def log_missing_control_response_diagnostics(matching_processes)
  application = options[:application].inspect
  id = options[:id].inspect
  logger.logs "Control command response missing; attempting diagnostics for application=#{application} id=#{id}"

  listing = matching_process_bot_processes_text(matching_processes)
  logger.logs "Matching process_bot lines:\n#{listing}"
end
|
#log_runner_event_error(runner_event) ⇒ Object
286
287
288
289
290
|
# File 'lib/process_bot/process.rb', line 286
# Logs the error carried by a runner event; events of any other type are
# ignored.
#
# @param runner_event [Hash] must contain :type, and :error when type is :error
# @return [Object, nil] nil for non-error events, else the logger's result
def log_runner_event_error(runner_event)
  crashed = runner_event.fetch(:type) == :error
  logger.error "Process runner crashed: #{runner_event.fetch(:error)}" if crashed
end
|
#logger ⇒ Object
66
67
68
|
# File 'lib/process_bot/process.rb', line 66
# Lazily builds and caches the ProcessBot::Logger for these options.
#
# @return [ProcessBot::Logger] the same instance on every call
def logger
  return @logger if @logger

  @logger = ProcessBot::Logger.new(options: options)
end
|
#matching_process_bot_processes ⇒ Object
327
328
329
330
331
332
333
334
|
# File 'lib/process_bot/process.rb', line 327
# Lists the `ps -eo pid,args` output lines accepted by
# #process_bot_process_line_matches?.
#
# @return [Array<String>] matching lines, in `ps` order
def matching_process_bot_processes
  listing = Knj::Os.shellcmd("ps -eo pid,args").to_s
  ps_lines = listing.split("\n")

  ps_lines.select { |ps_line| process_bot_process_line_matches?(ps_line) }
end
|
#matching_process_bot_processes_text(lines) ⇒ Object
321
322
323
324
325
|
# File 'lib/process_bot/process.rb', line 321
# Formats `ps` lines for logging.
#
# @param lines [Array<String>]
# @return [String] "(none)" for an empty list, else the lines joined by newlines
def matching_process_bot_processes_text(lines)
  lines.empty? ? "(none)" : lines.join("\n")
end
|
#on_process_started(_event_name, pid:) ⇒ Object
70
71
72
73
|
# File 'lib/process_bot/process.rb', line 70
# Callback connected to the :on_process_started event in #initialize.
# Stores the reported pid in @current_pid and refreshes the process title.
#
# @param _event_name [Object] ignored
# @param pid [Object] stored as-is in @current_pid
# @return [Object] the result of #update_process_title
def on_process_started(_event_name, pid:)
@current_pid = pid
update_process_title
end
|
#on_socket_opened(_event_name, port:) ⇒ Object
75
76
77
78
|
# File 'lib/process_bot/process.rb', line 75
# Callback connected to the :on_socket_opened event in #initialize.
# Stores the reported port in @port and refreshes the process title.
#
# @param _event_name [Object] ignored
# @param port [Object] stored as-is in @port
# @return [Object] the result of #update_process_title
def on_socket_opened(_event_name, port:)
@port = port
update_process_title
end
|
#process_bot_process_line_matches?(line) ⇒ Boolean
336
337
338
339
340
|
# File 'lib/process_bot/process.rb', line 336
# Tells whether a `ps` line belongs to a ProcessBot process with the same
# application and id as this one.
#
# The process title is written by #update_process_title as
# `ProcessBot <JSON>`, so the expected `"key":value` fragments are produced
# with JSON.generate as well. Hand-quoting the values would never match a nil
# option (serialised as `null`, not `""`) or a value containing characters
# that JSON escapes.
#
# @param line [String] one line of `ps` output
# @return [Boolean]
def process_bot_process_line_matches?(line)
  return false unless line.include?("ProcessBot {")

  %i[application id].all? do |key|
    line.include?("\"#{key}\":#{JSON.generate(options[key])}")
  end
end
|
#remove_runner_instance(runner_instance) ⇒ Object
265
266
267
268
269
|
# File 'lib/process_bot/process.rb', line 265
# Removes a runner instance from the tracked list while holding
# runner_monitor.
#
# @param runner_instance [Object, nil]
# @return [Object, nil] the removed instance, or nil when it was not tracked
def remove_runner_instance(runner_instance)
  runner_monitor.synchronize { @runner_instances.delete(runner_instance) }
end
|
#restart(**args) ⇒ Object
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
# File 'lib/process_bot/process.rb', line 105
# Restarts the supervised process.
#
# Any handler other than "sidekiq" is simply stopped. Sidekiq is asked to go
# graceful without stopping process_bot; with overlap enabled (see
# #restart_overlap?) the graceful call does not wait and a new runner
# instance is started right away.
#
# @param args [Hash] command options, forwarded to #restart_overlap?
# @return [Object] the result of the last call made
def restart(**args)
  logger.logs "Restart process"

  return handler_instance.stop unless handler_name == "sidekiq"
  return handler_instance.graceful(stop_process_bot: false) unless restart_overlap?(args)

  handler_instance.graceful_no_wait(stop_process_bot: false)
  start_runner_instance
end
|
#restart_overlap?(command_options = {}) ⇒ Boolean
271
272
273
274
275
276
277
278
279
280
281
282
283
284
|
# File 'lib/process_bot/process.rb', line 271
# Decides whether a sidekiq restart should overlap old and new runners.
#
# `:sidekiq_restart_overlap` is read from the command options when that key is
# present (even if its value is nil), otherwise from the process options.
# nil is false, booleans are returned as-is, and any other value is true
# unless its stripped, lower-cased string form is "false", "0" or empty.
#
# @param command_options [Hash]
# @return [Boolean]
def restart_overlap?(command_options = {})
  source = command_options.key?(:sidekiq_restart_overlap) ? command_options : options
  setting = source[:sidekiq_restart_overlap]

  case setting
  when nil then false
  when true, false then setting
  else !["false", "0", ""].include?(setting.to_s.strip.downcase)
  end
end
|
#restart_runner_if_needed(runner_instance) ⇒ Object
299
300
301
302
303
304
305
306
|
# File 'lib/process_bot/process.rb', line 299
# Starts a replacement runner after one has finished, unless the process has
# been stopped or a different runner instance has already taken over.
#
# @param runner_instance [Object, nil] the instance that finished
# @return [Object, nil] nil when no restart happens, else the result of
#   #start_runner_instance
def restart_runner_if_needed(runner_instance)
  return if stopped

  current = current_runner_instance
  superseded = !current.nil? && runner_instance != current
  return if superseded

  logger.logs "Process stopped - starting again after 1 sec"
  sleep 1
  start_runner_instance
end
|
#run ⇒ Object
131
132
133
|
# File 'lib/process_bot/process.rb', line 131
# Starts a runner instance by delegating to #start_runner_instance.
#
# @return [Object] the result of #start_runner_instance
def run
start_runner_instance
end
|
#runner ⇒ Object
150
151
152
|
# File 'lib/process_bot/process.rb', line 150
# Returns the runner to talk to: the current runner instance's runner when
# there is one, otherwise a cached runner built on demand.
#
# @return [Object]
def runner
  live_runner = current_runner_instance&.runner
  return live_runner if live_runner

  @runner ||= build_runner
end
|
#runner_instances ⇒ Object
253
254
255
256
257
|
# File 'lib/process_bot/process.rb', line 253
# Returns a snapshot of the tracked runner instances, taken while holding
# runner_monitor.
#
# @return [Array] a copy; mutating it does not affect the tracked list
def runner_instances
  runner_monitor.synchronize { @runner_instances.dup }
end
|
#safe_application_basename ⇒ Object
Capistrano-style release paths (`…/<app>/releases/<timestamp>`) resolve to a stable per-app basename like `awesome-tasks` that is consistent across deploys of the same app and distinct across apps sharing a host. `ControlSocket#ensure_no_duplicate_id!` uses it to scope the duplicate-id guard by `(application_basename, id)` so an unrelated app on the same host can safely reuse an id like `sidekiq-main`. Returns nil when release_path isn’t set.
185
186
187
188
189
|
# File 'lib/process_bot/process.rb', line 185
# Returns options.application_basename, treating a KeyError from that call as
# "no basename available".
#
# @return [Object, nil] the basename, or nil when the lookup raises KeyError
def safe_application_basename
  begin
    options.application_basename
  rescue KeyError
    nil
  end
end
|
#send_control_command(command, **command_options) ⇒ Object
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
# File 'lib/process_bot/process.rb', line 135
# Sends a control command to a running ProcessBot through #client.
#
# The payload options are the process options merged with the command
# options. A `:nil` reply or a refused connection triggers
# #handle_missing_control_command_response; the failure is then swallowed when
# the :ignore_no_process_bot option is set, and raised otherwise.
#
# @param command [String]
# @param command_options [Hash] extra options merged over options.options
# @return [nil]
# @raise [RuntimeError] on a `:nil` reply, unless ignored
# @raise [Errno::ECONNREFUSED] when the connection is refused, unless ignored
def send_control_command(command, **command_options)
  logger.logs "Sending #{command} command"

  payload_options = options.options.merge(command_options)
  reply = client.send_command(command: command, options: payload_options)
  return unless reply == :nil

  handle_missing_control_command_response(command)
  return if options[:ignore_no_process_bot]

  raise "No response from ProcessBot while sending #{command}"
rescue Errno::ECONNREFUSED => error
  handle_missing_control_command_response(command)
  raise error unless options[:ignore_no_process_bot]
end
|
#set_stopped ⇒ Object
80
81
82
|
# File 'lib/process_bot/process.rb', line 80
# Marks the process as stopped by setting @stopped to true. Nothing else is
# touched here.
#
# @return [true]
def set_stopped
@stopped = true
end
|
#start ⇒ Object
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
# File 'lib/process_bot/process.rb', line 89
# Main supervision loop.
#
# Opens the control socket, starts the first runner instance, then handles
# runner events one at a time (blocking on the queue). Once the process is
# stopped and no runner instances remain, control commands are shut off,
# in-flight ones are waited for, and the loop ends.
#
# @return [nil]
def start
  start_control_socket
  start_runner_instance

  loop do
    handle_runner_event(runner_events.pop)

    if stopped && runner_instances.empty?
      stop_accepting_control_commands
      wait_for_control_commands
      break
    end
  end
end
|
#start_control_socket ⇒ Object
84
85
86
87
|
# File 'lib/process_bot/process.rb', line 84
# Creates the ProcessBot::ControlSocket for this process, remembers it in
# @control_socket and starts it.
#
# @return [Object] the result of the socket's #start
def start_control_socket
  socket = ProcessBot::ControlSocket.new(options: options, process: self)
  @control_socket = socket
  socket.start
end
|
#start_runner_instance ⇒ Object
232
233
234
235
236
237
238
239
240
241
242
243
|
# File 'lib/process_bot/process.rb', line 232
# Builds a fresh runner, wraps it in a RunnerInstance that reports to the
# runner event queue, tracks it, makes it the current instance and starts it.
#
# @return [Object] the result of the runner instance's #start
def start_runner_instance
  instance = ProcessBot::Process::RunnerInstance.new(runner: build_runner, event_queue: runner_events, logger: logger)

  track_runner_instance(instance)
  @current_runner_instance = instance
  @runner = instance.runner

  instance.start
end
|
#stop(**args) ⇒ Object
120
121
122
123
124
125
126
127
128
129
|
# File 'lib/process_bot/process.rb', line 120
# Stops the supervised process for good.
#
# Marks the process as stopped and asks the handler to stop. If no runner
# instances are left to emit a final event, a synthetic :stopped event is
# queued so that the main loop wakes up and can exit.
#
# @param args [Hash] only used in the log line
# @return [Object, nil] nil while runner instances remain, else the event queue
def stop(**args)
  logger.logs "Stop process #{args}"
  @stopped = true
  handler_instance.stop

  return unless runner_instances.empty?

  logger.logs "No runner instances remaining, signaling main loop to exit"
  runner_events << {type: :stopped, runner_instance: nil}
end
|
#stop_accepting_control_commands ⇒ Object
207
208
209
210
|
# File 'lib/process_bot/process.rb', line 207
# Turns off the accept-control-commands flag and stops the control socket if
# one has been started.
#
# @return [Object, nil] the result of the socket's #stop, or nil without socket
def stop_accepting_control_commands
  @accept_control_commands = false

  socket = @control_socket
  socket.stop unless socket.nil?
end
|
#track_runner_instance(runner_instance) ⇒ Object
259
260
261
262
263
|
# File 'lib/process_bot/process.rb', line 259
# Appends a runner instance to the tracked list while holding runner_monitor.
#
# @param runner_instance [Object]
# @return [Array] the tracked list itself
def track_runner_instance(runner_instance)
  runner_monitor.synchronize { @runner_instances.push(runner_instance) }
end
|
#truthy_option?(key) ⇒ Boolean
356
357
358
359
360
361
362
363
364
365
|
# File 'lib/process_bot/process.rb', line 356
# Interprets an option as a boolean.
#
# nil is false, real booleans are returned unchanged, and any other value is
# true unless its stripped, lower-cased string form is empty, "false" or "0".
#
# @param key [Object] option key passed to options[]
# @return [Boolean]
def truthy_option?(key)
  raw = options[key]
  return false if raw.nil?
  return raw if [true, false].include?(raw)

  text = raw.to_s.strip.downcase
  !(text.empty? || text == "false" || text == "0")
end
|
#update_process_title ⇒ Object
165
166
167
168
169
170
171
172
173
174
175
176
|
# File 'lib/process_bot/process.rb', line 165
# Rebuilds the process title as `ProcessBot <JSON>` and applies it to the
# running process. The JSON object carries application, application_basename,
# handler, id, pid and port, in that order.
#
# @return [String] the title handed to Process.setproctitle
def update_process_title
  title_payload = JSON.generate(
    application: options[:application],
    application_basename: safe_application_basename,
    handler: handler_name,
    id: options[:id],
    pid: current_pid,
    port: port
  )

  @current_process_title = "ProcessBot #{title_payload}"
  Process.setproctitle(current_process_title)
end
|
#wait_for_control_commands ⇒ Object
212
213
214
|
# File 'lib/process_bot/process.rb', line 212
# Blocks until no control command is in flight, polling every 0.1 seconds.
#
# @return [nil]
def wait_for_control_commands
  while control_commands_in_flight.positive?
    sleep 0.1
  end
end
|
#with_control_command ⇒ Object
191
192
193
194
195
196
197
198
199
200
201
|
# File 'lib/process_bot/process.rb', line 191
# Runs the given block while counting it as an in-flight control command.
#
# The counter is incremented before yielding and decremented in `ensure`, so
# it is restored even when the block raises. Both updates hold
# control_command_monitor.
#
# @yield the control command to execute
# @return [Object] the block's return value
def with_control_command
  control_command_monitor.synchronize { @control_commands_in_flight += 1 }
  yield
ensure
  control_command_monitor.synchronize { @control_commands_in_flight -= 1 }
end
|