Class: ProcessBot::Process

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/process_bot/process.rb

Overview

rubocop:disable Metrics/ClassLength

Defined Under Namespace

Classes: Handlers, Runner, RunnerInstance

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Process

Returns a new instance of Process.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/process_bot/process.rb', line 18

def initialize(options)
  @options = options
  @stopped = false
  @accept_control_commands = true
  @control_command_monitor = Monitor.new
  @control_commands_in_flight = 0
  @runner_events = Queue.new
  @runner_instances = []
  @runner_monitor = Monitor.new

  options.events.connect(:on_process_started, &method(:on_process_started)) # rubocop:disable Performance/MethodObjectAsBlock
  options.events.connect(:on_socket_opened, &method(:on_socket_opened)) # rubocop:disable Performance/MethodObjectAsBlock

  logger.logs("ProcessBot 1 - Options: #{options.options}")
end

Instance Attribute Details

#control_command_monitorObject (readonly)

Returns the value of attribute control_command_monitor.



16
17
18
# File 'lib/process_bot/process.rb', line 16

def control_command_monitor
  @control_command_monitor
end

#current_pidObject (readonly)

Returns the value of attribute current_pid.



16
17
18
# File 'lib/process_bot/process.rb', line 16

def current_pid
  @current_pid
end

#current_process_titleObject (readonly)

Returns the value of attribute current_process_title.



16
17
18
# File 'lib/process_bot/process.rb', line 16

def current_process_title
  @current_process_title
end

#optionsObject (readonly)

Returns the value of attribute options.



16
17
18
# File 'lib/process_bot/process.rb', line 16

def options
  @options
end

#portObject (readonly)

Returns the value of attribute port.



16
17
18
# File 'lib/process_bot/process.rb', line 16

def port
  @port
end

#stoppedObject (readonly)

Returns the value of attribute stopped.



16
17
18
# File 'lib/process_bot/process.rb', line 16

def stopped
  @stopped
end

Instance Method Details

#accept_control_commands?Boolean

Returns:

  • (Boolean)


203
204
205
# File 'lib/process_bot/process.rb', line 203

def accept_control_commands?
  @accept_control_commands
end

#active_runnerObject



154
155
156
# File 'lib/process_bot/process.rb', line 154

def active_runner
  current_runner_instance&.runner
end

#active_runner!Object



158
159
160
161
162
163
# File 'lib/process_bot/process.rb', line 158

def active_runner!
  runner_instance = active_runner
  return runner_instance if runner_instance

  raise "Unable to stop custom process because no active runner is available. Ensure the custom command runs in foreground."
end

#build_runnerObject



222
223
224
225
226
227
228
229
230
# File 'lib/process_bot/process.rb', line 222

def build_runner
  ProcessBot::Process::Runner.new(
    command: handler_instance.start_command,
    handler_name: handler_name,
    handler_instance: handler_instance,
    logger: logger,
    options: options
  )
end

#clear_current_runner(runner_instance) ⇒ Object



292
293
294
295
296
297
# File 'lib/process_bot/process.rb', line 292

def clear_current_runner(runner_instance)
  return unless runner_instance == current_runner_instance

  @current_runner_instance = nil
  @runner = nil
end

#clientObject



47
48
49
# File 'lib/process_bot/process.rb', line 47

def client
  @client ||= ProcessBot::ClientSocket.new(options: options)
end

#control_commands_in_flightObject



216
217
218
219
220
# File 'lib/process_bot/process.rb', line 216

def control_commands_in_flight
  control_command_monitor.synchronize do
    @control_commands_in_flight
  end
end

#execute!Object



34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/process_bot/process.rb', line 34

def execute!
  command = options.fetch(:command)

  if command == "start"
    logger.logs "Starting process"
    start
  elsif command == "graceful" || command == "graceful_no_wait" || command == "restart" || command == "stop"
    send_control_command(command)
  else
    raise "Unknown command: #{command}"
  end
end

#force_stop_process_bot_if_configured(matching_processes) ⇒ Object



342
343
344
345
346
347
348
349
350
351
352
353
354
# File 'lib/process_bot/process.rb', line 342

def force_stop_process_bot_if_configured(matching_processes)
  return unless truthy_option?(:force_stop_on_no_response)

  matching_processes.each do |line|
    pid = line.strip.split(/\s+/, 2).first
    next unless pid&.match?(/\A\d+\z/)

    logger.logs "Force-stopping unresponsive process_bot PID #{pid}"
    Process.kill("TERM", Integer(pid, 10))
  rescue Errno::ESRCH
    logger.logs "Process bot PID #{pid} already gone during force stop"
  end
end

#handle_missing_control_command_response(command) ⇒ Object



308
309
310
311
312
313
314
# File 'lib/process_bot/process.rb', line 308

def handle_missing_control_command_response(command)
  return unless command == "stop"

  matching_processes = matching_process_bot_processes
  log_missing_control_response_diagnostics(matching_processes)
  force_stop_process_bot_if_configured(matching_processes)
end

