Class: Rubino::Tools::ShellRegistry

Inherits:
Object
Defined in:
lib/rubino/tools/shell_registry.rb

Overview

Process-wide registry for shell commands started with `run_in_background`. Each entry owns a pgid (process group), a reader thread that drains stdout and stderr into an in-memory ring buffer, and the wait_thr used to observe exit.

The registry lives only as long as a single CLI/server process; it is intentionally NOT persisted to disk. Background shells die with the agent process.

Defined Under Namespace

Classes: Entry

Constant Summary

RING_BYTES = 256 * 1024

Cap per run; older bytes are dropped.
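
How the cap is enforced is not shown on this page. A minimal sketch of one way to keep only the newest bytes, assuming a mutable String buffer; `append_capped` is a hypothetical helper and not part of ShellRegistry:

```ruby
# Hypothetical helper (not part of ShellRegistry): append a chunk, then drop
# the oldest bytes so the buffer never exceeds `cap` bytes.
# Returns the number of bytes dropped.
def append_capped(buffer, chunk, cap: 256 * 1024)
  buffer << chunk
  overflow = buffer.bytesize - cap
  return 0 unless overflow.positive?

  # Keep the newest `cap` bytes; replace mutates the caller's String in place.
  # Cutting by bytes can split a multibyte character at the front.
  buffer.replace(buffer.byteslice(overflow, cap))
  overflow
end

buf = +""
append_capped(buf, "abc", cap: 4)
append_capped(buf, "def", cap: 4)
```

One consequence of dropping from the front is that any saved byte offset into the buffer, such as a read cursor, would need adjusting by the returned drop count.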

Constructor Details

#initialize ⇒ ShellRegistry

Returns a new instance of ShellRegistry.



# File 'lib/rubino/tools/shell_registry.rb', line 36

def initialize
  @entries = {}
  @mutex   = Mutex.new
  # Live FOREGROUND shell process groups, keyed by pgid. A foreground
  # shell's pgid otherwise lives only in the ShellTool#execute_foreground
  # stack frame of the (sub)agent thread that started it — so on
  # parent-death there is nothing process-wide to reap it and it
  # reparents to init as an orphan (MED-2). Tracking it here lets
  # #kill_all_groups SIGTERM/SIGKILL it synchronously on teardown.
  @fg_pgids = {}
end

Class Method Details

.instance ⇒ Object



# File 'lib/rubino/tools/shell_registry.rb', line 24

def instance
  @instance ||= new
end

.reset! ⇒ Object

Test seam: drop the process-wide registry between examples so the situational shell-tool gate (#313) starts each spec with no background shell. Mirrors BackgroundTasks.reset!.



# File 'lib/rubino/tools/shell_registry.rb', line 31

def reset!
  @instance = nil
end

Instance Method Details

#any? ⇒ Boolean

True when at least one background shell has been started this session and not yet removed. This is the session-stable signal that #313 uses to gate the shell-management tools: a normal turn with no background shell never ships shell_input/shell_output/shell_tail/shell_kill. It flips at most once per session (when the first background shell is spawned), so the cached tool prefix stays stable across ordinary turns.

Returns:

  • (Boolean)


# File 'lib/rubino/tools/shell_registry.rb', line 107

def any?
  @mutex.synchronize { !@entries.empty? }
end

#close_stdin(entry) ⇒ Object

Closes the write end of the child’s stdin (sends EOF). Idempotent.



# File 'lib/rubino/tools/shell_registry.rb', line 131

def close_stdin(entry)
  io = entry&.stdin
  io.close if io && !io.closed?
rescue IOError
  # already closed
end

#exit_code(entry) ⇒ Object



# File 'lib/rubino/tools/shell_registry.rb', line 159

def exit_code(entry)
  return nil if entry.wait_thr.alive?

  entry.wait_thr.value.exitstatus
end

#find(id) ⇒ Object



# File 'lib/rubino/tools/shell_registry.rb', line 97

def find(id)
  @mutex.synchronize { @entries[id] }
end

#kill_all_groups(grace: 0.5) ⇒ Object

Synchronous teardown reaper (MED-2): sends SIGTERM to every live shell process group this session owns (the background entries and the tracked foreground pgids), gives them a brief grace period, then sends SIGKILL to each group so no straggler survives. Mirrors the Python Hermes `_kill_process` (os.killpg SIGTERM → wait → SIGKILL). Called from BackgroundTasks#cancel_all so that every parent-death edge (clean quit `ensure`, HUP/TERM trap, REPL break) reaps the child shells that the cooperative cancel token alone can't reach, before the process exits and the shells reparent to init. Returns the pgids it signalled.



# File 'lib/rubino/tools/shell_registry.rb', line 173

def kill_all_groups(grace: 0.5)
  pgids = @mutex.synchronize { (@entries.values.map(&:pgid) + @fg_pgids.keys).uniq }
  return pgids if pgids.empty?

  pgids.each { |pgid| signal_group("TERM", pgid) }
  sleep(grace) if grace.positive?
  pgids.each { |pgid| signal_group("KILL", pgid) }
  pgids
end
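
The `signal_group` helper called above is not shown on this page. A minimal sketch of the TERM → grace → KILL sequence for a single process group, assuming a POSIX system with `sleep` on the PATH; `signal_group_sketch` and `terminate_group` are hypothetical names:

```ruby
# Hypothetical stand-in for the private signal_group helper. A signal name
# with a leading "-" makes Process.kill target the whole process group.
# Returns false when the group is already gone (ESRCH).
def signal_group_sketch(signal, pgid)
  Process.kill("-#{signal}", pgid)
  true
rescue Errno::ESRCH
  false
end

# SIGTERM first, wait out the grace period, then SIGKILL unconditionally.
def terminate_group(pgid, grace: 0.5)
  signal_group_sketch("TERM", pgid)
  sleep(grace) if grace.positive?
  signal_group_sketch("KILL", pgid)
end

# pgroup: true makes the child the leader of a new group, so pgid == pid.
pid = Process.spawn("sleep", "30", pgroup: true)
terminate_group(pid, grace: 0.1)
_, status = Process.wait2(pid)
status.signaled? # true when the child was ended by a signal
```

Sending SIGKILL to every group, rather than checking which ones survived SIGTERM, keeps the teardown path short; signalling a group that has already exited is treated as a no-op.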

#read_all(entry) ⇒ Object



# File 'lib/rubino/tools/shell_registry.rb', line 148

def read_all(entry)
  entry.mutex.synchronize { entry.buffer.dup }
end

#read_new(entry) ⇒ Object

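Returns the bytes accumulated since the previous `read_new` call and advances the entry's read offset. The first call returns everything buffered so far; a call with nothing new returns an empty String. Thread-safe.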



# File 'lib/rubino/tools/shell_registry.rb', line 140

def read_new(entry)
  entry.mutex.synchronize do
    snapshot = entry.buffer.byteslice(entry.read_offset..) || ""
    entry.read_offset = entry.buffer.bytesize
    snapshot
  end
end
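
To illustrate the cursor behaviour, here is a small sketch that runs the same logic against a stand-in entry. `SketchEntry` and `read_new_sketch` are hypothetical names, and the sketch is single-threaded, so the appends happen outside the mutex:

```ruby
# Stand-in for the registry's Entry: only the fields #read_new touches.
SketchEntry = Struct.new(:buffer, :read_offset, :mutex)

# Same cursor logic as #read_new above, applied to the stand-in entry.
def read_new_sketch(entry)
  entry.mutex.synchronize do
    # byteslice returns nil when the offset is past the end of the buffer.
    snapshot = entry.buffer.byteslice(entry.read_offset..) || ""
    entry.read_offset = entry.buffer.bytesize
    snapshot
  end
end

entry = SketchEntry.new(+"", 0, Mutex.new)
entry.buffer << "line 1\n"
first = read_new_sketch(entry)  # everything buffered so far
entry.buffer << "line 2\n"
second = read_new_sketch(entry) # only the bytes appended since `first`
third = read_new_sketch(entry)  # nothing new, so an empty String
```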

#register_pgid(pgid) ⇒ Object

Track a live foreground shell process group so teardown can reap it.



# File 'lib/rubino/tools/shell_registry.rb', line 49

def register_pgid(pgid)
  @mutex.synchronize { @fg_pgids[pgid] = true }
  pgid
end

#remove(id) ⇒ Object



# File 'lib/rubino/tools/shell_registry.rb', line 111

def remove(id)
  entry = @mutex.synchronize { @entries.delete(id) }
  close_stdin(entry) if entry
  entry
end

#spawn(command:, cwd:) ⇒ Object

Spawns `command` detached in its own process group so a single kill takes out the whole subtree. Returns the new entry.



# File 'lib/rubino/tools/shell_registry.rb', line 61

def spawn(command:, cwd:)
  rd, wr = IO.pipe
  # Writable stdin pipe: the agent feeds answers to interactive prompts
  # (Y/N, "select region", apt-style) via the `shell_input` tool, which
  # writes to `in_wr`. Line-oriented `read`/prompt commands consume this
  # fine; full-screen TTY programs (vim, REPLs that require [ -t 0 ]) are
  # out of scope for a plain pipe.
  in_rd, in_wr = IO.pipe
  # pgroup: true → child becomes leader of a new process group whose
  # pgid == child pid. Lets shell_kill send SIGTERM to the whole tree.
  # bash -o pipefail keeps this path consistent with the foreground
  # shell: a mid-pipeline crash surfaces as the exit status (#156).
  pid = Process.spawn("bash", "-o", "pipefail", "-c", command,
                      chdir: cwd, pgroup: true, in: in_rd, out: wr, err: wr)
  wr.close
  in_rd.close

  entry = Entry.new(
    id: new_id,
    command: command,
    cwd: cwd,
    pid: pid,
    pgid: pid,
    wait_thr: Process.detach(pid),
    buffer: +"",
    mutex: Mutex.new,
    started_at: Time.now,
    read_offset: 0,
    stdin: in_wr
  )
  entry.reader_thr = Thread.new { drain_into(entry, rd) }

  @mutex.synchronize { @entries[entry.id] = entry }
  entry
end
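
The effect of the spawn options can be seen in isolation with a blocking sketch. It assumes `bash` is on the PATH; `run_in_group` is a hypothetical helper that, unlike #spawn, waits for the child and reads no input:

```ruby
# Hypothetical helper: run `command` under bash in its own process group and
# collect merged stdout+stderr. Blocks until the child exits.
def run_in_group(command, cwd: Dir.pwd)
  rd, wr = IO.pipe
  pid = Process.spawn("bash", "-o", "pipefail", "-c", command,
                      chdir: cwd, pgroup: true,
                      in: File::NULL, out: wr, err: wr)
  wr.close          # parent keeps only the read end
  output = rd.read  # returns at EOF, once every writer has exited
  rd.close
  _, status = Process.wait2(pid)
  { pgid: pid, output: output, exitstatus: status.exitstatus }
end

# stdout and stderr share one pipe; pipefail makes `false | true` exit 1.
result = run_in_group("echo out; echo err >&2; false | true")
```

Closing the parent's copy of the write end matters: if it stayed open, `rd.read` would never see EOF.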

#status(entry) ⇒ Object



# File 'lib/rubino/tools/shell_registry.rb', line 152

def status(entry)
  return :running if entry.wait_thr.alive?

  code = entry.wait_thr.value.exitstatus
  code && ShellTool.success_exit?(code) ? :completed : :failed
end
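
The `code &&` guard above matters because `exitstatus` is nil for a process ended by a signal. A minimal sketch of the same classification, assuming that success means exit status 0 (the real check is ShellTool.success_exit?, which is not shown on this page) and that `bash` is on the PATH; `classify` and `run_and_classify` are hypothetical names:

```ruby
# Assumption: success is exit status 0. The registry delegates this decision
# to ShellTool.success_exit?, whose rules are not shown here.
def classify(process_status)
  code = process_status.exitstatus # nil when the process died from a signal
  code&.zero? ? :completed : :failed
end

# Run a command to completion and classify how it ended.
def run_and_classify(*argv)
  pid = Process.spawn(*argv, out: File::NULL, err: File::NULL)
  _, process_status = Process.wait2(pid)
  classify(process_status)
end

run_and_classify("true")                           # clean exit
run_and_classify("false")                          # non-zero exit
run_and_classify("bash", "-c", "kill -KILL $$")    # ended by a signal
```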

#unregister_pgid(pgid) ⇒ Object

Drop a foreground shell process group once its own thread has reaped it.



# File 'lib/rubino/tools/shell_registry.rb', line 55

def unregister_pgid(pgid)
  @mutex.synchronize { @fg_pgids.delete(pgid) }
end

#write_input(entry, text, enter: true) ⇒ Object

Writes `text` to the background process's stdin, with a trailing newline unless `enter: false`. This is the "press Enter to answer a prompt" path. Returns the number of bytes written, or raises if stdin is gone.

Raises:

  • (IOError)


# File 'lib/rubino/tools/shell_registry.rb', line 120

def write_input(entry, text, enter: true)
  io = entry.stdin
  raise IOError, "stdin already closed" if io.nil? || io.closed?

  payload = enter ? "#{text}\n" : text.to_s
  io.write(payload)
  io.flush
  payload.bytesize
end
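
A self-contained sketch of answering a line-oriented prompt through a plain stdin pipe, assuming `bash` is on the PATH; `answer_prompt` is a hypothetical helper, not a registry method:

```ruby
# Hypothetical helper: start `script` under bash, answer its prompt through a
# stdin pipe, and return what the child printed.
def answer_prompt(script, text, enter: true)
  in_rd, in_wr = IO.pipe
  out_rd, out_wr = IO.pipe
  pid = Process.spawn("bash", "-c", script, in: in_rd, out: out_wr)
  in_rd.close  # the child owns the read end of stdin
  out_wr.close # the child owns the write end of stdout

  # Same payload rule as #write_input: append the newline `read` waits for.
  payload = enter ? "#{text}\n" : text.to_s
  in_wr.write(payload)
  in_wr.flush
  in_wr.close  # EOF on the child's stdin, the effect #close_stdin has

  output = out_rd.read
  out_rd.close
  Process.wait(pid)
  output
end

answer_prompt('read -r answer; echo "got: $answer"', "yes")
```

This works for commands that read lines from stdin; programs that require a real TTY are out of scope for a plain pipe, as the comment in #spawn notes.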