Module: Landlock::ProcessIO

Defined in:
lib/landlock/process_io.rb

Class Method Summary collapse

Class Method Details

.append_output_chunk(buffer, chunk, state, max_output_bytes, truncate_output, pid, output_too_large_error: Landlock::OutputTooLargeError) ⇒ Object

Raises:

  • (output_too_large_error)


203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/landlock/process_io.rb', line 203

def append_output_chunk(
  buffer,
  chunk,
  state,
  max_output_bytes,
  truncate_output,
  pid,
  output_too_large_error: Landlock::OutputTooLargeError
)
  return buffer << chunk if max_output_bytes.nil?

  chunk_to_append = chunk
  over_limit = false
  remaining_bytes = max_output_bytes - state[:bytes]
  if remaining_bytes <= 0
    chunk_to_append = ""
    over_limit = true
  elsif chunk.bytesize > remaining_bytes
    chunk_to_append = chunk.byteslice(0, remaining_bytes)
    over_limit = true
  end

  state[:bytes] += chunk.bytesize
  state[:truncated] = true if over_limit
  buffer << chunk_to_append
  return unless over_limit

  terminate_process(pid)
  raise output_too_large_error, "Process output exceeded #{max_output_bytes} bytes" unless truncate_output
end

.capture_result(stdout, stderr, status, output_truncated:, timed_out:) ⇒ Object



51
52
53
54
55
# File 'lib/landlock/process_io.rb', line 51

def capture_result(stdout, stderr, status, output_truncated:, timed_out:)
  stdout.force_encoding(Encoding.default_external)
  stderr.force_encoding(Encoding.default_external)
  CaptureResult.new(stdout:, stderr:, status:, output_truncated:, timed_out:)
end

.close_stream(io) ⇒ Object



165
166
167
168
# File 'lib/landlock/process_io.rb', line 165

def close_stream(io)
  io.close unless io.closed?
rescue IOError
end

.close_streams(streams) ⇒ Object



195
196
197
198
199
200
201
# File 'lib/landlock/process_io.rb', line 195

def close_streams(streams)
  streams.keys.each do |io|
    streams.delete(io)
    io.close unless io.closed?
  rescue IOError
  end
end

.complete_pipe_capture(pid, stdout_reader, stderr_reader, stdin_writer, stdin, timeout, max_output_bytes, truncate_output) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/landlock/process_io.rb', line 15

def complete_pipe_capture(
  pid,
  stdout_reader,
  stderr_reader,
  stdin_writer,
  stdin,
  timeout,
  max_output_bytes,
  truncate_output
)
  stdin_thread = write_input(stdin_writer, stdin)

  stdout = +"".b
  stderr = +"".b
  state = { bytes: 0, truncated: false }
  begin
    status, timed_out =
      read_and_wait(
        pid,
        { stdout_reader => stdout, stderr_reader => stderr },
        timeout,
        max_output_bytes,
        truncate_output,
        state
      )
  rescue OutputTooLargeError => error
    status ||= wait_for_pid(pid)
    error.result = capture_result(stdout, stderr, status, output_truncated: true, timed_out:)
    raise
  ensure
    finish_input_thread(stdin_thread, stdin_writer)
  end

  capture_result(stdout, stderr, status, output_truncated: state[:truncated], timed_out:)
end

.drain_streams_until(streams, drain_deadline, max_output_bytes, truncate_output, state, pid) ⇒ Object



189
190
191
192
193
# File 'lib/landlock/process_io.rb', line 189

def drain_streams_until(streams, drain_deadline, max_output_bytes, truncate_output, state, pid)
  while streams.any? && ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) < drain_deadline
    break unless read_available_streams(streams, max_output_bytes, truncate_output, state, pid)
  end
end

.finish_input_thread(thread, io) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
# File 'lib/landlock/process_io.rb', line 77

def finish_input_thread(thread, io)
  close_stream(io)
  return unless thread

  if thread.join(STDIN_THREAD_JOIN_SECONDS)
    thread.value
  else
    thread.kill
    thread.join(STDIN_THREAD_JOIN_SECONDS)
  end
