Module: Ocak::ProcessRunner

Defined in:
lib/ocak/process_runner.rb

Overview

Runs a subprocess with streaming line output and timeout support.

Defined Under Namespace

Classes: FailedStatus

Constant Summary collapse

KILL_GRACE_PERIOD =
2

Class Method Summary collapse

Class Method Details

.kill_process(pid) ⇒ Object



57
58
59
60
61
62
63
64
# File 'lib/ocak/process_runner.rb', line 57

def kill_process(pid)
  Process.kill('TERM', pid)
  sleep KILL_GRACE_PERIOD
  Process.kill('KILL', pid)
rescue Errno::ESRCH, Errno::EPERM => e
  warn("Process already exited during kill: #{e.message}")
  nil
end

.process_lines(line_buf, chunk, on_line) ⇒ Object



83
84
85
86
87
88
89
90
# File 'lib/ocak/process_runner.rb', line 83

def process_lines(line_buf, chunk, on_line)
  return unless on_line

  line_buf << chunk
  while (idx = line_buf.index("\n"))
    on_line.call(line_buf.slice!(0, idx + 1).chomp)
  end
end

.read_available(readers, remaining, ctx) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/ocak/process_runner.rb', line 66

def read_available(readers, remaining, ctx)
  ready = IO.select(readers, nil, nil, [remaining, 1].min)
  return unless ready

  ready[0].each do |io|
    chunk = io.read_nonblock(8192)
    if io == readers[0]
      ctx[:stdout] << chunk
      process_lines(ctx[:line_buf], chunk, ctx[:on_line])
    else
      ctx[:stderr] << chunk
    end
  rescue EOFError
    readers.delete(io)
  end
end

.read_streams(out, err, ctx) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/ocak/process_runner.rb', line 40

def read_streams(out, err, ctx)
  readers = [out, err]

  until readers.empty?
    remaining = ctx[:deadline] ? ctx[:deadline] - Process.clock_gettime(Process::CLOCK_MONOTONIC) : 5

    if ctx[:deadline] && remaining <= 0
      kill_process(ctx[:wait_thr].pid)
      return ['', "Timed out after #{ctx[:timeout]}s", +'']
    end

    read_available(readers, remaining, ctx)
  end

  [ctx[:stdout], ctx[:stderr], ctx[:line_buf]]
end

.run(cmd, chdir:, timeout: nil, on_line: nil, registry: nil) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/ocak/process_runner.rb', line 16

def run(cmd, chdir:, timeout: nil, on_line: nil, registry: nil)
  stdout = +''
  stderr = +''
  line_buf = +''

  Open3.popen3(*cmd, chdir: chdir) do |stdin, out, err, wait_thr|
    stdin.close
    registry&.register(wait_thr.pid)
    ctx = {
      stdout: +'', stderr: +'', line_buf: +'',
      deadline: timeout ? Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout : nil,
      timeout: timeout, wait_thr: wait_thr, on_line: on_line
    }

    stdout, stderr, line_buf = read_streams(out, err, ctx)
    on_line&.call(line_buf.chomp) unless line_buf.empty?
    [stdout, stderr, wait_thr.value]
  ensure
    registry&.unregister(wait_thr.pid)
  end
rescue Errno::ENOENT => e
  ['', e.message, FailedStatus.instance]
end