Class: Ace::LLM::Providers::CLI::Molecules::SafeCapture
- Inherits:
-
Object
- Object
- Ace::LLM::Providers::CLI::Molecules::SafeCapture
- Defined in:
- lib/ace/llm/providers/cli/molecules/safe_capture.rb
Overview
Thread-safe command execution with process-level timeout.
Replaces the unsafe Timeout.timeout { Open3.capture3(…) } pattern which causes “stream closed in another thread (IOError)” when the timeout fires while Open3’s internal reader threads hold pipe handles.
Uses Open3.popen3 + Process.kill so the child process is terminated directly — no thread interruption, no IOError.
Class Method Summary collapse
-
.call(cmd, timeout:, stdin_data: nil, chdir: nil, env: nil, provider_name: "CLI", command_prefix: nil, isolate_process_group: true, cleanup_group_on_exit: true) ⇒ Array(String, String, Process::Status)
[stdout, stderr, status].
Class Method Details
.call(cmd, timeout:, stdin_data: nil, chdir: nil, env: nil, provider_name: "CLI", command_prefix: nil, isolate_process_group: true, cleanup_group_on_exit: true) ⇒ Array(String, String, Process::Status)
Returns [stdout, stderr, status].
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/ace/llm/providers/cli/molecules/safe_capture.rb', line 29 def self.call(cmd, timeout:, stdin_data: nil, chdir: nil, env: nil, provider_name: "CLI", command_prefix: nil, isolate_process_group: true, cleanup_group_on_exit: true) normalized_timeout = normalize_timeout(timeout) opts = {} opts[:chdir] = chdir if chdir opts[:pgroup] = true if isolate_process_group full_cmd = Array(command_prefix) + cmd args = env ? [env, *full_cmd] : full_cmd Open3.popen3(*args, **opts) do |stdin, stdout, stderr, wait_thr| pid = wait_thr.pid pgid = safe_getpgid(pid) debug_log(provider_name, "spawn pid=#{pid} pgid=#{pgid || "n/a"}") begin stdin.write(stdin_data) if stdin_data rescue Errno::EPIPE # Subprocess exited before consuming stdin — continue to capture stderr for the real error end stdin.close out_reader = Thread.new { safe_read_stream(stdout) } err_reader = Thread.new { safe_read_stream(stderr) } out_reader.report_on_exception = false err_reader.report_on_exception = false unless wait_thr.join(normalized_timeout) # Timeout: kill subprocess group (and descendants), then clean up terminate_subprocess_tree(pid: pid, pgid: pgid, provider_name: provider_name) wait_thr.join(5) stdout.close unless stdout.closed? stderr.close unless stderr.closed? out_reader.join(1) err_reader.join(1) out_reader.kill if out_reader.alive? err_reader.kill if err_reader.alive? raise Ace::LLM::ProviderError, "#{provider_name} CLI execution timed out after #{normalized_timeout} seconds" end status = wait_thr.value if isolate_process_group && cleanup_group_on_exit terminate_descendants_after_success(pid: pid, pgid: pgid, provider_name: provider_name) end [out_reader.value, err_reader.value, status] end end |