#handle_runner_event(runner_event) ⇒ Object



245
246
247
248
249
250
251
# File 'lib/process_bot/process.rb', line 245

def handle_runner_event(runner_event)
  runner_instance = runner_event.fetch(:runner_instance)
  remove_runner_instance(runner_instance)
  log_runner_event_error(runner_event)
  clear_current_runner(runner_instance)
  restart_runner_if_needed(runner_instance)
end

#handler_classObject



51
52
53
54
55
56
# File 'lib/process_bot/process.rb', line 51

def handler_class
  @handler_class ||= begin
    require_relative "process/handlers/#{handler_name}"
    ProcessBot::Process::Handlers.const_get(StringCases.snake_to_camel(handler_name))
  end
end

#handler_instanceObject



58
59
60
# File 'lib/process_bot/process.rb', line 58

def handler_instance
  @handler_instance ||= handler_class.new(self)
end

#handler_nameObject



62
63
64
# File 'lib/process_bot/process.rb', line 62

def handler_name
  @handler_name ||= options.fetch(:handler)
end

#log_missing_control_response_diagnostics(matching_processes) ⇒ Object



316
317
318
319
# File 'lib/process_bot/process.rb', line 316

def log_missing_control_response_diagnostics(matching_processes)
  logger.logs "Control command response missing; attempting diagnostics for application=#{options[:application].inspect} id=#{options[:id].inspect}"
  logger.logs "Matching process_bot lines:\n#{matching_process_bot_processes_text(matching_processes)}"
end

#log_runner_event_error(runner_event) ⇒ Object



286
287
288
289
290
# File 'lib/process_bot/process.rb', line 286

def log_runner_event_error(runner_event)
  return unless runner_event.fetch(:type) == :error

  logger.error "Process runner crashed: #{runner_event.fetch(:error)}"
end

#loggerObject



66
67
68
# File 'lib/process_bot/process.rb', line 66

def logger
  @logger ||= ProcessBot::Logger.new(options: options)
end

#matching_process_bot_processesObject



327
328
329
330
331
332
333
334
# File 'lib/process_bot/process.rb', line 327

def matching_process_bot_processes
  ps_output = Knj::Os.shellcmd("ps -eo pid,args")

  ps_output
    .to_s
    .split("\n")
    .select { |line| process_bot_process_line_matches?(line) }
end

#matching_process_bot_processes_text(lines) ⇒ Object



321
322
323
324
325
# File 'lib/process_bot/process.rb', line 321

def matching_process_bot_processes_text(lines)
  return "(none)" if lines.empty?

  lines.join("\n")
end

#on_process_started(_event_name, pid:) ⇒ Object



70
71
72
73
# File 'lib/process_bot/process.rb', line 70

def on_process_started(_event_name, pid:)
  @current_pid = pid
  update_process_title
end

#on_socket_opened(_event_name, port:) ⇒ Object



75
76
77
78
# File 'lib/process_bot/process.rb', line 75

def on_socket_opened(_event_name, port:)
  @port = port
  update_process_title
end

#process_bot_process_line_matches?(line) ⇒ Boolean

Returns:

  • (Boolean)


336
337
338
339
340
# File 'lib/process_bot/process.rb', line 336

def process_bot_process_line_matches?(line)
  line.include?("ProcessBot {") &&
    line.include?("\"application\":\"#{options[:application]}\"") &&
    line.include?("\"id\":\"#{options[:id]}\"")
end

#remove_runner_instance(runner_instance) ⇒ Object



265
266
267
268
269
# File 'lib/process_bot/process.rb', line 265

def remove_runner_instance(runner_instance)
  runner_monitor.synchronize do
    @runner_instances.delete(runner_instance)
  end
end

#restart(**args) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/process_bot/process.rb', line 105

def restart(**args)
  logger.logs "Restart process"

  if handler_name == "sidekiq"
    if restart_overlap?(args)
      handler_instance.graceful_no_wait(stop_process_bot: false)
      start_runner_instance
    else
      handler_instance.graceful(stop_process_bot: false)
    end
  else
    handler_instance.stop
  end
end

#restart_overlap?(command_options = {}) ⇒ Boolean

Returns:

  • (Boolean)


271
272
273
274
275
276
277
278
279
280
281
282
283
284
# File 'lib/process_bot/process.rb', line 271

def restart_overlap?(command_options = {})
  value = if command_options.key?(:sidekiq_restart_overlap)
    command_options[:sidekiq_restart_overlap]
  else
    options[:sidekiq_restart_overlap]
  end
  return false if value.nil?
  return value if value == true || value == false

  normalized = value.to_s.strip.downcase
  return false if normalized == "false" || normalized == "0" || normalized == ""

  true
end

#restart_runner_if_needed(runner_instance) ⇒ Object



299
300
301
302
303
304
305
306
# File 'lib/process_bot/process.rb', line 299

def restart_runner_if_needed(runner_instance)
  return if stopped
  return unless runner_instance == current_runner_instance || current_runner_instance.nil?

  logger.logs "Process stopped - starting again after 1 sec"
  sleep 1
  start_runner_instance
