Class: Pikuri::Subprocess
- Inherits: Object
- Defined in: lib/pikuri/subprocess.rb
Overview
Chokepoint for all subprocess spawning in pikuri. Forces a new process group for each invocation, tracks pgids so descendants of the direct child (commands backgrounded with &) can be cleaned up at process exit. Two front doors: Subprocess.spawn (combined stdout+stderr through a single pipe — the shell-command shape) and Subprocess.run (stdin fed from a String or streamed from an IO, stdout redirected to a file, stderr captured — the filter shape).
Seam discipline
All subprocess spawning in lib/ goes through this class, via Subprocess.spawn or Subprocess.run. Direct Process.spawn / Open3.* / system / backticks anywhere else in lib/ are bugs. The convention is grep-enforceable: grep -rnE 'Process\.spawn|Open3\.|\bsystem\(' lib/ should only hit this file (plus the comment in pikuri-mcp/lib/pikuri/mcp/servers.rb explaining the MCP exception).
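The grep check described above could also be expressed in Ruby, for instance inside a spec. The following is only a sketch of that idea: FORBIDDEN_SPAWN and seam_violations are hypothetical names, not part of pikuri, and the pattern covers the same three call shapes as the grep expression (backticks are not covered).

```ruby
# Hypothetical helper: flags lines that spawn subprocesses directly.
# The pattern mirrors the intent of the grep expression in the text.
FORBIDDEN_SPAWN = /Process\.spawn|Open3\.|\bsystem\(/

# Returns [[line_number, line], ...] for each offending line in +source+.
def seam_violations(source)
  source.each_line.with_index(1).filter_map do |line, number|
    [number, line.chomp] if line.match?(FORBIDDEN_SPAWN)
  end
end

sample = <<~RUBY
  Pikuri::Subprocess.spawn('ls', chdir: '.')
  system('ls')
  out, = Open3.capture2e('ls')
RUBY

seam_violations(sample).each { |number, line| puts "#{number}: #{line}" }
```

The first sample line is not flagged because the pattern is case-sensitive: "Subprocess.spawn" does not contain "Process.spawn" with a capital P.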
Timeouts are the caller’s job
Subprocess.spawn does not implement a timeout — Ruby’s Timeout.timeout cannot kill subprocesses cleanly. Callers that need a timeout wrap their argv with coreutils’ timeout binary:
    Pikuri::Subprocess.spawn(
      'timeout', '--signal=TERM', '--kill-after=5s', '120s',
      'bash', '-c', command,
      chdir: workspace.cwd.to_s
    )
When timeout and its FD-inheriting children die, the combined output pipe closes and #wait’s io.read returns. No Ruby-side timeout machinery is needed; the timeout binary handles the SIGTERM-then-SIGKILL escalation without races.
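A caller that wraps many commands might factor the prefix into a small helper. The sketch below assumes a hypothetical with_timeout method (not part of pikuri) that only builds the argv; the flags are the same ones used in the example above.

```ruby
# Hypothetical helper: prefixes argv with coreutils' timeout.
# +seconds+ is the soft limit (SIGTERM is sent when it elapses);
# +kill_after+ is the grace period before timeout escalates to SIGKILL.
def with_timeout(argv, seconds:, kill_after: 5)
  ['timeout', '--signal=TERM', "--kill-after=#{kill_after}s", "#{seconds}s", *argv]
end

argv = with_timeout(['bash', '-c', 'make test'], seconds: 120)
p argv
```

The resulting array would then be splatted into the call, as in Pikuri::Subprocess.spawn(*argv, chdir: ...). With GNU timeout, an exit status of 124 indicates that the soft limit was hit.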
Backgrounded subprocesses
When a shell command backgrounds work with &, the resulting process stays in our pgroup. #wait returns as soon as the direct child exits, but Subprocess.active keeps the pgid in the tracked set as long as any process in the group is alive (probed with kill(0, -pgid)). On pikuri exit, Subprocess.cleanup! sends SIGTERM to every tracked group. The model can opt out via setsid cmd &, which moves the command into a new session and process group. Note that nohup cmd & on its own only makes the command ignore SIGHUP; it does not change the process group, so the exit sweep still reaches it.
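The liveness probe and the group-wide signal can be tried in isolation with the standard library. This is a minimal sketch under stated assumptions, not pikuri's code: group_alive? is a hypothetical name, and the child is a plain Ruby process that sleeps.

```ruby
require 'rbconfig'

# Sketch of the liveness probe: signal 0 delivers nothing, it only checks
# whether any process in the group can still be signalled.
def group_alive?(pgid)
  Process.kill(0, -pgid)
  true
rescue Errno::ESRCH
  false # no process left in the group
rescue Errno::EPERM
  true  # the group exists but we may not signal it
end

# Spawn a child as leader of a fresh process group (pgid == pid).
pid = Process.spawn(RbConfig.ruby, '-e', 'sleep 30', pgroup: true)
alive_before = group_alive?(pid)

Process.kill('-TERM', pid) # leading minus: signal the whole group
Process.wait(pid)          # reap the child so the group is really gone
alive_after = group_alive?(pid)

puts "before: #{alive_before}, after: #{alive_after}"
```

Because the signal goes to the group rather than to a single pid, any process the child had backgrounded into the same group would receive it too.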
State is process-global
One @active Set for the whole process, swept once at exit via Finalizers (see the registration at the bottom of this file). A Mutex guards register/prune/cleanup; v1 is single-threaded, so this is more for the exit-sweep/register race than for current callers.
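The shape of that state can be sketched as a Mutex-guarded Set. The class below is a hypothetical stand-in (GroupRegistry is not a pikuri name): the liveness check is injected as a lambda and the sweep yields each pgid instead of signalling it, so the sketch runs without spawning anything.

```ruby
require 'set'

# Hypothetical stand-in for the tracked-pgid set.
class GroupRegistry
  def initialize(alive:)
    @alive  = alive   # callable: pgid -> true/false
    @active = Set.new
    @mutex  = Mutex.new
  end

  def register(pgid)
    @mutex.synchronize { @active << pgid }
  end

  # Drops dead groups, then returns a snapshot of the live ones.
  def active
    @mutex.synchronize do
      @active.delete_if { |pgid| !@alive.call(pgid) }
      @active.to_a
    end
  end

  # Yields every tracked pgid once (a real sweep would SIGTERM it),
  # then empties the set.
  def cleanup!
    @mutex.synchronize do
      @active.each { |pgid| yield pgid }
      @active.clear
    end
  end
end

living   = Set[101, 103]
registry = GroupRegistry.new(alive: ->(pgid) { living.include?(pgid) })
[101, 102, 103].each { |pgid| registry.register(pgid) }

snapshot = registry.active
swept = []
registry.cleanup! { |pgid| swept << pgid }
p snapshot, swept, registry.active
```

Holding the lock across the whole sweep is what keeps a late register call from slipping a pgid in between the signalling and the clear.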
Why Pikuri::Subprocess, not top-level
This is the first class actually placed under the Pikuri:: namespace. Domain classes (Tool, Agent, URLCache) are top-level as a legacy convention: they predate the namespacing decision, and an eventual refactor moves them too. For now, library-level infrastructure lives under Pikuri:: and domain objects stay flat. See CLAUDE.md for the convention.
Defined Under Namespace
Classes: Result
Instance Attribute Summary
- #io ⇒ IO (readonly)
  Read end of the combined stdout+stderr pipe.
- #pgid ⇒ Integer (readonly)
  Process group id.
- #pid ⇒ Integer (readonly)
  Direct child’s pid.
Class Method Summary
- .active ⇒ Array<Integer>
  Currently-tracked process groups, with dead ones pruned as a side effect.
- .cleanup! ⇒ void
  SIGTERM every tracked process group.
- .run(*argv, stdin_data:, stdout:, chdir:, env: {}) ⇒ Result
  Run argv as a one-shot filter: feed it stdin_data, redirect its stdout straight to stdout (an open File), capture stderr through a pipe, and block until it exits.
- .spawn(*argv, chdir:, env: {}) ⇒ Subprocess
  Spawn argv in a new process group, redirecting stderr onto stdout.
Instance Method Summary
- #initialize(io:, wait_thr:) ⇒ Subprocess (constructor)
  A new instance of Subprocess.
- #terminate ⇒ void
  SIGTERM the whole process group without blocking for output: the stop button for a daemon child (one the caller never #waits on, e.g. Memory::Mem0Server’s socat relay).
- #wait ⇒ Result
  Block until the direct child exits, read whatever remains on the combined-output pipe, return a Result.
Constructor Details
#initialize(io:, wait_thr:) ⇒ Subprocess
Returns a new instance of Subprocess.
    # File 'lib/pikuri/subprocess.rb', line 178
    def initialize(io:, wait_thr:)
      @io = io
      @wait_thr = wait_thr
      @pid = wait_thr.pid
      @pgid = wait_thr.pid # pgroup:true → pgid == pid
    end
Instance Attribute Details
#io ⇒ IO (readonly)
Returns read end of the combined stdout+stderr pipe. Exposed for future live-streaming consumers; v1 callers go straight to #wait, which drains it.
    # File 'lib/pikuri/subprocess.rb', line 175
    def io
      @io
    end
#pgid ⇒ Integer (readonly)
Returns process group id. Equal to #pid since the child was spawned with pgroup: true (it’s the group leader).
    # File 'lib/pikuri/subprocess.rb', line 170
    def pgid
      @pgid
    end
#pid ⇒ Integer (readonly)
Returns direct child’s pid.
    # File 'lib/pikuri/subprocess.rb', line 166
    def pid
      @pid
    end
Class Method Details
.active ⇒ Array<Integer>
Currently-tracked process groups, with dead ones pruned as a side effect. Useful for a future /bg REPL command or a between-turn status line.
    # File 'lib/pikuri/subprocess.rb', line 222
    def active
      @mutex.synchronize do
        @active.delete_if { |g| !alive?(g) }
        @active.to_a
      end
    end
.cleanup! ⇒ void
This method returns an undefined value.
SIGTERM every tracked process group. Run at process exit via Finalizers (production) and from after blocks (specs). Best-effort — ignores errors from already-dead groups.
    # File 'lib/pikuri/subprocess.rb', line 234
    def cleanup!
      @mutex.synchronize do
        @active.each { |g| Process.kill('-TERM', g) rescue nil }
        @active.clear
      end
    end
.run(*argv, stdin_data:, stdout:, chdir:, env: {}) ⇒ Result
Run argv as a one-shot filter: feed it stdin_data, redirect its stdout straight to stdout (an open File), capture stderr through a pipe, and block until it exits. Built for the stdin→markdown document converters (pikuri-extractors), where spawn’s shape is wrong twice over: it closes the child’s stdin immediately, and it merges stderr onto stdout, which is fatal when stdout is the payload and a converter’s warnings would corrupt it.
Redirecting stdout to a file (not a pipe) is also what makes the I/O deadlock-free with one writer thread: the child never blocks writing output, so it keeps draining stdin, while the parent drains the (low-volume) stderr pipe. The returned Pikuri::Subprocess::Result#output is the captured stderr — the diagnostics — not the payload; the payload is in stdout, whose file offset is shared with the child, so rewind before reading it back.
Same discipline as spawn: new process group, registered for the exit sweep, no built-in timeout (wrap argv with coreutils’ timeout — see the class docs).
    # File 'lib/pikuri/subprocess.rb', line 133
    def self.run(*argv, stdin_data:, stdout:, chdir:, env: {})
      in_r, in_w = IO.pipe
      err_r, err_w = IO.pipe
      pid = Process.spawn(env, *argv, chdir: chdir.to_s, pgroup: true,
                          in: in_r, out: stdout, err: err_w)
      in_r.close
      err_w.close
      register(pid)
      writer = Thread.new do
        in_w.binmode
        if stdin_data.respond_to?(:read)
          IO.copy_stream(stdin_data, in_w)
        else
          in_w.write(stdin_data)
        end
      rescue Errno::EPIPE
        nil # child exited without draining stdin; its status tells the story
      ensure
        in_w.close
      end
      stderr = err_r.read
      err_r.close
      # Reap before joining the writer: if an IO source raised
      # mid-stream, #join re-raises it, and the child (already exited,
      # since err_r hit EOF) must not be left a zombie.
      _, status = Process.waitpid2(pid)
      writer.join
      Result.new(output: stderr, status: status)
    ensure
      prune(pid) if pid
    end
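The filter shape, and in particular the need to rewind the output file, can be reproduced with the standard library alone. The sketch below is an illustration under assumptions, not a call into pikuri: the child is a one-line Ruby script standing in for a converter, and the input is small enough that no writer thread is needed.

```ruby
require 'rbconfig'
require 'tempfile'

# Stand-in converter: uppercase stdin onto stdout, warn on stderr.
child = 'STDOUT.write(STDIN.read.upcase); warn "converter warning"'

payload = nil
before_rewind = nil
diagnostics = nil

Tempfile.create('payload') do |out|
  in_r, in_w   = IO.pipe
  err_r, err_w = IO.pipe
  pid = Process.spawn(RbConfig.ruby, '-e', child,
                      in: in_r, out: out, err: err_w)
  in_r.close
  err_w.close

  in_w.write('hello')      # tiny input, so writing inline cannot block
  in_w.close               # EOF lets the child finish reading

  diagnostics = err_r.read # drain stderr until the child closes it
  err_r.close
  Process.wait(pid)

  before_rewind = out.read # shared offset sits after the child's output
  out.rewind
  payload = out.read
end

puts "before rewind: #{before_rewind.inspect}"
puts "payload: #{payload.inspect}"
puts "stderr: #{diagnostics.inspect}"
```

The read before the rewind comes back empty because parent and child share one file offset, which the child has already advanced past its own output.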
.spawn(*argv, chdir:, env: {}) ⇒ Subprocess
Spawn argv in a new process group, redirecting stderr onto stdout. Tracked for cleanup.
    # File 'lib/pikuri/subprocess.rb', line 89
    def self.spawn(*argv, chdir:, env: {})
      stdin, io, wait_thr = Open3.popen2e(env, *argv, chdir: chdir.to_s, pgroup: true)
      stdin.close
      register(wait_thr.pid)
      new(io: io, wait_thr: wait_thr)
    end
Instance Method Details
#terminate ⇒ void
This method returns an undefined value.
SIGTERM the whole process group without blocking for output: the stop button for a daemon child (one the caller never #waits on, e.g. Memory::Mem0Server’s socat relay). Best-effort and idempotent: an already-dead group is a no-op. The group stays in the exit-sweep set until it actually dies, so a child that ignores SIGTERM is still re-signalled by cleanup! at process exit.
    # File 'lib/pikuri/subprocess.rb', line 208
    def terminate
      Process.kill('-TERM', @pgid)
    rescue Errno::ESRCH
      # already gone
    ensure
      self.class.send(:prune, @pgid)
    end
#wait ⇒ Result
Block until the direct child exits, read whatever remains on the combined-output pipe, return a Result. The pgid stays tracked if the group still has live members (backgrounded children); pruned if everything’s gone.
    # File 'lib/pikuri/subprocess.rb', line 191
    def wait
      output = @io.read
      @io.close
      Result.new(output: output, status: @wait_thr.value)
    ensure
      self.class.send(:prune, @pgid)
    end
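The spawn-then-wait sequence boils down to three steps on top of Open3.popen2e: close stdin, read the merged pipe to EOF, then collect the exit status. The sketch below replays those steps with the standard library only; it leaves out the pgid tracking, and the child script is an illustrative stand-in.

```ruby
require 'open3'
require 'rbconfig'

# Child writes one line to each stream; popen2e merges them into one pipe.
child = '$stdout.puts "from stdout"; $stderr.puts "from stderr"'

stdin, io, wait_thr = Open3.popen2e(RbConfig.ruby, '-e', child, pgroup: true)
stdin.close             # the child sees EOF on stdin immediately

output = io.read        # returns once every holder of the pipe has closed it
io.close
status = wait_thr.value # Process::Status of the direct child

puts output
puts "exit status: #{status.exitstatus}"
```

The relative order of the two lines in the merged output is not guaranteed, since the child may buffer stdout while stderr is written straight through.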