Class: PiAgent::Transport::Subprocess

Inherits:
Object
  • Object
show all
Defined in:
lib/pi_agent/transport/subprocess.rb

Overview

Runs ‘pi –mode rpc` as a local child process and speaks NDJSON over its stdio.

One reader thread per pipe; stdout lines are JSON-parsed and dispatched to ‘on_message`; stderr lines are forwarded raw to `on_stderr`. Writes are serialized through a mutex so concurrent senders don’t interleave JSON payloads on stdin.

Constant Summary collapse

DEFAULT_CHUNK_SIZE =
4096
DEFAULT_CLOSE_TIMEOUT =
5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(command:, env: {}, cwd: nil, on_message: nil, on_stderr: nil) ⇒ Subprocess

‘cwd` sets the child’s working directory — pi’s built-in tools (bash/read/edit/…) operate relative to it. nil leaves the child in this process’s working directory.



24
25
26
27
28
29
30
31
32
# File 'lib/pi_agent/transport/subprocess.rb', line 24

def initialize(command:, env: {}, cwd: nil, on_message: nil, on_stderr: nil)
  @command = Array(command)
  @env = env.transform_keys(&:to_s)
  @cwd = cwd
  @on_message = on_message
  @on_stderr = on_stderr
  @write_mutex = Mutex.new
  @closed = false
end

Instance Attribute Details

#pidObject (readonly)

Returns the value of attribute pid.



19
20
21
# File 'lib/pi_agent/transport/subprocess.rb', line 19

def pid
  @pid
end

Instance Method Details

#alive?Boolean

Returns:

  • (Boolean)


67
68
69
# File 'lib/pi_agent/transport/subprocess.rb', line 67

def alive?
  !!@wait_thr&.alive?
end

#close(timeout: DEFAULT_CLOSE_TIMEOUT) ⇒ Object



57
58
59
60
61
62
63
64
65
# File 'lib/pi_agent/transport/subprocess.rb', line 57

def close(timeout: DEFAULT_CLOSE_TIMEOUT)
  return if mark_closed!

  safe_close(@stdin)
  wait_for_exit(timeout)
  [@stdout_thread, @stderr_thread].compact.each(&:join)
  safe_close(@stdout)
  safe_close(@stderr)
end

#exit_statusObject



71
72
73
# File 'lib/pi_agent/transport/subprocess.rb', line 71

def exit_status
  @wait_thr&.value
end

#startObject



34
35
36
37
38
39
40
41
42
43
# File 'lib/pi_agent/transport/subprocess.rb', line 34

def start
  @stdin, @stdout, @stderr, @wait_thr = Open3.popen3(*spawn_args)
  @pid = @wait_thr.pid
  @stdin.binmode
  @stdout.binmode
  @stderr.binmode
  @stdout_thread = Thread.new { read_loop(@stdout, :stdout) }
  @stderr_thread = Thread.new { read_loop(@stderr, :stderr) }
  self
end

#write(obj) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
# File 'lib/pi_agent/transport/subprocess.rb', line 45

def write(obj)
  payload = "#{JSON.generate(obj)}\n"
  @write_mutex.synchronize do
    raise ProtocolError, "Transport closed" if @closed

    @stdin.write(payload)
    @stdin.flush
  end
rescue Errno::EPIPE
  raise ProtocolError, "Broken pipe writing to subprocess (process may have exited)"
end