Class: Rubino::Tools::ShellRegistry

Inherits:
Object
Defined in:
lib/rubino/tools/shell_registry.rb

Overview

Process-wide registry for shell commands started with `run_in_background`. Each entry owns a pgid (process group), a reader thread that drains stdout+stderr into an in-memory ring buffer, and the wait_thr for exit.

The registry lives only as long as a single CLI/server process; it is intentionally NOT persisted to disk. Background shells die with the agent process.

Defined Under Namespace

Classes: Entry

Constant Summary

RING_BYTES =

Cap on buffered output per run, in bytes (256 KiB); older bytes are dropped.

256 * 1024
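
The draining code is not shown on this page, so the following is only a minimal sketch of how such a cap could be enforced, assuming the oldest bytes are trimmed from the front of the buffer. `append_capped` is a hypothetical helper, not a method of `ShellRegistry`.

```ruby
# Hypothetical helper (an assumption, not part of ShellRegistry): appends a
# chunk and keeps only the newest `cap` bytes, dropping the oldest ones.
RING_BYTES = 256 * 1024

def append_capped(buffer, chunk, cap: RING_BYTES)
  buffer << chunk
  overflow = buffer.bytesize - cap
  # byteslice counts bytes, so a multibyte character at the cut point may be
  # split; scrub the result if that matters for your output.
  buffer.replace(buffer.byteslice(overflow, cap)) if overflow.positive?
  buffer
end

buf = +""
append_capped(buf, "abcdef", cap: 4)   # buf is now "cdef"
append_capped(buf, "gh", cap: 4)       # buf is now "efgh"
```

Trimming in place keeps memory bounded no matter how chatty the command is, at the cost of losing the earliest output.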

Constructor Details

#initialize ⇒ ShellRegistry

Returns a new instance of ShellRegistry.



# File 'lib/rubino/tools/shell_registry.rb', line 29

def initialize
  @entries = {}
  @mutex   = Mutex.new
end

Class Method Details

.instance ⇒ Object

Returns the shared process-wide registry, creating it on first use.



# File 'lib/rubino/tools/shell_registry.rb', line 24

def instance
  @instance ||= new
end

Instance Method Details

#close_stdin(entry) ⇒ Object

Closes the write end of the child’s stdin (sends EOF). Idempotent.



# File 'lib/rubino/tools/shell_registry.rb', line 96

def close_stdin(entry)
  io = entry&.stdin
  io.close if io && !io.closed?
rescue IOError
  # already closed
end
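
As a standalone illustration of why closing the write end matters, the sketch below runs `cat`, which only exits once it sees EOF on stdin. It does not use the registry; it assumes a POSIX system with `cat` on the PATH.

```ruby
# Standalone sketch (not the registry itself): closing the write end of a
# pipe is what delivers EOF to a child that reads stdin.
in_rd, in_wr = IO.pipe
out_rd, out_wr = IO.pipe

# `cat` copies stdin to stdout until it sees EOF.
pid = Process.spawn("cat", in: in_rd, out: out_wr)
in_rd.close   # parent keeps only the write end of the child's stdin
out_wr.close  # parent keeps only the read end of the child's stdout

in_wr.write("hello\n")
in_wr.close                       # EOF: cat finishes copying and exits
in_wr.close unless in_wr.closed?  # guarded second close is a no-op

output = out_rd.read
_, status = Process.wait2(pid)
```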

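#exit_code(entry) ⇒ Object

Returns the process's exit status once it has exited, or nil while it is still running. Also nil if the process was killed by a signal.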



# File 'lib/rubino/tools/shell_registry.rb', line 124

def exit_code(entry)
  return nil if entry.wait_thr.alive?

  entry.wait_thr.value.exitstatus
end

#find(id) ⇒ Object

Looks up an entry by id under the registry mutex. Returns nil if no entry has that id.



# File 'lib/rubino/tools/shell_registry.rb', line 72

def find(id)
  @mutex.synchronize { @entries[id] }
end

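#read_all(entry) ⇒ Object

Returns a copy of the entire buffered output. Does not advance the offset used by `read_new`. Thread-safe.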



# File 'lib/rubino/tools/shell_registry.rb', line 113

def read_all(entry)
  entry.mutex.synchronize { entry.buffer.dup }
end

#read_new(entry) ⇒ Object

Returns the bytes appended to the buffer since the last `read_new` call and advances the entry's read offset. The first call returns everything buffered so far, because the offset starts at 0. Thread-safe.



# File 'lib/rubino/tools/shell_registry.rb', line 105

def read_new(entry)
  entry.mutex.synchronize do
    snapshot = entry.buffer.byteslice(entry.read_offset..) || ""
    entry.read_offset = entry.buffer.bytesize
    snapshot
  end
end
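
The offset bookkeeping can be tried in isolation. The sketch below copies the method body from the listing above and runs it against `FakeEntry`, a hypothetical stand-in that carries only the three fields `read_new` touches.

```ruby
# Hypothetical stand-in for Entry (an assumption for this sketch), holding
# only the fields read_new touches.
FakeEntry = Struct.new(:buffer, :mutex, :read_offset)

def read_new(entry)
  entry.mutex.synchronize do
    # byteslice returns nil when the offset is past the end of the buffer.
    snapshot = entry.buffer.byteslice(entry.read_offset..) || ""
    entry.read_offset = entry.buffer.bytesize
    snapshot
  end
end

entry = FakeEntry.new(+"", Mutex.new, 0)
entry.buffer << "line 1\n"
first = read_new(entry)    # everything buffered so far
entry.buffer << "line 2\n"
second = read_new(entry)   # only the bytes added since the first call
third = read_new(entry)    # nothing new, so an empty string
```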

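#remove(id) ⇒ Object

Deletes the entry with the given id from the registry, closes its stdin, and returns the entry (nil if the id is unknown). The process itself is not signalled here.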



# File 'lib/rubino/tools/shell_registry.rb', line 76

def remove(id)
  entry = @mutex.synchronize { @entries.delete(id) }
  close_stdin(entry) if entry
  entry
end

#spawn(command:, cwd:) ⇒ Object

Spawns `command` detached in its own process group so a single kill takes out the whole subtree. Returns the new entry.



# File 'lib/rubino/tools/shell_registry.rb', line 36

def spawn(command:, cwd:)
  rd, wr = IO.pipe
  # Writable stdin pipe: the agent feeds answers to interactive prompts
  # (Y/N, "select region", apt-style) via the `shell_input` tool, which
  # writes to `in_wr`. Line-oriented `read`/prompt commands consume this
  # fine; full-screen TTY programs (vim, REPLs that require [ -t 0 ]) are
  # out of scope for a plain pipe.
  in_rd, in_wr = IO.pipe
  # pgroup: true → child becomes leader of a new process group whose
  # pgid == child pid. Lets shell_kill send SIGTERM to the whole tree.
  # bash -o pipefail keeps this path consistent with the foreground
  # shell: a mid-pipeline crash surfaces as the exit status (#156).
  pid = Process.spawn("bash", "-o", "pipefail", "-c", command,
                      chdir: cwd, pgroup: true, in: in_rd, out: wr, err: wr)
  wr.close
  in_rd.close

  entry = Entry.new(
    id: new_id,
    command: command,
    cwd: cwd,
    pid: pid,
    pgid: pid,
    wait_thr: Process.detach(pid),
    buffer: +"",
    mutex: Mutex.new,
    started_at: Time.now,
    read_offset: 0,
    stdin: in_wr
  )
  entry.reader_thr = Thread.new { drain_into(entry, rd) }

  @mutex.synchronize { @entries[entry.id] = entry }
  entry
end
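
The process-group technique can be seen without the registry. The sketch below is a standalone illustration under stated assumptions: a POSIX system with `sh` and `sleep` on the PATH, and a plain `sh -c` instead of the `bash -o pipefail` invocation used above.

```ruby
# Standalone sketch of the process-group technique, without the registry.
rd, wr = IO.pipe

# pgroup: true makes the child the leader of a new process group, so its
# pgid equals its pid.
pid = Process.spawn("sh", "-c", "echo started; sleep 30 & wait",
                    pgroup: true, out: wr, err: wr)
wr.close  # parent keeps only the read end

first_line = rd.gets                      # blocks until the shell has printed
same_group = Process.getpgid(pid) == pid  # child leads its own group

# A negative pid addresses the whole group: the shell and its `sleep` child.
Process.kill("TERM", -pid)
_, status = Process.wait2(pid)
rd.close
```

Signalling the group rather than the single pid is what prevents orphaned grandchildren such as the backgrounded `sleep` here.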

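#status(entry) ⇒ Object

Returns :running while the process is alive, :completed if it exited with a code that `ShellTool.success_exit?` accepts, and :failed otherwise (including death by signal).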



# File 'lib/rubino/tools/shell_registry.rb', line 117

def status(entry)
  return :running if entry.wait_thr.alive?

  code = entry.wait_thr.value.exitstatus
  code && ShellTool.success_exit?(code) ? :completed : :failed
end
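
The nil check on `code` matters because `Process::Status#exitstatus` is nil when a process is killed by a signal. The sketch below exercises the three outcomes on a POSIX system; `success_exit?` is a hypothetical stand-in for `ShellTool.success_exit?`, assumed here to mean "exit code is zero".

```ruby
# Hypothetical stand-in for ShellTool.success_exit? (an assumption).
def success_exit?(code)
  code.zero?
end

def status_of(wait_thr)
  return :running if wait_thr.alive?

  # exitstatus is nil when the process was killed by a signal, which is why
  # the nil check comes before success_exit?.
  code = wait_thr.value.exitstatus
  code && success_exit?(code) ? :completed : :failed
end

# Process.detach returns a thread whose value is the Process::Status.
ok      = Process.detach(Process.spawn("sh", "-c", "exit 0"))
failing = Process.detach(Process.spawn("sh", "-c", "exit 3"))
killed  = Process.detach(Process.spawn("sh", "-c", "kill -KILL $$"))
[ok, failing, killed].each(&:join)  # wait for all three to finish

results = [ok, failing, killed].map { |thr| status_of(thr) }
```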

#write_input(entry, text, enter: true) ⇒ Object

Writes `text` to the background process's stdin, with a trailing newline unless `enter: false`. This is the “press Enter to answer a prompt” path. Returns the number of bytes written; raises IOError if stdin is nil or already closed.

Raises:

  • (IOError)


# File 'lib/rubino/tools/shell_registry.rb', line 85

def write_input(entry, text, enter: true)
  io = entry.stdin
  raise IOError, "stdin already closed" if io.nil? || io.closed?

  payload = enter ? "#{text}\n" : text.to_s
  io.write(payload)
  io.flush
  payload.bytesize
end
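
Answering a line-oriented prompt through a plain pipe looks roughly like this. The sketch is standalone (no registry) and assumes a POSIX `sh`; the prompt text and variable names are made up for illustration.

```ruby
# Standalone sketch: feed one line to a child that is blocked in `read`.
in_rd, in_wr = IO.pipe
out_rd, out_wr = IO.pipe

script = 'printf "Continue? "; read answer; echo "got:$answer"'
pid = Process.spawn("sh", "-c", script, in: in_rd, out: out_wr)
in_rd.close
out_wr.close

payload = "y\n"             # the text plus the trailing newline ("Enter")
in_wr.write(payload)
in_wr.flush
written = payload.bytesize  # what write_input reports back

Process.wait(pid)
output = out_rd.read        # prompt followed by the echoed answer
in_wr.close
```

Without the trailing newline the shell's `read` would keep waiting, which is why `enter: true` is the default.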