Class: Rubino::Tools::BackgroundTasks
- Inherits: Object
- Defined in: lib/rubino/tools/background_tasks.rb
Overview
Process-wide registry for subagents started by the `task` tool in the BACKGROUND (the default). It mirrors ShellRegistry, the in-repo precedent for “fire-and-forget + poll later + kill”, but the unit of work is a nested Agent::Runner thread instead of a detached OS process.
Each entry owns:
- the worker Thread running the child Runner#run!,
- the child Runner, so #stop_entry can flip its CancelToken (exactly the mechanism Run::Executor's stop-watcher uses for top-level runs),
- the terminal status/result/error captured in the worker's `ensure`.
The registry lives only as long as a single CLI/server process: like ShellRegistry, it is intentionally NOT persisted, so background subagents die with the process.
Concurrency cap (mirrors the reference _DEFAULT_MAX_CONCURRENT_CHILDREN = 3): a background subagent is a full LLM run, which means real cost, so #reserve refuses new children past MAX_CONCURRENT live children rather than fanning out unbounded threads.
Defined Under Namespace
Classes: Entry
Constant Summary
- MAX_CONCURRENT = 3
- MAX_DEPTH = 2
  Fallback caps for the nested-subagent tree, used when config is absent (e.g. a bare registry in a unit test with no Configuration wired). The live values come from config (tasks.max_depth / max_children_per_node / max_concurrent_total); these constants are the built-in defaults the config keys themselves default to. All three are enforced in #reserve.
- MAX_CHILDREN_PER_NODE = 3
- MAX_CONCURRENT_TOTAL = 8
- ACTIVITY_LOG_MAX = 6
  How many recent activity lines the drill-in shows (the live `recent:` ring).
- OUTPUT_TAIL_MAX = 6
  Bounds for the live output tail (#5): how many COMPLETE lines the drill-in’s output: block shows (the buffer keeps one extra slot for the in-flight partial line), and the per-line length cap (in characters) so a newline-free stream can’t grow a line unbounded.
- OUTPUT_TAIL_LINE_MAX = 200
- DENY_NOTE_PREFIX = "[approval denied by human] "
  Prefix the human’s “deny & tell the agent why” reason carries when handed to the child as a steer note (#Y1B). The note is ADVISORY (the approval gate is already denied regardless), so when the child finishes before folding it in, the still-queued copy drained by #complete must NOT raise the scary “steer note not delivered (task completed first)” alarm: the denial applied correctly and the explanation is moot. The completion paths filter this prefix out of the undelivered WARNING and emit a calm note instead. A genuine `/agents <id> steer` note never carries it, so its deliver-or-report invariant is intact.
- LIVE_STATUSES = %i[running needs_approval stopping].freeze
  The statuses under which a child still holds a concurrency slot: its worker thread is alive (actively running, parked on a human approval, or unwinding after a stop request). This is the SINGLE source of truth for “is this child still alive?”, shared by the registry itself (#running / #reserve cap) AND by every UI surface that lists live children (the footer cards, the attached switcher, the navigable picker). The surfaces therefore can never drift apart and silently drop a live-but-quiet child from one surface while another still shows it (R1). Any new parked state added to the lifecycle becomes visible everywhere by editing this one set.
Instance Attribute Summary
- #last_refusal_reason ⇒ Object (readonly)
  Why the most recent #reserve returned nil (one of :depth / :per_owner / :global), or nil when the last reserve succeeded.
Class Method Summary
- .instance ⇒ Object
- .live_status?(status) ⇒ Boolean
  The shared liveness oracle (see LIVE_STATUSES).
- .reset! ⇒ Object
  Test seam: drop all state between examples.
Instance Method Summary
- #attach(entry, thread:, runner:) ⇒ Object
  Binds the live worker thread + child runner to a reserved entry so the registry can later cancel it.
- #awaiting_approval ⇒ Object
  Entries currently parked on a human approval, surfaced on their card and answerable via /agents <id>.
- #begin_approval(id, gate:, approval_id:, question:, command:, budget: false) ⇒ Object
  Flips an entry into the :needs_approval state and stores the gate + question/command the card surfaces (Option 2).
- #cancel_all ⇒ Object
  Structured-concurrency teardown seam: cancel EVERY live subagent so the process never leaves a child parked.
- #complete(entry, status:, result: nil, error: nil) ⇒ Object
  Records terminal state when the worker finishes (called from its `ensure`).
- #end_approval(id) ⇒ Object
  Clears the approval state and returns the entry to :running once a decision has been delivered (or the child unwinds).
- #find(id) ⇒ Object
- #initialize ⇒ BackgroundTasks (constructor)
  A new instance of BackgroundTasks.
- #list ⇒ Object
  All entries, newest first, for a `task` listing (the /tasks analogue).
- #owned_by?(parent_id, child_id) ⇒ Boolean
  True iff `child_id`’s direct owner is `parent_id` (the ownership predicate steer/probe AUTHORIZATION checks build on).
- #queued_approval_count ⇒ Object
  How many children are parked on an approval BEHIND the head, i.e. the backlog the active modal advertises as “(N more queued)”.
- #record_live_probe(id) ⇒ Object
  Records a BILLED live probe against a child (S3): bumps probe_count under the mutex (the owner runs this on its own thread while the parent renderer may read the entry).
- #record_tool_finished(id, line) ⇒ Object
  Records a child tool FINISHING: appends a terse line to the bounded activity ring the live drill-in (#71) tails.
- #record_tool_output(id, chunk) ⇒ Object
  Records a streamed chunk of the CURRENTLY RUNNING tool’s output (#5): splits on newlines into a bounded line buffer whose LAST slot carries the in-flight partial line, so the /agents drill-in can tail it live.
- #record_tool_started(id, activity) ⇒ Object
  Records a child tool STARTING: bumps the tool counter and sets the last-activity string the card/list show so concurrent tasks stay distinguishable (#124/#127).
- #remove(id) ⇒ Object
- #request_stop(id) ⇒ Object
  Marks a stop REQUEST (the /agents <id> --stop / task_stop path) on a live entry so the list/cards immediately show ◌ stopping instead of a stale ● running while the child unwinds at its next checkpoint (#108).
- #reserve(subagent:, prompt:, owner_subagent_id: nil, depth: 0) ⇒ Object
  Reserves a slot and registers a `running` entry, returning it.
- #running ⇒ Object
  Live (still-running) children, used by the parent stop path to cancel orphans and to enforce the concurrency cap.
- #shutdown!(grace: 1.0) ⇒ Object
  Process-exit teardown: first the cooperative #cancel_all, then a short chance for child threads to finish, and finally a kill for non-cooperative survivors.
- #steer(id, text) ⇒ Object
  Records a parent->child steer note (the `/agents <id> steer "…"` affordance).
- #stop_entry(entry) ⇒ Object
  The ONE per-entry stop body, shared by every stop path (the human /agents <id> --stop, the model-callable task_stop, and the parent-teardown #cancel_all).
Constructor Details
#initialize ⇒ BackgroundTasks
Returns a new instance of BackgroundTasks.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 154

def initialize
  @entries = {}
  @mutex = Mutex.new
  # Monotonic source for approval_seq: the FIFO order of the approval
  # modal queue. Bumped under @mutex on every begin_approval so two
  # children that park "at once" still get a deterministic, stable order
  # (the one whose begin_approval won the lock first is the head).
  @approval_seq = 0
end
```
Instance Attribute Details
#last_refusal_reason ⇒ Object (readonly)
Why the most recent #reserve returned nil (one of :depth / :per_owner / :global), or nil when the last reserve succeeded. Read by TaskTool to phrase a reason-specific at-capacity message.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 211

def last_refusal_reason
  @last_refusal_reason
end
```
Class Method Details
.instance ⇒ Object
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 136

def instance
  @instance ||= new
end
```
.live_status?(status) ⇒ Boolean
The shared liveness oracle (see LIVE_STATUSES). Public so the UI surfaces that format a registry snapshot (SubagentCards, AgentMenu) filter by the EXACT same rule the registry uses, with no duplicated status list to fall out of sync.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 149

def live_status?(status)
  LIVE_STATUSES.include?(status)
end
```
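To make the single-source-of-truth point concrete, here is a minimal sketch in which a registry-side filter and a UI-side filter both call one shared predicate. LivenessSketch, the snapshot hashes, live_children, and footer_cards are hypothetical stand-ins, not Rubino's actual SubagentCards or AgentMenu code.

```ruby
# Hypothetical stand-in for the class-level liveness oracle.
module LivenessSketch
  LIVE_STATUSES = %i[running needs_approval stopping].freeze

  def self.live_status?(status)
    LIVE_STATUSES.include?(status)
  end
end

# A registry snapshot as a UI surface might receive it (made-up ids).
SNAPSHOT = [
  { id: "sa_1", status: :running },
  { id: "sa_2", status: :needs_approval }, # parked, but its thread is alive
  { id: "sa_3", status: :stopping },
  { id: "sa_4", status: :completed },
  { id: "sa_5", status: :stopped }
].freeze

# Registry side: which children still hold a concurrency slot.
def live_children(snapshot)
  snapshot.select { |e| LivenessSketch.live_status?(e[:status]) }
end

# UI side: footer card labels built with the very same predicate.
def footer_cards(snapshot)
  snapshot.filter_map { |e| "#{e[:id]} (#{e[:status]})" if LivenessSketch.live_status?(e[:status]) }
end
```

Because both filters call the same predicate, adding a new parked status to the set changes what both surfaces show at once.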
.reset! ⇒ Object
Test seam: drop all state between examples.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 141

def reset!
  @instance = nil
end
```
Instance Method Details
#attach(entry, thread:, runner:) ⇒ Object
Binds the live worker thread + child runner to a reserved entry so the registry can later cancel it. Done after reserve so the entry exists in the map before the thread starts (no race on completion writing back).
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 216

def attach(entry, thread:, runner:)
  @mutex.synchronize do
    entry.thread = thread
    entry.runner = runner
  end
end
```
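The reserve, start thread, attach, complete ordering can be sketched as a minimal, hypothetical model (LifecycleEntry, LifecycleRegistry, and start_background are illustrative names; the real Entry also carries a runner, a steer queue, and more). It shows why reserving first matters: the worker's `ensure` always finds an existing entry, even if the work finishes before #attach runs.

```ruby
# Hypothetical, stripped-down entry and registry (not the real Entry class).
LifecycleEntry = Struct.new(:id, :status, :thread, :result, :error, keyword_init: true)

class LifecycleRegistry
  def initialize
    @entries = {}
    @mutex = Mutex.new
    @seq = 0
  end

  def reserve
    @mutex.synchronize do
      entry = LifecycleEntry.new(id: "sa_#{@seq += 1}", status: :running)
      @entries[entry.id] = entry
    end
  end

  def attach(entry, thread:)
    @mutex.synchronize { entry.thread = thread }
  end

  def complete(entry, status:, result: nil, error: nil)
    @mutex.synchronize do
      entry.status = status
      entry.result = result
      entry.error = error
    end
  end

  def find(id)
    @mutex.synchronize { @entries[id] }
  end
end

# Reserve first, then start the thread, then attach it.
def start_background(registry, &work)
  entry = registry.reserve # the entry is in the map before any thread runs
  thread = Thread.new do
    status = :failed
    result = error = nil
    begin
      result = work.call
      status = :completed
    rescue StandardError => e
      error = e.message
    ensure
      # Always reaches an entry that already exists, even if the work
      # finishes before the attach call below has run.
      registry.complete(entry, status: status, result: result, error: error)
    end
  end
  registry.attach(entry, thread: thread)
  entry
end
```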
#awaiting_approval ⇒ Object
Entries currently parked on a human approval, surfaced on their card and answerable via /agents <id>. They are ordered OLDEST-FIRST (by approval_seq, i.e. the moment the child blocked) so the modal queue is FIFO: when two children raise an approval at once, only ONE modal is presented at a time (the head of this list, which auto_resolve_pending takes via #first); the rest form the “(N more queued)” backlog shown on the active modal and dequeue in the order they parked. Ties fall back to started_at for a stable order.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 409

def awaiting_approval
  @mutex.synchronize do
    @entries.values.select { |e| e.status == :needs_approval }
            .sort_by { |e| [e.approval_seq.to_i, e.started_at] }
  end
end
```
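A minimal sketch of the FIFO approval queue, assuming a simplified entry that carries only id, status, started_at, and approval_seq (ApprovalEntry and ApprovalQueueSketch are hypothetical). It shows that the head is whichever child parked first, not whichever started first, and how the “(N more queued)” count falls out of the same ordering.

```ruby
ApprovalEntry = Struct.new(:id, :status, :started_at, :approval_seq, keyword_init: true)

class ApprovalQueueSketch
  def initialize
    @mutex = Mutex.new
    @entries = {}
    @approval_seq = 0
  end

  def add(id, started_at:)
    @mutex.synchronize do
      @entries[id] = ApprovalEntry.new(id: id, status: :running, started_at: started_at)
    end
  end

  # The sequence number is taken under the lock, so whoever wins the lock first is the head.
  def begin_approval(id)
    @mutex.synchronize do
      entry = @entries.fetch(id)
      entry.approval_seq = (@approval_seq += 1)
      entry.status = :needs_approval
    end
  end

  def end_approval(id)
    @mutex.synchronize do
      entry = @entries.fetch(id)
      entry.approval_seq = nil
      entry.status = :running if entry.status == :needs_approval
    end
  end

  # Oldest parked first; started_at only breaks ties.
  def awaiting_approval
    @mutex.synchronize do
      @entries.values
              .select { |e| e.status == :needs_approval }
              .sort_by { |e| [e.approval_seq.to_i, e.started_at] }
    end
  end

  # Everyone parked behind the head.
  def queued_approval_count
    [awaiting_approval.size - 1, 0].max
  end
end
```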
#begin_approval(id, gate:, approval_id:, question:, command:, budget: false) ⇒ Object
Flips an entry into the :needs_approval state and stores the gate + question/command the card surfaces (Option 2). The child thread then parks on `gate.await(approval_id)`; the user resolves it via /agents <id>. As shown, the method returns :needs_approval (the value of its final assignment), or nil for an unknown id; #end_approval later restores :running.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 327

def begin_approval(id, gate:, approval_id:, question:, command:, budget: false)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.approval_gate = gate
    entry.approval_id = approval_id
    entry.approval_question = question.to_s
    entry.approval_command = command.to_s
    entry.budget_request = budget ? true : false
    entry.approval_seq = (@approval_seq += 1)
    entry.status = :needs_approval
  end
end
```
#cancel_all ⇒ Object
Structured-concurrency teardown seam: cancel EVERY live subagent so the process never leaves a child parked. This is the required fix for the parent-death deadlock (#XXX). When the PARENT dies or is interrupted (REPL break, HUP/TERM, clean quit, an aborted turn), a child parked on its approval gate would otherwise stay parked for the full approval timeout, because nothing cancels its gate: the per-id stop paths only fire on an explicit /agents --stop or task_stop. Calling this from each parent-death edge wakes every blocked child SYNCHRONOUSLY (cancel! pushes its sentinel; the gate’s await observes it within one WAKE_TICK), so each unwinds via the existing `rescue Rubino::Interrupted` with the clean “parent question was cancelled” message instead of hanging until the timeout. It is a no-op when there are no live children and idempotent (#stop_entry is), so it is safe to invoke from a teardown `ensure` and from a signal trap. It snapshots #running first (outside the per-entry work) so the registry mutex is not held across the gate/runner cancels.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 479

def cancel_all
  live = running
  live.each { |entry| stop_entry(entry) }
  # Logical cancel alone (above) only flips cancel tokens and trusts each
  # child THREAD to observe the token and reap its own shell within a wake
  # tick, but on parent-DEATH the process exits before the thread reaches
  # that checkpoint, so any shell a child spawned (its own pgid) reparents
  # to init as an orphan (MED-2). Reap the tracked shell process groups
  # SYNCHRONOUSLY here so the same parent-death edges that call cancel_all
  # (clean quit, HUP/TERM trap, REPL break) leave no surviving shell.
  ShellRegistry.instance.kill_all_groups
end
```
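The parent-death fix hinges on waking children that are blocked on their gate. The sketch below models a gate as a Ruby Queue, which is an assumption: the text above describes a sentinel plus a WAKE_TICK poll, and a blocking pop simplifies that. It shows cancel_all snapshotting under the lock and cancelling outside it. SketchApprovalGate, SketchInterrupted, and the hash-shaped entries are hypothetical.

```ruby
class SketchInterrupted < StandardError; end

# Hypothetical approval gate: a child parks in #await until an answer or a cancel arrives.
class SketchApprovalGate
  CANCELLED = Object.new.freeze

  def initialize
    @queue = Queue.new
  end

  def answer(decision)
    @queue << decision
  end

  def cancel!
    @queue << CANCELLED
  end

  def await
    value = @queue.pop # blocks the child thread
    raise SketchInterrupted, "parent question was cancelled" if value.equal?(CANCELLED)

    value
  end
end

# Snapshot under the lock, then cancel with the lock released.
def cancel_all(entries, mutex)
  live = mutex.synchronize { entries.dup }
  live.each { |entry| entry[:gate].cancel! }
  live.size
end
```

Without the cancel_all call, every worker in the test below would stay blocked in `@queue.pop` indefinitely; with it, each unwinds through its own rescue.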
#complete(entry, status:, result: nil, error: nil) ⇒ Object
Records terminal state when the worker finishes (called from its `ensure`). There is a single writer per entry, but the write is guarded so #find/#list readers see a consistent snapshot. A failure landing on a :stopping entry is a USER-REQUESTED stop unwinding (Interrupted at the next checkpoint), so it is recorded as :stopped, distinct from a genuine :failed (#108/#13).
H5: this closes the drain↔complete race. The final drain of the child’s steer_queue happens HERE, under the SAME registry mutex that flips the status to terminal, and #steer refuses to push onto a terminal entry under that SAME mutex. A steer/answer arriving concurrently is therefore serialised against this finalize: it is EITHER pushed before the status flips (and drained right here into the returned `undelivered` notes) OR rejected by #steer (which then honestly reports not-delivered). The earlier shape (drain under the InputQueue lock, then complete under the registry lock: two locks with a gap) let an answer land on a now-dead queue, where it was dropped, omitted from `undelivered`, and yet reported delivered. Returns the notes that were still queued at finalize time (never delivered to the child), so the caller can surface them as undelivered.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 257

def complete(entry, status:, result: nil, error: nil)
  @mutex.synchronize do
    status = :stopped if entry.status == :stopping && status == :failed
    entry.status = status
    entry.result = result
    entry.error = error
    entry.finished_at = Time.now
    # Drain UNDER the mutex: anything still here is undelivered (the child
    # has no further turn to fold it in), and once status is terminal no
    # new note can arrive (#steer rejects it).
    entry.steer_queue&.drain || []
  end
end
```
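The H5 invariant (every note #steer accepts is either consumed by the child or returned by #complete) can be checked with a small model in which the status flip, the drain, and the push all share one mutex. SteerSketch, SteerEntry, and the TERMINAL_STATUSES set are assumptions for illustration; the real code uses Interaction::InputQueue and a private terminal_status? helper whose exact set is not shown here. Nothing in the sketch consumes notes the way a child would, so every accepted note must come back as undelivered.

```ruby
TERMINAL_STATUSES = %i[completed failed stopped].freeze # assumed terminal set

SteerEntry = Struct.new(:status, :notes, keyword_init: true)

class SteerSketch
  attr_reader :entry

  def initialize
    @mutex = Mutex.new
    @entry = SteerEntry.new(status: :running, notes: [])
  end

  def request_stop
    @mutex.synchronize { @entry.status = :stopping if @entry.status == :running }
  end

  # Parent side: accept a note only while the child is non-terminal.
  def steer(text)
    @mutex.synchronize do
      return false if TERMINAL_STATUSES.include?(@entry.status)

      @entry.notes << text
      true
    end
  end

  # Worker side: flip to terminal AND drain under the same lock.
  def complete(status:)
    @mutex.synchronize do
      status = :stopped if @entry.status == :stopping && status == :failed
      @entry.status = status
      undelivered = @entry.notes.dup
      @entry.notes.clear
      undelivered
    end
  end
end
```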
#end_approval(id) ⇒ Object
Clears the approval state and returns the entry to :running once a decision has been delivered (or the child unwinds).
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 344

def end_approval(id)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.approval_gate = nil
    entry.approval_id = nil
    entry.approval_question = nil
    entry.approval_command = nil
    entry.budget_request = false
    entry.approval_seq = nil
    entry.status = :running if entry.status == :needs_approval
  end
end
```
#find(id) ⇒ Object
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 426

def find(id)
  @mutex.synchronize { @entries[id] }
end
```
#list ⇒ Object
All entries, newest first, for a `task` listing (the /tasks analogue).
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 431

def list
  @mutex.synchronize { @entries.values.sort_by(&:started_at).reverse }
end
```
#owned_by?(parent_id, child_id) ⇒ Boolean
True iff `child_id`’s direct owner is `parent_id` (the ownership predicate steer/probe AUTHORIZATION checks build on).
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 507

def owned_by?(parent_id, child_id)
  @mutex.synchronize do
    child = @entries[child_id]
    !child.nil? && child.owner_subagent_id == parent_id
  end
end
```
#queued_approval_count ⇒ Object
How many children are parked on an approval BEHIND the head — i.e. the backlog the active modal advertises as “(N more queued)”. Only ONE approval modal is presented at a time (awaiting_approval.first); this is everyone else still :needs_approval. Zero when at most one child is parked. The active modal reads this so the user knows more are waiting and that resolving the current one dequeues the next.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 422

def queued_approval_count
  [awaiting_approval.size - 1, 0].max
end
```
#record_live_probe(id) ⇒ Object
Records a BILLED live probe against a child (S3): bumps probe_count under the mutex (the owner runs this on its own thread while the parent renderer may read the entry). Returns the new count, or nil for an unknown id. Free snapshot probes (live:false) never call this; only `probe(live:true)` does, after the budget check passes.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 392

def record_live_probe(id)
  @mutex.synchronize do
    entry = @entries[id]
    return nil unless entry

    entry.probe_count = entry.probe_count.to_i + 1
    entry.probe_count
  end
end
```
#record_tool_finished(id, line) ⇒ Object
Records a child tool FINISHING: appends a terse line to the bounded activity ring the live drill-in (#71) tails. Keeps the last ACTIVITY_LOG_MAX entries so the ring never grows unbounded for a read-heavy child. Also wipes the live output tail — it belongs to the tool that just finished, so the drill-in’s output: block clears (#5).
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 292

def record_tool_finished(id, line)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    log = (entry.activity_log ||= [])
    log << line.to_s
    log.shift while log.size > ACTIVITY_LOG_MAX
    entry.output_tail = nil
  end
end
```
#record_tool_output(id, chunk) ⇒ Object
Records a streamed chunk of the CURRENTLY RUNNING tool’s output (#5): splits on newlines into a bounded line buffer whose LAST slot carries the in-flight partial line, so the /agents drill-in can tail it live. Called from a subagent’s UI::CLI#tool_chunk on the CHILD thread, so it MUST take the mutex like the other record_* writers. No-op for an unknown id.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 309

def record_tool_output(id, chunk)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    tail = (entry.output_tail ||= [""])
    chunk.to_s.each_line do |line|
      tail[-1] = "#{tail[-1]}#{line.chomp}"[0, OUTPUT_TAIL_LINE_MAX]
      tail << "" if line.end_with?("\n")
    end
    tail.shift while tail.size > OUTPUT_TAIL_MAX + 1
  end
end
```
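The two bounded buffers can be exercised outside the registry. This sketch lifts the loop bodies of #record_tool_output and #record_tool_finished into plain functions over arrays (append_output and append_activity are hypothetical names; the registry lookup and mutex are omitted), using the values listed in the Constant Summary.

```ruby
OUTPUT_TAIL_MAX = 6
OUTPUT_TAIL_LINE_MAX = 200
ACTIVITY_LOG_MAX = 6

# Same steps as #record_tool_output, minus the registry lookup and mutex.
def append_output(tail, chunk)
  chunk.to_s.each_line do |line|
    # Extend the in-flight partial line (the last slot), capped in length.
    tail[-1] = "#{tail[-1]}#{line.chomp}"[0, OUTPUT_TAIL_LINE_MAX]
    # A trailing newline completes the line: open a fresh partial slot.
    tail << "" if line.end_with?("\n")
  end
  # Keep OUTPUT_TAIL_MAX complete lines plus the one partial slot.
  tail.shift while tail.size > OUTPUT_TAIL_MAX + 1
  tail
end

# Same steps as the activity ring in #record_tool_finished.
def append_activity(log, line)
  log << line.to_s
  log.shift while log.size > ACTIVITY_LOG_MAX
  log
end

tail = [""]
append_output(tail, "ab")
append_output(tail, "c\nde")
append_output(tail, "f\n")
tail # => ["abc", "def", ""]
```

Chunk boundaries do not matter: "ab" and "c\n" arrive separately yet end up as the single line "abc", and the trailing empty string is the slot the next chunk will extend.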
#record_tool_started(id, activity) ⇒ Object
Records a child tool STARTING: bumps the tool counter and sets the last-activity string the card/list show so concurrent tasks stay distinguishable (#124/#127). Called from a subagent’s UI::CLI#tool_started, which runs on the CHILD thread, so it MUST take the mutex (the parent renderer reads these fields concurrently). No-op for an unknown id (a late event after #remove).
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 277

def record_tool_started(id, activity)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.tool_count = entry.tool_count.to_i + 1
    entry.last_activity = activity.to_s
  end
end
```
#remove(id) ⇒ Object
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 443

def remove(id)
  @mutex.synchronize { @entries.delete(id) }
end
```
#request_stop(id) ⇒ Object
Marks a stop REQUEST (the /agents <id> --stop / task_stop path) on a live entry so the list/cards immediately show ◌ stopping instead of a stale ● running while the child unwinds at its next checkpoint (#108). Returns true when the entry flipped, and false for an unknown or non-live entry. #complete then maps a failure on a :stopping entry to the terminal :stopped, so a deliberate stop never reads as ✗ failed (#13).
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 229

def request_stop(id)
  @mutex.synchronize do
    entry = @entries[id]
    return false unless entry && live_status?(entry.status)

    entry.status = :stopping
    true
  end
end
```
#reserve(subagent:, prompt:, owner_subagent_id: nil, depth: 0) ⇒ Object
Reserves a slot and registers a `running` entry, returning it. The caller then attaches the worker thread + runner via #attach.
owner_subagent_id is the `sa_*` id of the SPAWNING subagent (nil ⇒ the human / top-level agent spawned this child). depth is the caller’s hint for a human-spawned child (0); for an owner-spawned child the depth is recomputed here from the owner entry (owner.depth + 1), so a stale hint can’t smuggle a child past the depth cap.
Returns nil when ANY of the three nesting caps is hit, so TaskTool can surface a clear message instead of spawning unbounded work. The reason is available via #last_refusal_reason for the caller to phrase the message:
- :depth when depth >= max_depth (no deeper nesting allowed)
- :per_owner when this owner already has max_children_per_node live kids
- :global when total live subagents across the tree >= max_concurrent_total
This is the SINGLE enforcement point for every nesting limit.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 180

def reserve(subagent:, prompt:, owner_subagent_id: nil, depth: 0)
  @mutex.synchronize do
    owner = owner_subagent_id ? @entries[owner_subagent_id] : nil
    effective_depth = owner ? owner.depth.to_i + 1 : depth.to_i
    @last_refusal_reason = refusal_reason(owner_subagent_id, effective_depth)
    return nil if @last_refusal_reason

    entry = Entry.new(
      id: new_id,
      subagent: subagent.to_s,
      prompt: prompt.to_s,
      status: :running,
      started_at: Time.now,
      tool_count: 0,
      activity_log: [],
      # Every background child gets its OWN steering queue at reserve time
      # so the parent can `/agents <id> steer "..."` it the instant it is
      # listed: no separate wiring step, no nil window.
      steer_queue: Interaction::InputQueue.new,
      owner_subagent_id: owner_subagent_id,
      depth: effective_depth
    )
    @entries[entry.id] = entry
    entry
  end
end
```
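#reserve delegates to a private refusal_reason helper whose source is not shown. The sketch below is one plausible shape for it, checking the caps in the order listed above; NestingCapsSketch and CapEntry are hypothetical, and treating a nil owner as the root node for the per-owner count is an assumption. It also demonstrates the depth recomputation that defeats a stale hint.

```ruby
CapEntry = Struct.new(:id, :owner_id, :depth, :status, keyword_init: true)

class NestingCapsSketch
  LIVE_STATUSES = %i[running needs_approval stopping].freeze

  attr_reader :last_refusal_reason

  def initialize(max_depth: 2, max_children_per_node: 3, max_concurrent_total: 8)
    @max_depth = max_depth
    @max_children = max_children_per_node
    @max_total = max_concurrent_total
    @entries = {}
    @mutex = Mutex.new
    @seq = 0
  end

  def reserve(owner_id: nil, depth: 0)
    @mutex.synchronize do
      owner = owner_id && @entries[owner_id]
      # An owner-spawned child's depth is recomputed; the caller's hint is ignored.
      effective_depth = owner ? owner.depth + 1 : depth
      @last_refusal_reason = refusal_reason(owner_id, effective_depth)
      return nil if @last_refusal_reason

      id = "sa_#{@seq += 1}"
      @entries[id] = CapEntry.new(id: id, owner_id: owner_id, depth: effective_depth, status: :running)
    end
  end

  private

  # Hypothetical helper: depth first, then per-owner, then global.
  # A nil owner_id is treated as the root node (an assumption).
  def refusal_reason(owner_id, depth)
    live = @entries.values.select { |e| LIVE_STATUSES.include?(e.status) }
    return :depth if depth >= @max_depth
    return :per_owner if live.count { |e| e.owner_id == owner_id } >= @max_children
    return :global if live.size >= @max_total

    nil
  end
end
```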
#running ⇒ Object
Live (still-running) children — used by the parent stop path to cancel orphans, and to enforce the concurrency cap. A child parked on a human approval (:needs_approval) is STILL live (its thread is alive, holding a slot), so it counts as running here.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 439

def running
  @mutex.synchronize { @entries.values.select { |e| live_status?(e.status) } }
end
```
#shutdown!(grace: 1.0) ⇒ Object
Process-exit teardown: first do the cooperative cancel above, then give child threads a short chance to finish and finally kill non-cooperative survivors. Background subagents are Ruby threads, not OS child processes; if a child is stuck in a provider read that never observes its cancel token, a plain #cancel_all leaves the process alive waiting on that non-daemon thread. This method is for chat shutdown only, not normal per-task stops.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 499

def shutdown!(grace: 1.0)
  live = running
  cancel_all
  join_or_kill_threads(live, grace: grace)
end
```
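shutdown! hands its snapshot to a private join_or_kill_threads helper that is not shown. Here is a minimal sketch of such a helper, assuming it receives Thread objects directly (the real one receives entries) and spends one shared grace deadline rather than a full grace per thread; finished_within? is likewise a hypothetical name.

```ruby
# True when the thread has finished within `limit` seconds.
def finished_within?(thread, limit)
  !thread.join(limit).nil?
rescue StandardError
  true # Thread#join re-raises the thread's own exception, so it has finished
end

# Hypothetical helper: one shared grace deadline, then kill survivors.
def join_or_kill_threads(threads, grace:)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + grace
  threads.compact.each_with_object([]) do |thread, killed|
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    next if finished_within?(thread, [remaining, 0].max)

    thread.kill                   # non-cooperative survivor: force it down
    finished_within?(thread, 1.0) # give its ensure blocks a moment to run
    killed << thread
  end
end
```

A shared deadline bounds the cooperative wait to `grace` no matter how many children are live; a per-thread grace would scale with the number of children.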
#steer(id, text) ⇒ Object
Records a parent->child steer note (the `/agents <id> steer "…"` affordance). It pushes the text onto the child’s steering queue, which the child Loop drains at its next iteration boundary (Loop#inject_steered_input): between turns, never between a tool_use and its results. Best-effort: returns false (and pushes nothing) when the entry is gone, has no queue, or has ALREADY reached a terminal state (the child finished, so there is no further turn to fold the note into); returns true when the note was queued.
H5: the push happens UNDER the registry mutex, gated on a non-terminal status, so it is serialised against #complete (which flips the status to terminal AND drains the queue under that SAME mutex). Either this push wins the lock first (the note is queued and will be drained, by the child at its next turn or by #complete into the undelivered report) or #complete wins first (status is terminal and this returns false). There is no window in which a note is pushed onto a queue nobody will drain yet reported delivered. Pushing inside the mutex is safe: InputQueue#push has its own lock and never calls back into the registry, so there is no lock cycle.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 376

def steer(id, text)
  @mutex.synchronize do
    entry = @entries[id]
    return false unless entry&.steer_queue
    return false if terminal_status?(entry.status)

    entry.steer_queue.push(text)
    true
  end
end
```
#stop_entry(entry) ⇒ Object
The ONE per-entry stop body, shared by every stop path (the human /agents <id> --stop, the model-callable task_stop, and the parent-teardown #cancel_all). It marks the stop so the unwind records as :stopped (not ✗ failed) and the list shows ◌ stopping, then wakes the entry no matter HOW it is blocked: it cancels the approval gate of a child parked on it (→ Interrupted → clean unwind) and the runner’s CancelToken for a child between checkpoints. It is idempotent and safe on an already-stopped or never-blocked entry (each cancel! is one-shot; request_stop no-ops on a non-live status), so #cancel_all can call it across the whole registry.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 456

def stop_entry(entry)
  return unless entry

  request_stop(entry.id)
  entry.approval_gate&.cancel!
  entry.runner&.cancel!
end
```
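Idempotency here rests on two facts: each cancel! is one-shot, and the status flip is gated on a live status. The minimal sketch below uses a hypothetical OneShotCancel token and a simplified StopEntry; request_stop is inlined and the registry mutex is omitted, so it illustrates the shape of the method rather than reproducing it.

```ruby
# Hypothetical one-shot cancel token: only the first cancel! has an effect.
class OneShotCancel
  def initialize
    @mutex = Mutex.new
    @fired = false
  end

  # Returns true only for the call that actually fired the token.
  def cancel!
    @mutex.synchronize do
      return false if @fired

      @fired = true
    end
  end

  def fired?
    @mutex.synchronize { @fired }
  end
end

StopEntry = Struct.new(:status, :approval_gate, :runner, keyword_init: true)
STOP_LIVE_STATUSES = %i[running needs_approval stopping].freeze

# Same shape as #stop_entry, with request_stop inlined.
def stop_entry(entry)
  return unless entry

  entry.status = :stopping if STOP_LIVE_STATUSES.include?(entry.status)
  entry.approval_gate&.cancel! # wakes a child parked on its approval gate
  entry.runner&.cancel!        # flips the token a child checks between checkpoints
end
```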