Class: ProcessBot::Process::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/process_bot/process/runner.rb

Constant Summary collapse

READ_CHUNK_SIZE =
4096

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(command:, handler_instance:, handler_name:, logger:, options:) ⇒ Runner

Returns a new instance of Runner.



8
9
10
11
12
13
14
15
# File 'lib/process_bot/process/runner.rb', line 8

def initialize(command:, handler_instance:, handler_name:, logger:, options:)
  @command = command
  @handler_instance = handler_instance
  @handler_name = handler_name
  @logger = logger || ProcessBot::Logger.new(options: options)
  @monitor = Monitor.new
  @options = options
end

Instance Attribute Details

#commandObject (readonly)

Returns the value of attribute command.



4
5
6
# File 'lib/process_bot/process/runner.rb', line 4

def command
  @command
end

#exit_statusObject (readonly)

Returns the value of attribute exit_status.



4
5
6
# File 'lib/process_bot/process/runner.rb', line 4

def exit_status
  @exit_status
end

#handler_instanceObject (readonly)

Returns the value of attribute handler_instance.



4
5
6
# File 'lib/process_bot/process/runner.rb', line 4

def handler_instance
  @handler_instance
end

#handler_nameObject (readonly)

Returns the value of attribute handler_name.



4
5
6
# File 'lib/process_bot/process/runner.rb', line 4

def handler_name
  @handler_name
end

#loggerObject (readonly)

Returns the value of attribute logger.



4
5
6
# File 'lib/process_bot/process/runner.rb', line 4

def logger
  @logger
end

#monitorObject (readonly)

Returns the value of attribute monitor.



4
5
6
# File 'lib/process_bot/process/runner.rb', line 4

def monitor
  @monitor
end

#optionsObject (readonly)

Returns the value of attribute options.



4
5
6
# File 'lib/process_bot/process/runner.rb', line 4

def options
  @options
end

#pidObject (readonly)

Returns the value of attribute pid.



4
5
6
# File 'lib/process_bot/process/runner.rb', line 4

def pid
  @pid
end

#stop_timeObject (readonly)

Returns the value of attribute stop_time.



4
5
6
# File 'lib/process_bot/process/runner.rb', line 4

def stop_time
  @stop_time
end

#subprocess_pidObject (readonly)

Returns the value of attribute subprocess_pid.



4
5
6
# File 'lib/process_bot/process/runner.rb', line 4

def subprocess_pid
  @subprocess_pid
end

Instance Method Details



179
180
181
182
183
184
185
186
187
# File 'lib/process_bot/process/runner.rb', line 179

def assign_related_sidekiq_pid
  related_sidekiq_processes.each do |related_sidekiq_process| # rubocop:disable Lint/UnreachableLoop
    logger.logs "Found PID: #{related_sidekiq_process.pid}"
    @pid = related_sidekiq_process.pid
    options.events.call(:on_process_started, pid: related_sidekiq_process.pid)

    break
  end
end

#ensure_subprocess_pgid_for_stop!Object



144
145
146
147
148
149
150
# File 'lib/process_bot/process/runner.rb', line 144

def ensure_subprocess_pgid_for_stop!
  wait_for_subprocess_pid_for_stop! if subprocess_pid.nil?

  return true if subprocess_pgid

  raise "Unable to stop related processes because subprocess PGID could not be resolved (subprocess PID: #{subprocess_pid.inspect})"
end

#find_sidekiq_pidObject



162
163
164
165
166
# File 'lib/process_bot/process/runner.rb', line 162

def find_sidekiq_pid
  Thread.new do
    wait_for_sidekiq_pid
  end
end

#flush_complete_lines(type:, buffer:) ⇒ Object



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/process_bot/process/runner.rb', line 189

def flush_complete_lines(type:, buffer:)
  loop do
    separator_index = next_separator_index(buffer)
    break unless separator_index

    monitor.synchronize do
      output(type: type, output: buffer.slice!(0, separator_index + 1))
    end
  end

  return if buffer.bytesize < READ_CHUNK_SIZE

  monitor.synchronize do
    output(type: type, output: buffer.dup)
  end

  buffer.clear
end

#flush_remaining_output(type:, buffer:) ⇒ Object



208
209
210
211
212
213
214
215
216
# File 'lib/process_bot/process/runner.rb', line 208

def flush_remaining_output(type:, buffer:)
  return if buffer.empty?

  monitor.synchronize do
    output(type: type, output: buffer.dup)
  end

  buffer.clear
end

#next_separator_index(buffer) ⇒ Object



218
219
220
# File 'lib/process_bot/process/runner.rb', line 218

def next_separator_index(buffer)
  buffer.index(/[\r\n]/)
end

#output(output:, type:) ⇒ Object



17
18
19
# File 'lib/process_bot/process/runner.rb', line 17