end

#runObject



131
132
133
# File 'lib/process_bot/process.rb', line 131

def run
  start_runner_instance
end

#runnerObject



150
151
152
# File 'lib/process_bot/process.rb', line 150

def runner
  current_runner_instance&.runner || @runner ||= build_runner
end

#runner_instancesObject



253
254
255
256
257
# File 'lib/process_bot/process.rb', line 253

def runner_instances
  runner_monitor.synchronize do
    @runner_instances.dup
  end
end

#safe_application_basenameObject

Capistrano-style release paths (‘…/<app>/releases/<timestamp>`) resolve to a stable per-app basename like `awesome-tasks` that is consistent across deploys of the same app and distinct across apps sharing a host. `ControlSocket#ensure_no_duplicate_id!` uses it to scope the duplicate-id guard by `(application_basename, id)` so an unrelated app on the same host can safely reuse an id like `sidekiq-main`. Returns nil when release_path isn’t set.



185
186
187
188
189
# File 'lib/process_bot/process.rb', line 185

def safe_application_basename
  options.application_basename
rescue KeyError
  nil
end

#send_control_command(command, **command_options) ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/process_bot/process.rb', line 135

def send_control_command(command, **command_options)
  logger.logs "Sending #{command} command"
  response = client.send_command(command: command, options: options.options.merge(command_options))

  if response == :nil
    handle_missing_control_command_response(command)
    return if options[:ignore_no_process_bot]

    raise "No response from ProcessBot while sending #{command}"
  end
rescue Errno::ECONNREFUSED => e
  handle_missing_control_command_response(command)
  raise e unless options[:ignore_no_process_bot]
end

#set_stoppedObject



80
81
82
# File 'lib/process_bot/process.rb', line 80

def set_stopped
  @stopped = true
end

#startObject



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/process_bot/process.rb', line 89

def start
  start_control_socket
  start_runner_instance

  loop do
    runner_event = runner_events.pop
    handle_runner_event(runner_event)

    next unless stopped && runner_instances.empty?

    stop_accepting_control_commands
    wait_for_control_commands
    break
  end
end

#start_control_socketObject



84
85
86
87
# File 'lib/process_bot/process.rb', line 84

def start_control_socket
  @control_socket = ProcessBot::ControlSocket.new(options: options, process: self)
  @control_socket.start
end

#start_runner_instanceObject



232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/process_bot/process.rb', line 232

def start_runner_instance
  runner_instance = ProcessBot::Process::RunnerInstance.new(
    runner: build_runner,
    event_queue: runner_events,
    logger: logger
  )

  track_runner_instance(runner_instance)
  @current_runner_instance = runner_instance
  @runner = runner_instance.runner
  runner_instance.start
end

#stop(**args) ⇒ Object



120
121
122
123
124
125
126
127
128
129
# File 'lib/process_bot/process.rb', line 120

def stop(**args)
  logger.logs "Stop process #{args}"
  @stopped = true
  handler_instance.stop

  if runner_instances.empty?
    logger.logs "No runner instances remaining, signaling main loop to exit"
    runner_events << {type: :stopped, runner_instance: nil}
  end
end

#stop_accepting_control_commandsObject



207
208
209
210
# File 'lib/process_bot/process.rb', line 207

def stop_accepting_control_commands
  @accept_control_commands = false
  @control_socket&.stop
end

#track_runner_instance(runner_instance) ⇒ Object



259
260
261
262
263
# File 'lib/process_bot/process.rb', line 259

def track_runner_instance(runner_instance)
  runner_monitor.synchronize do
    @runner_instances << runner_instance
  end
end

#truthy_option?(key) ⇒ Boolean

Returns:

  • (Boolean)


356
357
358
359
360
361
362
363
364
365
# File 'lib/process_bot/process.rb', line 356

def truthy_option?(key)
  value = options[key]
  return false if value.nil?
  return value if value == true || value == false

  normalized = value.to_s.strip.downcase
  return false if normalized == "false" || normalized == "0" || normalized == ""

  true
end

#update_process_titleObject



165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/process_bot/process.rb', line 165

def update_process_title
  process_args = {
    application: options[:application],
    application_basename: safe_application_basename,
    handler: handler_name,
    id: options[:id],
    pid: current_pid,
    port: port
  }
  @current_process_title = "ProcessBot #{JSON.generate(process_args)}"
  Process.setproctitle(current_process_title)
end

#wait_for_control_commandsObject



212
213
214
# File 'lib/process_bot/process.rb', line 212

def wait_for_control_commands
  sleep 0.1 while control_commands_in_flight.positive?
end

#with_control_commandObject



191
192
193
194
195
196
197
198
199
200
201
# File 'lib/process_bot/process.rb', line 191

def with_control_command
  control_command_monitor.synchronize do
    @control_commands_in_flight += 1
  end

  yield
ensure
  control_command_monitor.synchronize do
    @control_commands_in_flight -= 1
  end
end