Class: Pikuri::Subprocess

Inherits:
Object
  • Object
Defined in:
lib/pikuri/subprocess.rb

Overview

Chokepoint for all subprocess spawning in pikuri. Forces a new process group for each invocation and tracks pgids so that descendants of the direct child (commands backgrounded with &) can be cleaned up at process exit. Two front doors: Subprocess.spawn (combined stdout+stderr through a single pipe; the shell-command shape) and Subprocess.run (stdin fed from a String or streamed from an IO, stdout redirected to a file, stderr captured; the filter shape).

Seam discipline

All subprocess spawning in lib/ goes through this class (Subprocess.spawn or Subprocess.run). Direct Process.spawn / Open3.* / system / backticks anywhere else in lib/ are bugs. The convention is grep-enforceable: grep -rnE 'Process\.spawn|Open3\.|\bsystem\(' lib/ should only hit this file (plus the comment in pikuri-mcp/lib/pikuri/mcp/servers.rb explaining the MCP exception).
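The same check can be expressed in Ruby, for instance inside a spec. This is a minimal sketch, not part of pikuri: the constant FORBIDDEN_SPAWN and the helper forbidden_spawn_sites are hypothetical names, and the pattern is the one from the grep command above (so, like the grep, it does not detect backticks).

```ruby
# Hypothetical pattern mirroring the grep expression in the text.
FORBIDDEN_SPAWN = /Process\.spawn|Open3\.|\bsystem\(/

# Returns [line_number, line] pairs for every line of `source`
# that contains a direct spawning call.
def forbidden_spawn_sites(source)
  source.each_line.with_index(1)
        .select { |line, _number| line.match?(FORBIDDEN_SPAWN) }
        .map { |line, number| [number, line.chomp] }
end
```

The \b anchor keeps identifiers that merely end in "system" (such as a method named filesystem) from being flagged.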

Timeouts are the caller’s job

Subprocess.spawn does not implement a timeout — Ruby’s Timeout.timeout cannot kill subprocesses cleanly. Callers that need a timeout wrap their argv with coreutils’ timeout binary:

Pikuri::Subprocess.spawn(
  'timeout', '--signal=TERM', '--kill-after=5s', '120s',
  'bash', '-c', command,
  chdir: workspace.cwd.to_s
)

When timeout and its FD-inheriting children die, the combined output pipe closes and #wait's io.read returns. No Ruby-side timeout machinery is needed; the timeout binary handles the SIGTERM-then-SIGKILL escalation race-free.
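A caller that wraps many commands may want to build the argv prefix in one place and interpret the exit status afterwards. The sketch below assumes GNU coreutils' timeout conventions (exit status 124 on timeout, and typically 137 when the KILL escalation fired); with_timeout and timeout_verdict are hypothetical helpers, not pikuri API.

```ruby
# Hypothetical helper: prefix argv with coreutils' timeout, using the
# same flags as the example in the text.
def with_timeout(argv, seconds:, kill_after: 5)
  ['timeout', '--signal=TERM', "--kill-after=#{kill_after}s", "#{seconds}s", *argv]
end

# Hypothetical helper: classify an exit status under GNU timeout's
# documented conventions. 137 (128 + SIGKILL) can also come from the
# command itself, so treat it as a hint rather than proof.
def timeout_verdict(exitstatus)
  case exitstatus
  when 124 then :timed_out
  when 137 then :killed
  else :completed
  end
end
```

The resulting array can be splatted into the spawn call in place of the hand-written prefix.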

Backgrounded subprocesses

When a shell command backgrounds work with &, the resulting process stays in our pgroup. #wait returns once the combined output pipe reaches EOF and the direct child has been reaped; it does not wait for backgrounded processes that no longer hold the pipe open. Subprocess.active keeps the pgid in the tracked set as long as any process in the group is alive (probed with kill(0, -pgid)). On pikuri exit, Subprocess.cleanup! sends SIGTERM to every tracked group. The model can opt out via nohup cmd & or setsid cmd & (both detach from our group).
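The liveness probe mentioned above can be sketched with the standard library alone. This is an illustration under assumptions, not the class's private alive? method (which is not shown in this page): group_alive? and demo_group_probe are hypothetical names, and the child is a throwaway Ruby process.

```ruby
require 'rbconfig'

# Hypothetical probe: signal 0 delivers nothing, it only checks that
# the target exists. A negative pid addresses the whole process group.
def group_alive?(pgid)
  Process.kill(0, -pgid)
  true
rescue Errno::ESRCH
  false # no process left in the group
rescue Errno::EPERM
  true  # group exists but belongs to someone else
end

# Spawn a child as leader of a new group, probe it, stop it, probe again.
def demo_group_probe
  pid = Process.spawn(RbConfig.ruby, '-e', 'sleep 30', pgroup: true)
  leader = Process.getpgid(pid) == pid # pgroup: true makes pgid == pid
  before = group_alive?(pid)
  Process.kill('-TERM', pid)           # leading minus: signal the group
  Process.waitpid(pid)                 # reap, so the group is truly empty
  [leader, before, group_alive?(pid)]
end
```

The probe only reports false once every member has been reaped, which is why a group with a lingering background job stays tracked.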

State is process-global

One @active Set for the whole process, swept once at exit via Finalizers (see the registration at the bottom of this file). A Mutex guards register/prune/cleanup; v1 is single-threaded, so this is more for the exit-sweep/register race than for current callers.
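The shape described here, one mutex-guarded set that is pruned on read and swept once, can be sketched as follows. GroupRegistry and its methods are hypothetical; the liveness check is injected as a callable so the sketch stays independent of real processes, and the Finalizers registration from the source file is not reproduced.

```ruby
require 'set'

# Hypothetical stand-in for the process-global tracking state.
class GroupRegistry
  # alive: callable taking a pgid and returning true while the group lives.
  def initialize(alive:)
    @alive = alive
    @active = Set.new
    @mutex = Mutex.new
  end

  def register(pgid)
    @mutex.synchronize { @active << pgid }
  end

  # Tracked groups, with dead ones pruned as a side effect.
  def active
    @mutex.synchronize do
      @active.delete_if { |pgid| !@alive.call(pgid) }
      @active.to_a
    end
  end

  # Yields every tracked group once, then forgets them all.
  def sweep
    @mutex.synchronize do
      @active.each { |pgid| yield pgid }
      @active.clear
    end
  end
end
```

In a real process the sweep would run from an exit hook (Ruby's at_exit is one option), with the block sending SIGTERM to each group.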

Why Pikuri::Subprocess, not top-level

First class actually under the Pikuri:: namespace. Domain classes (Tool, Agent, URLCache) are top-level as a legacy convention — they predate the namespacing decision and an eventual refactor moves them too. For now: library-level infrastructure under Pikuri::; domain objects flat. See CLAUDE.md for the convention.

Defined Under Namespace

Classes: Result

Constructor Details

#initialize(io:, wait_thr:) ⇒ Subprocess

Returns a new instance of Subprocess.



# File 'lib/pikuri/subprocess.rb', line 178

def initialize(io:, wait_thr:)
  @io       = io
  @wait_thr = wait_thr
  @pid      = wait_thr.pid
  @pgid     = wait_thr.pid # pgroup:true → pgid == pid
end

Instance Attribute Details

#io ⇒ IO (readonly)

Returns read end of the combined stdout+stderr pipe. Exposed for future live-streaming consumers; v1 callers go straight to #wait, which drains it.

Returns:

  • (IO)




# File 'lib/pikuri/subprocess.rb', line 175

def io
  @io
end

#pgid ⇒ Integer (readonly)

Returns process group id. Equal to #pid since the child was spawned with pgroup: true (it’s the group leader).

Returns:

  • (Integer)




# File 'lib/pikuri/subprocess.rb', line 170

def pgid
  @pgid
end

#pid ⇒ Integer (readonly)

Returns direct child’s pid.

Returns:

  • (Integer)




# File 'lib/pikuri/subprocess.rb', line 166

def pid
  @pid
end

Class Method Details

.active ⇒ Array<Integer>

Currently-tracked process groups, with dead ones pruned as a side effect. Useful for a future /bg REPL command or a between-turn status line.

Returns:

  • (Array<Integer>)


# File 'lib/pikuri/subprocess.rb', line 222

def active
  @mutex.synchronize do
    @active.delete_if { |g| !alive?(g) }
    @active.to_a
  end
end

.cleanup! ⇒ void

This method returns an undefined value.

SIGTERM every tracked process group. Run at process exit via Finalizers (production) and from after blocks (specs). Best-effort — ignores errors from already-dead groups.



# File 'lib/pikuri/subprocess.rb', line 234

def cleanup!
  @mutex.synchronize do
    @active.each { |g| Process.kill('-TERM', g) rescue nil }
    @active.clear
  end
end

.run(*argv, stdin_data:, stdout:, chdir:, env: {}) ⇒ Result

Run argv as a one-shot filter: feed it stdin_data, redirect its stdout straight to stdout (an open File), capture stderr through a pipe, and block until it exits. Built for the stdin→markdown document converters (pikuri-extractors), where spawn's shape is wrong twice over: it closes the child's stdin immediately, and it merges stderr onto stdout, which is fatal when stdout is the payload and a converter's warnings would corrupt it.

Redirecting stdout to a file (not a pipe) is also what makes the I/O deadlock-free with one writer thread: the child never blocks writing output, so it keeps draining stdin, while the parent drains the (low-volume) stderr pipe. The returned Pikuri::Subprocess::Result#output is the captured stderr — the diagnostics — not the payload; the payload is in stdout, whose file offset is shared with the child, so rewind before reading it back.

Same discipline as spawn: new process group, registered for the exit sweep, no built-in timeout (wrap argv with coreutils’ timeout — see the class docs).

Parameters:

  • argv (Array<String>)

    command and arguments, passed to exec directly — no implicit shell.

  • stdin_data (String, IO, StringIO)

    the child’s stdin: a String is written as-is, an IO is streamed through IO.copy_stream (so a large source file never materialises in the Ruby heap) and read from its current position. Either way stdin is closed (EOF) afterwards. May be empty.

  • stdout (File)

    open writable file the child’s stdout is redirected to.

  • chdir (String, Pathname)

    working directory.

  • env (Hash{String=>String}) (defaults to: {})

    extra environment variables, as for spawn.

Returns:

  • (Result)

    output is the captured stderr; status the child’s exit status.

Raises:

  • (SystemCallError)

    whatever an IO stdin_data raises mid-stream (disk error, closed handle) — re-raised here after the child has been reaped.



# File 'lib/pikuri/subprocess.rb', line 133

def self.run(*argv, stdin_data:, stdout:, chdir:, env: {})
  in_r, in_w = IO.pipe
  err_r, err_w = IO.pipe
  pid = Process.spawn(env, *argv, chdir: chdir.to_s, pgroup: true,
                      in: in_r, out: stdout, err: err_w)
  in_r.close
  err_w.close
  register(pid)
  writer = Thread.new do
    in_w.binmode
    if stdin_data.respond_to?(:read)
      IO.copy_stream(stdin_data, in_w)
    else
      in_w.write(stdin_data)
    end
  rescue Errno::EPIPE
    nil # child exited without draining stdin; its status tells the story
  ensure
    in_w.close
  end
  stderr = err_r.read
  err_r.close
  # Reap before joining the writer: if an IO source raised
  # mid-stream, #join re-raises it, and the child (already exited —
  # err_r hit EOF) must not be left a zombie.
  _, status = Process.waitpid2(pid)
  writer.join
  Result.new(output: stderr, status: status)
ensure
  prune(pid) if pid
end
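The filter shape and the rewind-before-reading caveat can be tried out with the standard library alone. The sketch below is an approximation under assumptions, not a call into Pikuri::Subprocess: filter_to_file, demo_filter and FILTER_CHILD are hypothetical names, the child is a throwaway Ruby one-liner, and registration for the exit sweep is omitted.

```ruby
require 'rbconfig'
require 'tmpdir'

# Hypothetical child: warns on stderr, writes the upcased stdin to stdout.
FILTER_CHILD = 'STDERR.puts "warn: demo"; STDOUT.write(STDIN.read.upcase)'

# Feed `input` to argv, send stdout to `out_file`, capture stderr.
def filter_to_file(argv, input, out_file)
  in_r, in_w = IO.pipe
  err_r, err_w = IO.pipe
  pid = Process.spawn(*argv, pgroup: true, in: in_r, out: out_file, err: err_w)
  in_r.close
  err_w.close
  writer = Thread.new do
    in_w.write(input)
  rescue Errno::EPIPE
    nil # child exited without draining stdin
  ensure
    in_w.close # EOF for the child
  end
  diagnostics = err_r.read # parent drains stderr while the writer feeds stdin
  err_r.close
  _, status = Process.waitpid2(pid)
  writer.join
  [diagnostics, status]
end

def demo_filter
  Dir.mktmpdir do |dir|
    File.open(File.join(dir, 'payload.txt'), 'w+') do |out|
      diagnostics, status = filter_to_file([RbConfig.ruby, '-e', FILTER_CHILD], 'hello', out)
      out.rewind # the child advanced the shared file offset
      [out.read, diagnostics, status.success?]
    end
  end
end
```

Without the rewind, the read would start at the offset where the child stopped writing and return an empty string.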

.spawn(*argv, chdir:, env: {}) ⇒ Subprocess

Spawn argv in a new process group, redirecting stderr onto stdout. Tracked for cleanup.

Parameters:

  • argv (Array<String>)

    command and arguments. Caller does any shell wrapping (e.g. 'bash', '-c', cmd) when shell interpretation is wanted; argv is passed to exec directly, so no implicit shell expansion happens here.

  • chdir (String, Pathname)

    working directory

  • env (Hash{String=>String}) (defaults to: {})

    extra environment variables to set in the child process. The child otherwise inherits the parent’s full environment; entries in env override or add to it. Default {} (pure inheritance). Used by Code::Bash to thread Workspace::Filesystem#env (host git identity, etc.) into a bash subprocess whose sandbox would otherwise strip the host’s config files.

Returns:

  • (Subprocess)

    handle — call #wait to block for the direct child to exit and read the captured output



# File 'lib/pikuri/subprocess.rb', line 89

def self.spawn(*argv, chdir:, env: {})
  stdin, io, wait_thr = Open3.popen2e(env, *argv, chdir: chdir.to_s, pgroup: true)
  stdin.close
  register(wait_thr.pid)
  new(io: io, wait_thr: wait_thr)
end
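The combined-output behaviour and the env overlay both come from Open3.popen2e, so they can be observed without pikuri. A minimal sketch, assuming a throwaway Ruby child; combined_output_demo, SPAWN_CHILD and the GREETING variable are hypothetical, and no group tracking is done here.

```ruby
require 'open3'
require 'rbconfig'

# Hypothetical child: one line on stdout (from the env overlay), one on stderr.
SPAWN_CHILD = '$stdout.sync = true; puts ENV.fetch("GREETING"); warn "diagnostic"'

def combined_output_demo
  # The leading Hash is added on top of the inherited environment.
  stdin, io, wait_thr = Open3.popen2e({ 'GREETING' => 'hi' },
                                      RbConfig.ruby, '-e', SPAWN_CHILD,
                                      pgroup: true)
  stdin.close      # the child sees EOF on stdin immediately
  output = io.read # stdout and stderr arrive through the same pipe
  io.close
  [output, wait_thr.value.success?]
end
```

Both lines come back in a single string, which is why this shape suits shell commands but not filters whose stdout is the payload.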

Instance Method Details

#terminate ⇒ void

This method returns an undefined value.

SIGTERM the whole process group without blocking for output: the stop button for a daemon child (one the caller never #waits on, e.g. Memory::Mem0Server's socat relay). Best-effort and idempotent: an already-dead group is a no-op. The group stays in the exit-sweep set until it actually dies, so a child that ignores SIGTERM is still re-signalled by cleanup! at process exit.



# File 'lib/pikuri/subprocess.rb', line 208

def terminate
  Process.kill('-TERM', @pgid)
rescue Errno::ESRCH
  # already gone
ensure
  self.class.send(:prune, @pgid)
end
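The idempotence comes from rescuing Errno::ESRCH, which can be seen in isolation. A sketch with hypothetical helpers (terminate_group, demo_terminate), returning symbols purely so the two outcomes are visible; the real method returns nothing meaningful.

```ruby
require 'rbconfig'

# Hypothetical helper: TERM the whole group; a vanished group is a no-op.
def terminate_group(pgid)
  Process.kill('-TERM', pgid) # leading minus addresses the process group
  :signalled
rescue Errno::ESRCH
  :already_gone
end

def demo_terminate
  pid = Process.spawn(RbConfig.ruby, '-e', 'sleep 30', pgroup: true)
  first = terminate_group(pid)
  Process.waitpid(pid)          # reap, leaving the group empty
  second = terminate_group(pid) # second call must not raise
  [first, second]
end
```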

#wait ⇒ Result

Block until the direct child exits, read whatever remains on the combined-output pipe, return a Result. The pgid stays tracked if the group still has live members (backgrounded children); pruned if everything’s gone.

Returns:

  • (Result)


# File 'lib/pikuri/subprocess.rb', line 191

def wait
  output = @io.read
  @io.close
  Result.new(output: output, status: @wait_thr.value)
ensure
  self.class.send(:prune, @pgid)
end