def output(output:, type:)
  logger.log(output, type: type)
end


82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/process_bot/process/runner.rb', line 82

def related_processes
  related_processes = []
  process_group_id = subprocess_pgid
  return related_processes unless process_group_id

  Knj::Unix_proc.list do |process|
    begin
      process_pgid = Process.getpgid(process.pid)
    rescue Errno::ESRCH
      # Process no longer running
    end

    related_processes << process if process_group_id == process_pgid
  end

  related_processes
end


100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/process_bot/process/runner.rb', line 100

def related_sidekiq_processes
  related_sidekiq_processes = []
  process_group_id = subprocess_pgid
  return related_sidekiq_processes unless process_group_id

  Knj::Unix_proc.list("grep" => "sidekiq") do |process|
    cmd = process.data.fetch("cmd")

    if /sidekiq ([0-9]+\.[0-9]+\.[0-9]+) (#{options.possible_process_titles_joined_regex})/.match?(cmd)
      sidekiq_pid = process.data.fetch("pid").to_i

      begin
        sidekiq_pgid = Process.getpgid(sidekiq_pid)
      rescue Errno::ESRCH
        # Process no longer running
      end

      related_sidekiq_processes << process if process_group_id == sidekiq_pgid
    end
  end

  related_sidekiq_processes
end

#runObject

rubocop:disable Metrics/AbcSize



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/process_bot/process/runner.rb', line 36

def run # rubocop:disable Metrics/AbcSize
  @start_time = Time.new
  stderr_reader, stderr_writer = IO.pipe

  require "pty"

  PTY.spawn(command, err: stderr_writer.fileno) do |stdout, _stdin, pid|
    @subprocess_pid = pid
    logger.logs "Command running with PID #{pid}: #{command}"

    stdout_reader_thread = Thread.new do
      stream_output(stdout, type: :stdout)
    ensure
      status = Process::Status.wait(subprocess_pid, 0)

      @exit_status = status.exitstatus
      stderr_writer.close
    end

    stderr_reader_thread = Thread.new do
      stream_output(stderr_reader, type: :stderr)
    end

    find_sidekiq_pid if handler_name == "sidekiq"

    stdout_reader_thread.join
    stderr_reader_thread.join

    logger.logs "Process stopped"

    @stop_time = Time.new
  end
end

#running?Boolean

Returns:

  • (Boolean)


32
33
34
# File 'lib/process_bot/process/runner.rb', line 32

def running?
  !stop_time
end

#sidekiq_app_nameObject



78
79
80
# File 'lib/process_bot/process/runner.rb', line 78

def sidekiq_app_name
  options.fetch(:application)
end


124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/process_bot/process/runner.rb', line 124

def stop_related_processes
  ensure_subprocess_pgid_for_stop!

  loop do
    processes = related_processes

    if processes.length <= 0
      logger.logs "No related processes could be found"
      break
    else
      processes.each do |process|
        logger.logs "Killing process with signal TERM for PID #{process.pid}: #{process.data.fetch("cmd")}"
        Process.kill("TERM", process.pid)
      end

      sleep 0.5
    end
  end
end

#stream_output(io, type:) ⇒ Object



21
22
23
24
25
26
27
28
29
30
# File 'lib/process_bot/process/runner.rb', line 21

def stream_output(io, type:)
  buffer = +""

  loop do
    buffer << io.readpartial(READ_CHUNK_SIZE)
    flush_complete_lines(type: type, buffer: buffer)
  end
rescue EOFError, Errno::EIO
  flush_remaining_output(type: type, buffer: buffer)
end

#subprocess_pgidObject



70
71
72
73
74
75
76
# File 'lib/process_bot/process/runner.rb', line 70

def subprocess_pgid
  return @subprocess_pgid if instance_variable_defined?(:@subprocess_pgid)

  @subprocess_pgid = Process.getpgid(subprocess_pid) if subprocess_pid
rescue Errno::ESRCH
  @subprocess_pgid = nil
end

#wait_for_sidekiq_pidObject



168
169
170
171
172
173
174
175
176
177
# File 'lib/process_bot/process/runner.rb', line 168

def wait_for_sidekiq_pid
  while running? && !pid
    assign_related_sidekiq_pid

    unless pid
      logger.logs "Waiting 1 second before trying to find Sidekiq PID again"
      sleep 1
    end
  end
end

#wait_for_subprocess_pid_for_stop!Object



152
153
154
155
156
157
158
159
160
# File 'lib/process_bot/process/runner.rb', line 152

def wait_for_subprocess_pid_for_stop!
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 1

  sleep 0.05 while subprocess_pid.nil? && Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline

  return unless subprocess_pid.nil?

  raise "Unable to stop related processes because subprocess PID has not been recorded yet"
end