Class: Rubino::Tools::ShellRegistry

Inherits:
Object
Defined in:
lib/rubino/tools/shell_registry.rb

Overview

Process-wide registry for shell commands started with `run_in_background`. Each entry owns a pgid (process group), a reader thread that drains stdout+stderr into an in-memory ring buffer, and the wait_thr for exit.

The registry lives only as long as a single CLI/server process; it is intentionally NOT persisted to disk. Background shells die with the agent process.

Defined Under Namespace

Classes: Entry

Constant Summary

RING_BYTES =

Cap per run, in bytes; older bytes are dropped.

256 * 1024
RETIRED_TTL =

Seconds a finished, unread bg shell stays retrievable.

Previously, a backgrounded command that FINISHED before the next turn was dropped from the registry the moment a reader (shell_output/tail/kill) saw it non-running. That also collapsed `any?` to false, so the shell-management tools vanished from the schema on the next turn and the model could never fetch a short bg command’s captured output (#78). Now a finished entry is RETIRED instead: it stays in the registry (its buffer + exit status intact, retrievable by shell_output) and `any?` keeps the tools exposed until the entry is read again OR these bounds reap it. RETIRED_TTL caps how long a finished-but-unread entry lingers; MAX_RETIRED caps how many are kept at once (oldest-retired evicted first), so the registry stays bounded across a long session.

300
MAX_RETIRED =

Maximum number of retired entries kept at once (oldest-retired evicted first).

16
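
The "older bytes are dropped" rule for RING_BYTES can be sketched as follows. This is a minimal illustration only: `append_capped` is a hypothetical helper, not the registry's actual reader code (which this page does not show), and the small `cap:` in the usage lines is chosen just to make the trimming visible.

```ruby
# Hypothetical helper (not part of ShellRegistry): append a chunk, then trim
# from the FRONT so the buffer never holds more than `cap` bytes.
# The default mirrors RING_BYTES (256 * 1024).
def append_capped(buffer, chunk, cap: 256 * 1024)
  buffer << chunk
  overflow = buffer.bytesize - cap
  # byteslice works on bytes, so a multibyte character at the cut point
  # may be split; that is acceptable for a raw output tail.
  buffer.replace(buffer.byteslice(overflow..)) if overflow.positive?
  buffer
end

buf = +""
append_capped(buf, "a" * 10, cap: 16)
append_capped(buf, "b" * 10, cap: 16)
# buf now holds the newest 16 bytes: the 4 oldest "a" bytes were dropped.
```

Trimming from the front keeps the most recent output, which is usually what a later shell_output or shell_tail call wants to see.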

Constructor Details

#initialize ⇒ ShellRegistry

Returns a new instance of ShellRegistry.



# File 'lib/rubino/tools/shell_registry.rb', line 56

def initialize
  @entries = {}
  @mutex   = Mutex.new
  # Live FOREGROUND shell process groups, keyed by pgid. A foreground
  # shell's pgid otherwise lives only in the ShellTool#execute_foreground
  # stack frame of the (sub)agent thread that started it — so on
  # parent-death there is nothing process-wide to reap it and it
  # reparents to init as an orphan (MED-2). Tracking it here lets
  # #kill_all_groups SIGTERM/SIGKILL it synchronously on teardown.
  @fg_pgids = {}
  # Lock-free, atomically-swapped snapshot of every live shell pgid
  # (background entries + tracked foreground pgids). The SIGTERM/SIGHUP
  # teardown trap (#478) reaps the child groups, but it CANNOT take the
  # mutex above — Ruby forbids Mutex#synchronize from a trap context
  # (ThreadError). The writers always rebuild this frozen Array UNDER the
  # mutex; the trap reads it with a single, lock-free ivar read (an atomic
  # reference load in MRI) and never iterates a structure another thread
  # is mutating. See #kill_all_groups_trap_safe.
  @pgid_snapshot = [].freeze
end
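
The snapshot pattern described in the comments above (writers rebuild a frozen Array under the mutex, readers do a single ivar read) can be sketched in isolation. `PgidTracker` and its method names are hypothetical; the real `refresh_pgid_snapshot` is not shown on this page and, per the comments, also folds in the background entries' pgids.

```ruby
# Hypothetical stand-in for the registry's foreground-pgid bookkeeping.
class PgidTracker
  def initialize
    @mutex    = Mutex.new
    @fg_pgids = {}
    @snapshot = [].freeze
  end

  def register(pgid)
    @mutex.synchronize do
      @fg_pgids[pgid] = true
      refresh_snapshot
    end
    pgid
  end

  def unregister(pgid)
    @mutex.synchronize do
      @fg_pgids.delete(pgid)
      refresh_snapshot
    end
  end

  # Lock-free read: returns whichever frozen Array was published last.
  # Safe to call from a trap handler because it never takes the mutex.
  def snapshot
    @snapshot
  end

  private

  # Must be called with @mutex held. Builds a NEW Array and swaps the
  # reference; an already-published Array is never mutated.
  def refresh_snapshot
    @snapshot = @fg_pgids.keys.freeze
  end
end

tracker = PgidTracker.new
tracker.register(4242)
before = tracker.snapshot   # frozen Array containing 4242
tracker.unregister(4242)
after = tracker.snapshot    # a different, empty frozen Array
```

A reader that grabbed `before` keeps a consistent (if slightly stale) view even after `unregister` runs, which is the property a teardown trap needs.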

Class Method Details

.instance ⇒ Object



# File 'lib/rubino/tools/shell_registry.rb', line 44

def instance
  @instance ||= new
end

.reset! ⇒ Object

Test seam: drop the process-wide registry between examples so the situational shell-tool gate (#313) starts each spec with no background shell. Mirrors BackgroundTasks.reset!.



# File 'lib/rubino/tools/shell_registry.rb', line 51

def reset!
  @instance = nil
end
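
How `.instance` and `.reset!` interact can be shown with a stand-alone class. `DemoRegistry` is a hypothetical name used only for illustration; the shape (a memoized class-level ivar plus a reset seam) follows the two methods above.

```ruby
# Hypothetical class mirroring the memoized-singleton + reset seam.
class DemoRegistry
  class << self
    # Lazily builds the process-wide instance on first use.
    def instance
      @instance ||= new
    end

    # Test seam: forget the instance so the next call builds a fresh one.
    def reset!
      @instance = nil
    end
  end
end

first       = DemoRegistry.instance
same_object = DemoRegistry.instance.equal?(first)  # memoized
DemoRegistry.reset!
fresh       = DemoRegistry.instance
replaced    = !fresh.equal?(first)                 # new object after reset
```

Note that `@instance ||= new` is not guarded by a lock, so two threads racing on the very first call could each build an instance; whether that matters depends on when the first call happens.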

Instance Method Details

#any? ⇒ Boolean

True when at least one background shell is RUNNING or has finished but is still retained (retired, unread, within TTL — see #retire). The session-stable signal #313 gates the shell-management tools on this: a normal turn with no background shell never ships shell_input/shell_output/shell_tail/shell_kill, but a SHORT bg command that finished before the next turn keeps shell_output exposed so the model can still fetch its captured output (#78). Prunes stale retired entries first so the gate closes once nothing is reachable.

Returns:

  • (Boolean)


# File 'lib/rubino/tools/shell_registry.rb', line 164

def any?
  @mutex.synchronize do
    prune_retired
    !@entries.empty?
  end
end

#close_stdin(entry) ⇒ Object

Closes the write end of the child’s stdin (sends EOF). Idempotent.



# File 'lib/rubino/tools/shell_registry.rb', line 219

def close_stdin(entry)
  io = entry&.stdin
  io.close if io && !io.closed?
rescue IOError
  # already closed
end

#exit_code(entry) ⇒ Object



# File 'lib/rubino/tools/shell_registry.rb', line 247

def exit_code(entry)
  return nil if entry.wait_thr.alive?

  entry.wait_thr.value.exitstatus
end
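
A stand-alone sketch of the same check against a `Process.detach` wait thread, assuming a POSIX system with `sh` and `sleep` on the PATH. `exit_code_of` is a hypothetical free-function version of the method above; it also shows that `exitstatus` is nil when the child was killed by a signal.

```ruby
# Hypothetical free-function version of #exit_code, taking the wait thread.
def exit_code_of(wait_thr)
  return nil if wait_thr.alive?      # still running: no status yet

  wait_thr.value.exitstatus          # Process::Status#exitstatus
end

# Case 1: a child that exits normally with status 3.
pid      = Process.spawn("sh", "-c", "exit 3")
wait_thr = Process.detach(pid)       # thread whose value is the Process::Status
wait_thr.join
normal_code = exit_code_of(wait_thr)

# Case 2: a child killed by a signal has no exit status at all.
pid2 = Process.spawn("sleep", "30")
thr2 = Process.detach(pid2)
Process.kill("KILL", pid2)
thr2.join
killed_code   = exit_code_of(thr2)   # nil, even though the process is gone
killed_by_sig = thr2.value.signaled?
```

Because nil can mean either "still running" or "killed by a signal", a caller that needs to tell the two apart has to look at the wait thread or at `Process::Status#signaled?` as well.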

#find(id) ⇒ Object



# File 'lib/rubino/tools/shell_registry.rb', line 152

def find(id)
  @mutex.synchronize { @entries[id] }
end

#kill_all_groups(grace: 0.5) ⇒ Object

Synchronous teardown reaper (MED-2): SIGTERM every live shell process group this session owns (the background ENTRIES and the tracked FOREGROUND pgids), give them a brief grace period, then SIGKILL any straggler. Mirrors the Python Hermes `_kill_process` (os.killpg SIGTERM → wait → SIGKILL). Called from BackgroundTasks#cancel_all so EVERY parent-death edge (clean quit `ensure`, HUP/TERM trap, REPL break) reaps the child shells that the cooperative cancel token alone can’t reach, before the process exits and the shells reparent to init. Returns the pgids it signalled.

TRAP-SAFE (#478): reads the lock-free @pgid_snapshot and never calls Mutex#synchronize, which Ruby forbids from a signal-trap context (ThreadError). So the SIGTERM/SIGHUP teardown trap can call this directly. Process.kill and sleep are both safe to call from a Ruby trap handler.



# File 'lib/rubino/tools/shell_registry.rb', line 266

def kill_all_groups(grace: 0.5)
  pgids = @pgid_snapshot
  return pgids if pgids.empty?

  pgids.each { |pgid| signal_group("TERM", pgid) }
  sleep(grace) if grace.positive?
  pgids.each { |pgid| signal_group("KILL", pgid) }
  pgids
end
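
The TERM, grace, KILL sequence can be tried against a real process group. This is a sketch under assumptions: `signal_group` is not shown on this page, so the version here (signalling the group through a minus-prefixed signal name and swallowing "no such process") is a guess at its shape, and `terminate_groups` is a hypothetical free-function copy of the method above. It assumes a POSIX system with `sleep` available.

```ruby
# Assumed shape of signal_group: a signal name starting with "-" makes
# Process.kill address the whole process group instead of one process.
def signal_group(sig, pgid)
  Process.kill("-#{sig}", pgid)
  true
rescue Errno::ESRCH, Errno::EPERM
  false # group already gone, or not ours to signal
end

# Hypothetical free-function copy of #kill_all_groups.
def terminate_groups(pgids, grace: 0.5)
  return pgids if pgids.empty?

  pgids.each { |pgid| signal_group("TERM", pgid) }
  sleep(grace) if grace.positive?
  pgids.each { |pgid| signal_group("KILL", pgid) } # no-op for groups that exited
  pgids
end

# pgroup: true makes the child the leader of a new group (pgid == pid).
pid      = Process.spawn("sleep", "30", pgroup: true)
wait_thr = Process.detach(pid)

signalled = terminate_groups([pid], grace: 0.1)
wait_thr.join
status = wait_thr.value # Process::Status of the terminated child
```

Sending KILL unconditionally after the grace period keeps the routine free of bookkeeping: groups that already exited simply fail the second signal, which the rescue absorbs.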

#read_all(entry) ⇒ Object



# File 'lib/rubino/tools/shell_registry.rb', line 236

def read_all(entry)
  entry.mutex.synchronize { entry.buffer.dup }
end

#read_new(entry) ⇒ Object

Reads the bytes accumulated since the last `read_new` call and advances the entry’s read offset. The first call returns everything buffered so far, because `read_offset` starts at 0. Thread-safe.



# File 'lib/rubino/tools/shell_registry.rb', line 228

def read_new(entry)
  entry.mutex.synchronize do
    snapshot = entry.buffer.byteslice(entry.read_offset..) || ""
    entry.read_offset = entry.buffer.bytesize
    snapshot
  end
end
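
The offset bookkeeping is easy to see with a stand-in entry. `TailEntry` is a hypothetical Struct carrying only the three fields the method touches; `read_new_from` is the same logic as above, written as a free function.

```ruby
# Hypothetical minimal entry: just the fields #read_new touches.
TailEntry = Struct.new(:buffer, :read_offset, :mutex)

def read_new_from(entry)
  entry.mutex.synchronize do
    # byteslice returns nil when the offset is past the end, hence `|| ""`.
    chunk = entry.buffer.byteslice(entry.read_offset..) || ""
    entry.read_offset = entry.buffer.bytesize
    chunk
  end
end

entry = TailEntry.new(+"hello\n", 0, Mutex.new)

first  = read_new_from(entry)   # everything so far
entry.buffer << "world\n"       # the reader thread would do this under the mutex
second = read_new_from(entry)   # only the bytes appended since `first`
third  = read_new_from(entry)   # nothing new
```

This sketch ignores ring-buffer trimming: if older bytes are dropped from the front of the buffer, the stored offset would have to be adjusted by the same amount, and how the real reader handles that is not shown on this page.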

#register_pgid(pgid) ⇒ Object

Track a live foreground shell process group so teardown can reap it.



# File 'lib/rubino/tools/shell_registry.rb', line 78

def register_pgid(pgid)
  @mutex.synchronize do
    @fg_pgids[pgid] = true
    refresh_pgid_snapshot
  end
  pgid
end

#remove(id) ⇒ Object



# File 'lib/rubino/tools/shell_registry.rb', line 171

def remove(id)
  entry = @mutex.synchronize do
    e = @entries.delete(id)
    refresh_pgid_snapshot
    e
  end
  close_stdin(entry) if entry
  entry
end

#retire(id) ⇒ Object

Retires a FINISHED background shell instead of dropping it (#78): the entry stays in the registry, with its captured output + exit status intact and retrievable by a later shell_output, and `any?` keeps the shell-management tools exposed, so a short bg command’s output is still reachable on the next turn. The process is already dead, so its pgid is cleared from the teardown snapshot and its stdin closed. Bounded by RETIRED_TTL / MAX_RETIRED (pruned here and in #any?). Stamps retired_at on the first retire and is idempotent: a second read of a retired entry keeps the original timestamp, so a re-read can’t extend its lifetime indefinitely. No-op for an unknown or still-running id.



# File 'lib/rubino/tools/shell_registry.rb', line 191

def retire(id)
  @mutex.synchronize do
    entry = @entries[id]
    return nil unless entry
    return entry if entry.retired_at # already retired — keep original TTL clock

    entry.retired_at = Time.now
    close_stdin(entry)    # process is gone; release its stdin pipe
    refresh_pgid_snapshot # a retired (dead) shell drops out of the teardown set
    prune_retired
    entry
  end
end
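
`prune_retired` itself is not shown on this page. One way the two bounds could be enforced is sketched below; the function body, the `RetiredEntry` Struct, and the `now:`/`ttl:`/`max:` keywords are all assumptions made for illustration, with defaults mirroring RETIRED_TTL (300) and MAX_RETIRED (16).

```ruby
# Hypothetical entry: only the fields the pruning rule needs.
RetiredEntry = Struct.new(:id, :retired_at)

# Assumed pruning rule, applied to an id => entry Hash:
#   1. drop retired entries older than `ttl` seconds;
#   2. if more than `max` retired entries remain, evict the oldest-retired first.
# Entries with retired_at == nil are still running and are never pruned.
def prune_retired_sketch(entries, now: Time.now, ttl: 300, max: 16)
  entries.delete_if { |_id, e| e.retired_at && (now - e.retired_at) > ttl }

  retired = entries.values.select(&:retired_at).sort_by(&:retired_at)
  excess  = retired.size - max
  retired.first(excess).each { |e| entries.delete(e.id) } if excess.positive?
  entries
end

now = Time.at(1_000)
entries = {
  "run" => RetiredEntry.new("run", nil),            # still running
  "old" => RetiredEntry.new("old", Time.at(0)),     # 1000 s old: past the TTL
  "a"   => RetiredEntry.new("a", Time.at(900)),
  "b"   => RetiredEntry.new("b", Time.at(950)),
  "c"   => RetiredEntry.new("c", Time.at(990))
}

prune_retired_sketch(entries, now: now, ttl: 300, max: 2)
kept = entries.keys # "old" expired; "a" evicted as the oldest of three retired
```

Because `retire` never re-stamps `retired_at`, re-reading an entry cannot move it later in this ordering, which matches the idempotence note above.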

#spawn(command:, cwd:) ⇒ Object

Spawns `command` detached in its own process group so a single kill takes out the whole subtree. Returns the new entry.



# File 'lib/rubino/tools/shell_registry.rb', line 96

def spawn(command:, cwd:)
  # Capture the parent's notification sink on the CALLING thread (the turn
  # thread). The reader thread below can't read Rubino.background_sink —
  # thread-locals don't propagate — so a finished bg shell would notify
  # nothing (US-5 lost-completion). Stash it like a subagent does.
  sink = Rubino.background_sink
  rd, wr = IO.pipe
  # Writable stdin pipe: the agent feeds answers to interactive prompts
  # (Y/N, "select region", apt-style) via the `shell_input` tool, which
  # writes to `in_wr`. Line-oriented `read`/prompt commands consume this
  # fine; full-screen TTY programs (vim, REPLs that require [ -t 0 ]) are
  # out of scope for a plain pipe.
  in_rd, in_wr = IO.pipe
  # pgroup: true → child becomes leader of a new process group whose
  # pgid == child pid. Lets shell_kill send SIGTERM to the whole tree.
  # bash -o pipefail keeps this path consistent with the foreground
  # shell: a mid-pipeline crash surfaces as the exit status (#156).
  #
  # OS write-jail (#290/#544, slice 2): a backgrounded command went
  # UNJAILED before this — a real hole, since `run_in_background: true`
  # let a write outside the workspace through that the foreground path
  # blocks. We now build the spawn argv+env through the SAME
  # ShellTool.sandboxed_bash_argv helper the foreground uses, so the
  # platform sandbox launcher prefixes bash and the writable-roots env is
  # merged identically. The launcher `exec`s into bash in-place, so the
  # pgroup/pipes/cwd/tracking below are all preserved. Empty prefix when
  # the sandbox is off/unavailable ⇒ byte-identical to before.
  pid = Process.spawn(*ShellTool.sandboxed_bash_argv(command, cwd: cwd),
                      chdir: cwd, pgroup: true, in: in_rd, out: wr, err: wr)
  wr.close
  in_rd.close

  entry = Entry.new(
    id: new_id,
    command: command,
    cwd: cwd,
    pid: pid,
    pgid: pid,
    wait_thr: Process.detach(pid),
    buffer: +"",
    mutex: Mutex.new,
    started_at: Time.now,
    read_offset: 0,
    stdin: in_wr,
    sink: sink,
    notified: false
  )
  entry.reader_thr = Thread.new { drain_into(entry, rd) }

  @mutex.synchronize do
    @entries[entry.id] = entry
    refresh_pgid_snapshot
  end
  entry
end
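
The core plumbing of `spawn` (own process group, merged stdout+stderr pipe, writable stdin pipe, detached wait thread, reader thread) can be reproduced without any Rubino code. This sketch assumes a POSIX system with `sh` and `cat`; it uses plain `sh -c` in place of the sandboxed bash argv, and the inline reader loop stands in for `drain_into`, which is not shown on this page.

```ruby
rd, wr       = IO.pipe   # child's stdout + stderr -> parent
in_rd, in_wr = IO.pipe   # parent -> child's stdin

# The trailing `cat` keeps the child alive until its stdin reaches EOF.
script = "echo out; echo err >&2; cat >/dev/null"
pid = Process.spawn("sh", "-c", script,
                    pgroup: true, in: in_rd, out: wr, err: wr)
wr.close      # parent keeps only the read end of the output pipe
in_rd.close   # ...and only the write end of the stdin pipe

wait_thr   = Process.detach(pid)
same_group = Process.getpgid(pid) == pid   # pgroup: true => child leads its own group

buffer = +""
lock   = Mutex.new
reader = Thread.new do
  loop do
    chunk = rd.readpartial(4096)           # raises EOFError once the child closes its end
    lock.synchronize { buffer << chunk }
  end
rescue EOFError
  rd.close
end

in_wr.close                                # EOF on stdin lets `cat` (and the child) finish
reader.join
exit_status = wait_thr.value.exitstatus
```

Closing the parent's copies of `wr` and `in_rd` right after the spawn matters: if the parent kept `wr` open, the reader would never see EOF, and if it kept `in_rd` open, nothing else would notice when the child stops reading.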

#status(entry) ⇒ Object



# File 'lib/rubino/tools/shell_registry.rb', line 240

def status(entry)
  return :running if entry.wait_thr.alive?

  code = entry.wait_thr.value.exitstatus
  code && ShellTool.success_exit?(code) ? :completed : :failed
end

#unregister_pgid(pgid) ⇒ Object

Drop a foreground shell process group once its own thread has reaped it.



# File 'lib/rubino/tools/shell_registry.rb', line 87

def unregister_pgid(pgid)
  @mutex.synchronize do
    @fg_pgids.delete(pgid)
    refresh_pgid_snapshot
  end
end

#write_input(entry, text, enter: true) ⇒ Object

Writes `text` to the background process’s stdin (with a trailing newline unless `enter: false`); this is the “press Enter to answer a prompt” path. Returns the number of bytes written, or raises if stdin is gone.

Raises:

  • (IOError)


# File 'lib/rubino/tools/shell_registry.rb', line 208

def write_input(entry, text, enter: true)
  io = entry.stdin
  raise IOError, "stdin already closed" if io.nil? || io.closed?

  payload = enter ? "#{text}\n" : text.to_s
  io.write(payload)
  io.flush
  payload.bytesize
end
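
Answering an interactive prompt over a plain pipe looks roughly like this. The sketch assumes a POSIX `sh`; `send_line` is a hypothetical free-function copy of the method above that takes the IO directly instead of an entry.

```ruby
# Hypothetical free-function copy of #write_input, taking the stdin IO.
def send_line(io, text, enter: true)
  raise IOError, "stdin already closed" if io.nil? || io.closed?

  payload = enter ? "#{text}\n" : text.to_s
  io.write(payload)
  io.flush
  payload.bytesize
end

out_rd, out_wr = IO.pipe
in_rd, in_wr   = IO.pipe

# A line-oriented prompt: print a question, read one line, echo the answer.
script = 'printf "Proceed? [y/N] "; read answer; echo "answer=$answer"'
pid = Process.spawn("sh", "-c", script, in: in_rd, out: out_wr)
out_wr.close
in_rd.close

written = send_line(in_wr, "y")   # "y\n" is 2 bytes
in_wr.close
Process.wait(pid)
transcript = out_rd.read
out_rd.close

# After the pipe is closed, the guard raises instead of writing.
closed_error =
  begin
    send_line(in_wr, "again")
    nil
  rescue IOError => e
    e.message
  end
```

The guard only covers a pipe closed on the parent's side. If the child has already exited while the pipe is still open, the write is likely to fail with Errno::EPIPE instead, which callers may want to rescue separately.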