end

.poll_pid(pid) ⇒ Object



152
153
154
155
156
157
# File 'lib/landlock/process_io.rb', line 152

def poll_pid(pid)
  result = ::Process.wait2(pid, ::Process::WNOHANG)
  result&.last
rescue Errno::ECHILD
  nil
end

.read_and_wait(pid, streams, timeout, max_output_bytes, truncate_output, state) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/landlock/process_io.rb', line 89

def read_and_wait(pid, streams, timeout, max_output_bytes, truncate_output, state)
  deadline = timeout ? ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + timeout : nil
  timed_out = false
  status = nil

  until streams.empty? && status
    if deadline
      remaining = deadline - ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
      if remaining <= 0
        timed_out = true
        terminate_process(pid)
        status = wait_for_pid(pid)
        drain_streams_until(
          streams,
          ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + POST_TIMEOUT_DRAIN_SECONDS,
          max_output_bytes,
          truncate_output,
          state,
          pid
        )
        close_streams(streams)
        break
      end
    end

    status ||= poll_pid(pid)

    break if streams.empty? && status

    wait =
      (
        if deadline
          [deadline - ::Process.clock_gettime(::Process::CLOCK_MONOTONIC), PROCESS_POLL_SECONDS].min
        else
          PROCESS_POLL_SECONDS
        end
      )
    wait = 0 if wait.negative?
    if streams.empty?
      sleep wait
      next
    end

    readable, = IO.select(streams.keys, nil, nil, wait)
    next unless readable

    readable.each do |io|
      begin
        chunk = io.read_nonblock(READ_CHUNK_BYTES)
        append_output_chunk(streams.fetch(io), chunk, state, max_output_bytes, truncate_output, pid)
      rescue IO::WaitReadable
        next
      rescue EOFError
        streams.delete(io)
        io.close
      end
    end
  end

  status ||= wait_for_pid(pid)
  [status, timed_out]
end

.read_available_streams(streams, max_output_bytes, truncate_output, state, pid) ⇒ Object



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/landlock/process_io.rb', line 170

def read_available_streams(streams, max_output_bytes, truncate_output, state, pid)
  readable, = IO.select(streams.keys, nil, nil, 0)
  return false unless readable

  readable.each do |io|
    begin
      chunk = io.read_nonblock(READ_CHUNK_BYTES)
      append_output_chunk(streams.fetch(io), chunk, state, max_output_bytes, truncate_output, pid)
    rescue IO::WaitReadable
      next
    rescue EOFError
      streams.delete(io)
      io.close
    end
  end

  true
end

.signal_process(signal, pid) ⇒ Object



240
241
242
243
244
245
246
247
# File 'lib/landlock/process_io.rb', line 240

def signal_process(signal, pid)
  ::Process.kill(signal, -pid)
rescue Errno::ESRCH, Errno::EPERM
  begin
    ::Process.kill(signal, pid)
  rescue Errno::ESRCH, Errno::EPERM
  end
end

.terminate_process(pid) ⇒ Object



234
235
236
237
238
# File 'lib/landlock/process_io.rb', line 234

def terminate_process(pid)
  signal_process("TERM", pid)
  sleep 0.5
  signal_process("KILL", pid)
end

.wait_for_pid(pid) ⇒ Object



159
160
161
162
163
# File 'lib/landlock/process_io.rb', line 159

def wait_for_pid(pid)
  ::Process.wait2(pid).last
rescue Errno::ECHILD
  nil
end

.write_input(io, input) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/landlock/process_io.rb', line 57

def write_input(io, input)
  return io.close if input.nil?

  Thread.new do
    Thread.current.report_on_exception = false
    begin
      if input.respond_to?(:read)
        while (chunk = input.read(READ_CHUNK_BYTES))
          io.write(chunk)
        end
      else
        io.write(input.to_s)
      end
    rescue Errno::EPIPE, IOError
    ensure
      io.close unless io.closed?
    end
  end
end