Class: Rubino::Tools::BackgroundTasks
- Inherits: Object
- Defined in: lib/rubino/tools/background_tasks.rb
Overview
Process-wide registry for subagents started by the `task` tool in the BACKGROUND (the default). It mirrors ShellRegistry, the in-repo precedent for "fire-and-forget + poll later + kill", but the unit of work is a nested Agent::Runner thread instead of a detached OS process.
Each entry owns:
- the worker Thread running the child Runner#run!,
- the child Runner (so a stop, via #stop_entry, can flip its CancelToken: exactly the mechanism Run::Executor's stop-watcher uses for top-level runs),
- the terminal status/result/error captured in the worker's `ensure`.
The registry lives only as long as a single CLI/server process: like ShellRegistry, it is intentionally NOT persisted, so background subagents die with the process.
Concurrency cap (mirrors the reference _DEFAULT_MAX_CONCURRENT_CHILDREN = 3): a background subagent is a full LLM run with real cost, so #reserve, the single enforcement point for every nesting limit, refuses new children past the caps instead of fanning out unbounded threads.
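The lifecycle described above (reserve a slot, start a worker thread, record the terminal state from the worker's `ensure`, poll later) can be sketched as follows. This is a minimal, hypothetical model: `TinyRegistry`, `TinyEntry`, and the `:completed` status name are illustrative assumptions rather than the real Rubino classes, and the real registry cancels children through their runner's CancelToken instead of joining them.

```ruby
# Hypothetical, stripped-down model of the registry lifecycle; not the real
# Rubino API. Entries live in a Hash guarded by one Mutex.
TinyEntry = Struct.new(:id, :status, :thread, :result, keyword_init: true)

class TinyRegistry
  MAX_CONCURRENT = 3

  def initialize
    @entries = {}
    @mutex = Mutex.new
    @seq = 0
  end

  # Register a :running entry BEFORE any thread starts, or return nil when
  # the cap of live children is reached (simplified: only :running counts).
  def reserve
    @mutex.synchronize do
      return nil if @entries.values.count { |e| e.status == :running } >= MAX_CONCURRENT

      @seq += 1
      entry = TinyEntry.new(id: "sa_#{@seq}", status: :running)
      @entries[entry.id] = entry
    end
  end

  def attach(entry, thread)
    @mutex.synchronize { entry.thread = thread }
  end

  # Called from the worker's `ensure`, so it runs even if the work raises.
  def complete(entry, status:, result: nil)
    @mutex.synchronize do
      entry.status = status
      entry.result = result
    end
  end

  def find(id)
    @mutex.synchronize { @entries[id] }
  end
end

registry = TinyRegistry.new
entry = registry.reserve
worker = Thread.new do
  status = :failed
  result = nil
  begin
    result = "child answer" # stands in for the nested Runner#run!
    status = :completed     # assumed name for a successful terminal status
  ensure
    registry.complete(entry, status: status, result: result)
  end
end
registry.attach(entry, worker)
worker.join # a real caller would poll #find instead of blocking here
```

Reserving before `Thread.new` is the point of the ordering: the entry is already in the map when the worker's `ensure` writes back, so completion never races against registration.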
Defined Under Namespace
Classes: Entry
Constant Summary
- MAX_CONCURRENT = 3
- MAX_DEPTH = 2
  Fallback caps for the nested-subagent tree (MAX_DEPTH, MAX_CHILDREN_PER_NODE, MAX_CONCURRENT_TOTAL), used when config is absent (e.g. a bare registry in a unit test with no Configuration wired). The live values come from config (tasks.max_depth / max_children_per_node / max_concurrent_total); these constants are the built-in defaults the config keys themselves default to. All three are enforced in #reserve.
- MAX_CHILDREN_PER_NODE = 3
- MAX_CONCURRENT_TOTAL = 8
- ACTIVITY_LOG_MAX = 6
  How many recent activity lines the drill-in shows (the live `recent:` ring).
- OUTPUT_TAIL_MAX = 6
  Bounds for the live output tail (#5): how many COMPLETE lines the drill-in's output: block shows (the buffer keeps one extra slot for the in-flight partial line), and, via OUTPUT_TAIL_LINE_MAX, the length cap per buffered line (applied with String#[], so it counts characters) so a newline-free stream can't grow a line unbounded.
- OUTPUT_TAIL_LINE_MAX = 200
- ANSWER_NOTE_PREFIX = "[parent answer] "
  Prefix that #deliver_answer stamps on the steer-queue COPY of an answer it delivers to the child via its ask gate (dual-path delivery: the gate for a blocking ask, the steer queue for a non-blocking one). When the child resumes via the gate and finishes WITHOUT another turn boundary, #complete drains the still-queued copy, which would otherwise surface as an "undelivered steer note". Because the answer WAS delivered via the gate, reporting it as undelivered is a false alarm (the /reply happy-path regression from the H5 fix #457). The completion-notice paths therefore filter notes carrying this prefix OUT of the undelivered report. A genuine `/agents <id> steer "…"` note never carries the prefix, so the deliver-or-report-undelivered invariant for real steer notes is intact.
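The prefix-based filtering described for ANSWER_NOTE_PREFIX can be sketched like this. `undelivered_steer_notes` is a hypothetical helper name and the drained notes are assumed to be plain strings; only the prefix value comes from this page.

```ruby
ANSWER_NOTE_PREFIX = "[parent answer] "

# Hypothetical helper: given the notes #complete drained from a finished
# child's steer queue, keep only genuine steer notes. Answer copies that were
# already delivered through the ask gate carry the prefix and are dropped,
# so they are not reported as undelivered.
def undelivered_steer_notes(drained)
  drained.reject { |note| note.start_with?(ANSWER_NOTE_PREFIX) }
end

drained = [
  "#{ANSWER_NOTE_PREFIX}use the staging database",
  "also check the README"
]
undelivered_steer_notes(drained) # => ["also check the README"]
```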
Instance Attribute Summary
- #last_refusal_reason ⇒ Object (readonly)
  Why the most recent #reserve returned nil (one of :depth / :per_owner / :global), or nil when the last reserve succeeded.
Class Method Summary
- .instance ⇒ Object
- .reset! ⇒ Object
  Test seam: drop all state between examples.
Instance Method Summary
- #ancestors_of(id) ⇒ Object
  The chain of ancestors of `id`, nearest parent first, walking owner_subagent_id up to the human/top-level root.
- #attach(entry, thread:, runner:) ⇒ Object
  Binds the live worker thread + child runner to a reserved entry so the registry can later cancel it.
- #awaiting_approval ⇒ Object
  Entries currently parked on a human approval, surfaced on their card and answerable via /agents <id>.
- #awaiting_human ⇒ Object
  Entries parked on an escalated ask_parent and waiting on THE HUMAN: the source of the persistent "⛔ N subagent waiting on you" marker, answerable via /reply <id>.
- #begin_approval(id, gate:, approval_id:, question:, command:) ⇒ Object
  Flips an entry into the :needs_approval state and stores the gate + question/command the card surfaces (Option 2).
- #begin_ask(id, gate:, ask_id:, question:, blocking:, owner_id: nil) ⇒ Object
  Flips an entry into :blocked_on_human (or :blocked_on_parent when an agent-parent owns it) for an escalated ask_parent, storing the gate + question + blocking flag the card/banner surface (the mirror of #begin_approval, for a child->parent question the parent couldn't answer).
- #cancel_all ⇒ Object (also: #shutdown!)
  Structured-concurrency teardown seam: cancel EVERY live subagent so the process never leaves a child parked.
- #cancel_descendant_ask_gates(id) ⇒ Object
  Stop-cascade (S5a): when a node is stopped, cancel the ask-gates of ALL its descendants so a blocking ask anywhere in the subtree unwinds at once (Run::ApprovalGate#cancel! wakes the parked child thread with Interrupted) instead of leaving an orphaned grandchild parked until its bound elapses.
- #children_of(id) ⇒ Object
  Direct children of `id`: entries whose owner_subagent_id == id.
- #complete(entry, status:, result: nil, error: nil) ⇒ Object
  Records terminal state when the worker finishes (called from its `ensure`).
- #deliver_answer(id, answer) ⇒ Object
  The ONE shared answer wire for an escalated ask_parent, used by BOTH the human /reply path (Commands::Executor#deliver_reply) and the model-callable `answer_child` tool. It routes the answer back DOWN to the asking child by (1) deciding its ask gate, which unblocks a BLOCKING ask with the answer as its tool result, and (2) pushing the answer onto its steer queue so a NON-BLOCKING ask folds it in at its next turn boundary; then it clears the blocked state (#end_ask).
- #descendants_of(id) ⇒ Object
  All transitive descendants of `id` (BFS over owner_subagent_id), in breadth order.
- #end_approval(id) ⇒ Object
  Clears the approval state and returns the entry to :running once a decision has been delivered (or the child unwinds).
- #end_ask(id) ⇒ Object
  Clears the ask state and returns the entry to :running once the question has been answered (by the human via /reply, or by the agent-parent via answer_child), or the child unwinds / is stopped.
- #find(id) ⇒ Object
- #initialize ⇒ BackgroundTasks (constructor)
  A new instance of BackgroundTasks.
- #list ⇒ Object
  All entries, newest first, for a `task` listing (the /tasks analogue).
- #owned_by?(parent_id, child_id) ⇒ Boolean
  True iff `child_id`'s direct owner is `parent_id` (the ownership predicate that later slices' steer/probe/answer_child AUTHORIZATION checks will build on).
- #record_live_probe(id) ⇒ Object
  Records a BILLED live probe against a child (S3): bumps probe_count and stamps last_probe_at under the mutex (the owner runs this on its own thread while the parent renderer may read the entry).
- #record_tool_finished(id, line) ⇒ Object
  Records a child tool FINISHING: appends a terse line to the bounded activity ring the live drill-in (#71) tails.
- #record_tool_output(id, chunk) ⇒ Object
  Records a streamed chunk of the CURRENTLY RUNNING tool's output (#5): splits it on newlines into a bounded line buffer whose LAST slot carries the in-flight partial line, so the /agents drill-in can tail it live.
- #record_tool_started(id, activity) ⇒ Object
  Records a child tool STARTING: bumps the tool counter and sets the last-activity string the card/list show, so concurrent tasks stay distinguishable (#124/#127).
- #remove(id) ⇒ Object
- #request_stop(id) ⇒ Object
  Marks a stop REQUEST (the /agents <id> --stop / task_stop path) on a live entry so the list/cards immediately show ◌ stopping instead of a stale ● running while the child unwinds at its next checkpoint (#108).
- #reserve(subagent:, prompt:, owner_subagent_id: nil, depth: 0) ⇒ Object
  Reserves a slot and registers a `running` entry, returning it.
- #running ⇒ Object
  Live (still-running) children, used by the parent stop path to cancel orphans and to enforce the concurrency cap.
- #steer(id, text) ⇒ Object
  Records a parent->child steer note (the `/agents <id> steer "…"` affordance).
- #stop_entry(entry) ⇒ Object
  The ONE per-entry stop body, shared by every stop path (the human /agents <id> --stop, the model-callable task_stop, and the parent-teardown #cancel_all).
Constructor Details
#initialize ⇒ BackgroundTasks
Returns a new instance of BackgroundTasks.
# File 'lib/rubino/tools/background_tasks.rb', line 131
def initialize
  @entries = {}
  @mutex = Mutex.new
end
Instance Attribute Details
#last_refusal_reason ⇒ Object (readonly)
Why the most recent #reserve returned nil (one of :depth / :per_owner / :global), or nil when the last reserve succeeded. Read by TaskTool to phrase a reason-specific at-capacity message.
# File 'lib/rubino/tools/background_tasks.rb', line 183
def last_refusal_reason
  @last_refusal_reason
end
Class Method Details
.instance ⇒ Object
# File 'lib/rubino/tools/background_tasks.rb', line 121
def instance
  @instance ||= new
end
.reset! ⇒ Object
Test seam: drop all state between examples.
# File 'lib/rubino/tools/background_tasks.rb', line 126
def reset!
  @instance = nil
end
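The `.instance` / `.reset!` pair is a lazily memoized, process-wide instance plus a test hook that discards it. A minimal stand-in, using a hypothetical `Registry` class (the real class may define these methods differently than the `class << self` form assumed here):

```ruby
# Hypothetical stand-in showing the `.instance` / `.reset!` pattern.
class Registry
  class << self
    # Lazily build one process-wide instance on first use.
    def instance
      @instance ||= new
    end

    # Test seam: forget the instance so the next call starts fresh.
    def reset!
      @instance = nil
    end
  end
end

a = Registry.instance
b = Registry.instance # same object as `a`
Registry.reset!
c = Registry.instance # a brand-new object
```

`||=` is not atomic, so two threads racing on the very first call could each build an instance; touching `.instance` once before any worker threads start avoids that.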
Instance Method Details
#ancestors_of(id) ⇒ Object
The chain of ancestors of `id`, nearest parent first, walking owner_subagent_id up to the human/top-level root. Cycle-safe.
# File 'lib/rubino/tools/background_tasks.rb', line 515
def ancestors_of(id)
  @mutex.synchronize do
    out = []
    seen = { id => true }
    cur = @entries[id]&.owner_subagent_id
    while cur && (entry = @entries[cur]) && !seen[cur]
      seen[cur] = true
      out << entry
      cur = entry.owner_subagent_id
    end
    out
  end
end
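The cycle-safe upward walk can be exercised outside the registry. This sketch drops the mutex and uses a hypothetical `AncNode` struct in place of Entry; the loop body matches the source above.

```ruby
# Hypothetical stand-in for Entry: just the two fields the walk reads.
AncNode = Struct.new(:id, :owner_subagent_id, keyword_init: true)

# Same walk as #ancestors_of, minus the mutex: follow owner links upward,
# nearest parent first, stopping at the root or at an id already seen.
def ancestors_of(entries, id)
  out = []
  seen = { id => true }
  cur = entries[id]&.owner_subagent_id
  while cur && (entry = entries[cur]) && !seen[cur]
    seen[cur] = true
    out << entry
    cur = entry.owner_subagent_id
  end
  out
end

entries = {
  "sa_1" => AncNode.new(id: "sa_1", owner_subagent_id: nil),
  "sa_2" => AncNode.new(id: "sa_2", owner_subagent_id: "sa_1"),
  "sa_3" => AncNode.new(id: "sa_3", owner_subagent_id: "sa_2"),
  # A corrupt two-node cycle: the walk still terminates.
  "sa_8" => AncNode.new(id: "sa_8", owner_subagent_id: "sa_9"),
  "sa_9" => AncNode.new(id: "sa_9", owner_subagent_id: "sa_8")
}
ancestors_of(entries, "sa_3").map(&:id) # => ["sa_2", "sa_1"]
ancestors_of(entries, "sa_8").map(&:id) # => ["sa_9"]
```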
#attach(entry, thread:, runner:) ⇒ Object
Binds the live worker thread + child runner to a reserved entry so the registry can later cancel it. Done after reserve so the entry exists in the map before the thread starts (no race on completion writing back).
# File 'lib/rubino/tools/background_tasks.rb', line 188
def attach(entry, thread:, runner:)
  @mutex.synchronize do
    entry.thread = thread
    entry.runner = runner
  end
end
#awaiting_approval ⇒ Object
Entries currently parked on a human approval — surfaced on their card and answerable via /agents <id>.
# File 'lib/rubino/tools/background_tasks.rb', line 458
def awaiting_approval
  @mutex.synchronize { @entries.values.select { |e| e.status == :needs_approval } }
end
#awaiting_human ⇒ Object
Entries parked on an escalated ask_parent and waiting on THE HUMAN: the source of the persistent "⛔ N subagent waiting on you" marker, answerable via /reply <id>. Counts ONLY :blocked_on_human. A :blocked_on_parent child is its agent-parent's job (answer_child), not the human's, so it must NOT inflate the human's "waiting on you" count.
# File 'lib/rubino/tools/background_tasks.rb', line 452
def awaiting_human
  @mutex.synchronize { @entries.values.select { |e| e.status == :blocked_on_human } }
end
#begin_approval(id, gate:, approval_id:, question:, command:) ⇒ Object
Flips an entry into the :needs_approval state and stores the gate + question/command the card surfaces (Option 2). The child thread then parks on `gate.await(approval_id)`; the user resolves it via /agents <id>. Returns :needs_approval (the value of the final assignment), or nil for an unknown id; #end_approval later returns the entry to :running.
# File 'lib/rubino/tools/background_tasks.rb', line 299
def begin_approval(id, gate:, approval_id:, question:, command:)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.approval_gate = gate
    entry.approval_id = approval_id
    entry.approval_question = question.to_s
    entry.approval_command = command.to_s
    entry.status = :needs_approval
  end
end
#begin_ask(id, gate:, ask_id:, question:, blocking:, owner_id: nil) ⇒ Object
Flips an entry into a blocked state for an escalated ask_parent and stores the gate + question + blocking flag the card/banner surface (the mirror of #begin_approval, but for a child->parent question that the parent couldn't answer and escalated). The child thread then either parks on `ask_gate.await(ask_id)` (blocking ask) until the gate is decided, or keeps working (non-blocking ask) and receives the answer later via the steer queue. A child in this state still holds a concurrency slot (its thread is alive, or it is awaiting an answer), so it counts as live.
The status depends on WHO owns the asking child (S4):
- owner_id present (an agent-parent) → :blocked_on_parent. The parent MODEL answers via answer_child; the question was pushed onto the owner's steer_queue and is NOT the human's job.
- owner_id nil (the human / top-level) → :blocked_on_human. The human answers via /reply <id>.
# File 'lib/rubino/tools/background_tasks.rb', line 385
def begin_ask(id, gate:, ask_id:, question:, blocking:, owner_id: nil)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.ask_gate = gate
    entry.ask_id = ask_id
    entry.ask_question = question.to_s
    entry.ask_blocking = blocking ? true : false
    entry.status = owner_id ? :blocked_on_parent : :blocked_on_human
  end
end
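The owner-dependent status choice, and why only :blocked_on_human feeds the human's "waiting on you" marker, fit in a few lines. `ask_status_for` and `AskEntry` are hypothetical names used only for illustration.

```ruby
# Hypothetical stand-in for Entry with only the fields used here.
AskEntry = Struct.new(:id, :status, keyword_init: true)

# Same choice #begin_ask makes: an agent-owned child waits on its parent
# model; a human-owned (top-level) child waits on the human.
def ask_status_for(owner_id)
  owner_id ? :blocked_on_parent : :blocked_on_human
end

entries = [
  AskEntry.new(id: "sa_1", status: ask_status_for(nil)),    # spawned by the human
  AskEntry.new(id: "sa_2", status: ask_status_for("sa_1")), # spawned by sa_1
  AskEntry.new(id: "sa_3", status: :running)
]

# Same filter as #awaiting_human: only :blocked_on_human counts.
waiting_on_you = entries.select { |e| e.status == :blocked_on_human }
waiting_on_you.map(&:id) # => ["sa_1"]
```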
#cancel_all ⇒ Object (also known as: #shutdown!)
Structured-concurrency teardown seam: cancel EVERY live subagent so the process never leaves a child parked. This is the required fix for the parent-death deadlock (#XXX). When the PARENT dies or is interrupted (REPL break, HUP/TERM, clean quit, an aborted turn), a child blocked on ask_parent(blocking:true) would otherwise stay parked on its gate for the full ask_parent_timeout (~900s), because nothing cancels its gate: the per-id stop paths only fire on an explicit /agents --stop or task_stop.
Calling this from each parent-death edge wakes every blocked child SYNCHRONOUSLY (cancel! pushes its sentinel, and the gate's await observes it within one WAKE_TICK), so each unwinds via the existing `rescue Rubino::Interrupted` with the clean "parent question was cancelled" message instead of hanging until the bound. It is a no-op when there are no live children and is idempotent (as #stop_entry is), so it is safe to invoke from a teardown `ensure` and from a signal trap. It snapshots #running first (outside the per-entry work) so the registry mutex is not held across the gate/runner cancels.
# File 'lib/rubino/tools/background_tasks.rb', line 575
def cancel_all
  running.each { |entry| stop_entry(entry) }
  # Logical cancel alone (above) only flips cancel tokens and trusts each
  # child THREAD to observe the token and reap its own shell within a wake
  # tick — but on parent-DEATH the process exits before the thread reaches
  # that checkpoint, so any shell a child spawned (its own pgid) reparents
  # to init as an orphan (MED-2). Reap the tracked shell process groups
  # SYNCHRONOUSLY here so the same parent-death edges that call cancel_all
  # (clean quit, HUP/TERM trap, REPL break) leave no surviving shell.
  ShellRegistry.instance.kill_all_groups
end
#cancel_descendant_ask_gates(id) ⇒ Object
Stop-cascade (S5a): when a node is stopped, cancel the ask-gates of ALL its descendants so a blocking ask anywhere in the subtree unwinds at once (Run::ApprovalGate#cancel! wakes the parked child thread with Interrupted) instead of leaving an orphaned grandchild parked until its bound elapses. The descendant runners’ CancelTokens are flipped by the caller’s cancel! of the node; this just makes the gate-parked ones wake immediately. Safe to call on a node with no descendants or no blocked descendants.
# File 'lib/rubino/tools/background_tasks.rb', line 536
def cancel_descendant_ask_gates(id)
  descendants_of(id).each { |e| e.ask_gate&.cancel! }
end
#children_of(id) ⇒ Object
Direct children of ‘id`: entries whose owner_subagent_id == id. Pass nil for the human/top-level node’s direct children.
# File 'lib/rubino/tools/background_tasks.rb', line 487
def children_of(id)
  @mutex.synchronize { @entries.values.select { |e| e.owner_subagent_id == id } }
end
#complete(entry, status:, result: nil, error: nil) ⇒ Object
Records terminal state when the worker finishes (called from its `ensure`). There is a single writer per entry, but the write is guarded so #find/#list readers see a consistent snapshot. A failure landing on a :stopping entry is a USER-REQUESTED stop unwinding (Interrupted at the next checkpoint), so it is recorded as :stopped, distinct from a genuine :failed (#108/#13).
H5: closes the drain↔complete race. The final drain of the child's steer_queue happens HERE, under the SAME registry mutex that flips the status to terminal, and #steer refuses to push onto a terminal entry under that SAME mutex. A steer/answer arriving concurrently is therefore serialised against this finalize: it is EITHER pushed before the status flips (and drained right here into the returned `undelivered` notes) OR rejected by #steer (which then honestly reports not-delivered). The earlier shape (drain under the InputQueue lock, then complete under the registry lock: two locks with a gap) let an answer land on a now-dead queue, where it was dropped and omitted from `undelivered` yet reported delivered.
Returns the notes that were still queued at finalize time (never delivered to the child), so the caller can surface them as undelivered.
# File 'lib/rubino/tools/background_tasks.rb', line 229
def complete(entry, status:, result: nil, error: nil)
  @mutex.synchronize do
    status = :stopped if entry.status == :stopping && status == :failed
    entry.status = status
    entry.result = result
    entry.error = error
    entry.finished_at = Time.now
    # Drain UNDER the mutex: anything still here is undelivered (the child
    # has no further turn to fold it in), and once status is terminal no
    # new note can arrive — #steer rejects it.
    entry.steer_queue&.drain || []
  end
end
#deliver_answer(id, answer) ⇒ Object
The ONE shared answer wire for an escalated ask_parent, used by BOTH the human /reply path (Commands::Executor#deliver_reply) and the model-callable `answer_child` tool. It routes the answer back DOWN to the asking child by (1) deciding its ask gate, which unblocks a BLOCKING ask with the answer as its tool result, and (2) pushing the answer onto its steer queue so a NON-BLOCKING ask folds it in at its next turn boundary; then it clears the blocked state (#end_ask). Either way the answer PERSISTS in the child's context. Returns false (a no-op) for an unknown id, for one not awaiting an answer (no ask_gate), or when the child has already finished (#steer rejects the push); returns true when the answer was routed.
# File 'lib/rubino/tools/background_tasks.rb', line 423
def deliver_answer(id, answer)
  entry = find(id)
  return false unless entry&.ask_gate

  # H5 — #steer is the SINGLE race-free liveness oracle here: it pushes the
  # answer onto the steer_queue under the registry mutex IFF the child is
  # still non-terminal, returning false the instant the child has finished
  # (atomic against #complete, which flips the status and drains the queue
  # under that same mutex). So we steer FIRST and let its honest result
  # decide everything:
  #   false ⇒ the child already finished; neither path can reach it. Do NOT
  #           decide the gate (a no-op for a child that will never await
  #           it) and do NOT clear the ask — report not-delivered.
  #   true  ⇒ the child is live and the answer is queued; a BLOCKING ask
  #           additionally needs its gate decided so the parked child wakes
  #           with the answer as its tool result. Then clear the blocked
  #           state and report delivered.
  return false unless steer(entry.id, "#{ANSWER_NOTE_PREFIX}#{answer}")

  entry.ask_gate.decide(entry.ask_id, answer)
  end_ask(entry.id)
  true
end
#descendants_of(id) ⇒ Object
All transitive descendants of `id` (BFS over owner_subagent_id), in breadth order. Cycle-safe (an id is visited at most once).
# File 'lib/rubino/tools/background_tasks.rb', line 493
def descendants_of(id)
  @mutex.synchronize do
    out = []
    seen = {}
    frontier = @entries.values.select { |e| e.owner_subagent_id == id }
    until frontier.empty?
      nxt = []
      frontier.each do |e|
        next if seen[e.id]

        seen[e.id] = true
        out << e
        nxt.concat(@entries.values.select { |c| c.owner_subagent_id == e.id })
      end
      frontier = nxt
    end
    out
  end
end
#end_approval(id) ⇒ Object
Clears the approval state and returns the entry to :running once a decision has been delivered (or the child unwinds).
# File 'lib/rubino/tools/background_tasks.rb', line 314
def end_approval(id)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.approval_gate = nil
    entry.approval_id = nil
    entry.approval_question = nil
    entry.approval_command = nil
    entry.status = :running if entry.status == :needs_approval
  end
end
#end_ask(id) ⇒ Object
Clears the ask state and returns the entry to :running once the question has been answered (by the human via /reply, or the agent-parent via answer_child), or the child unwinds / is stopped.
# File 'lib/rubino/tools/background_tasks.rb', line 401
def end_ask(id)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.ask_gate = nil
    entry.ask_id = nil
    entry.ask_question = nil
    entry.ask_blocking = nil
    entry.status = :running if %i[blocked_on_human blocked_on_parent].include?(entry.status)
  end
end
#find(id) ⇒ Object
# File 'lib/rubino/tools/background_tasks.rb', line 462
def find(id)
  @mutex.synchronize { @entries[id] }
end
#list ⇒ Object
All entries, newest first, for a `task` listing (the /tasks analogue).
# File 'lib/rubino/tools/background_tasks.rb', line 467
def list
  @mutex.synchronize { @entries.values.sort_by(&:started_at).reverse }
end
#owned_by?(parent_id, child_id) ⇒ Boolean
True iff `child_id`'s direct owner is `parent_id` (the ownership predicate that later slices' steer/probe/answer_child AUTHORIZATION checks will build on).
# File 'lib/rubino/tools/background_tasks.rb', line 590
def owned_by?(parent_id, child_id)
  @mutex.synchronize do
    child = @entries[child_id]
    !child.nil? && child.owner_subagent_id == parent_id
  end
end
#record_live_probe(id) ⇒ Object
Records a BILLED live probe against a child (S3): bumps probe_count and stamps last_probe_at under the mutex (the owner runs this on its own thread while the parent renderer may read the entry). Returns the new count, or nil for an unknown id. Free snapshot probes (live:false) never call this; only `probe(live:true)` does, after the budget check passes.
# File 'lib/rubino/tools/background_tasks.rb', line 360
def record_live_probe(id)
  @mutex.synchronize do
    entry = @entries[id]
    return nil unless entry

    entry.probe_count = entry.probe_count.to_i + 1
    entry.last_probe_at = Time.now
    entry.probe_count
  end
end
#record_tool_finished(id, line) ⇒ Object
Records a child tool FINISHING: appends a terse line to the bounded activity ring the live drill-in (#71) tails. Keeps the last ACTIVITY_LOG_MAX entries so the ring never grows unbounded for a read-heavy child. Also wipes the live output tail — it belongs to the tool that just finished, so the drill-in’s output: block clears (#5).
# File 'lib/rubino/tools/background_tasks.rb', line 264
def record_tool_finished(id, line)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    log = (entry.activity_log ||= [])
    log << line.to_s
    log.shift while log.size > ACTIVITY_LOG_MAX
    entry.output_tail = nil
  end
end
#record_tool_output(id, chunk) ⇒ Object
Records a streamed chunk of the CURRENTLY RUNNING tool’s output (#5): splits on newlines into a bounded line buffer whose LAST slot carries the in-flight partial line, so the /agents drill-in can tail it live. Called from UI::SubagentView#tool_chunk on the CHILD thread, so it MUST take the mutex like the other record_* writers. No-op for an unknown id.
# File 'lib/rubino/tools/background_tasks.rb', line 281
def record_tool_output(id, chunk)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    tail = (entry.output_tail ||= [""])
    chunk.to_s.each_line do |line|
      tail[-1] = "#{tail[-1]}#{line.chomp}"[0, OUTPUT_TAIL_LINE_MAX]
      tail << "" if line.end_with?("\n")
    end
    tail.shift while tail.size > OUTPUT_TAIL_MAX + 1
  end
end
#record_tool_started(id, activity) ⇒ Object
Records a child tool STARTING: bumps the tool counter and sets the last-activity string the card/list show so concurrent tasks stay distinguishable (#124/#127). Called from UI::SubagentView#tool_started, which runs on the CHILD thread, so it MUST take the mutex (the parent renderer reads these fields concurrently). No-op for an unknown id (a late event after #remove).
# File 'lib/rubino/tools/background_tasks.rb', line 249
def record_tool_started(id, activity)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.tool_count = entry.tool_count.to_i + 1
    entry.last_activity = activity.to_s
  end
end
#remove(id) ⇒ Object
# File 'lib/rubino/tools/background_tasks.rb', line 479
def remove(id)
  @mutex.synchronize { @entries.delete(id) }
end
#request_stop(id) ⇒ Object
Marks a stop REQUEST (the /agents <id> --stop / task_stop path) on a live entry so the list/cards immediately show ◌ stopping instead of a stale ● running while the child unwinds at its next checkpoint (#108). Returns true when the entry flipped. #complete then maps a failure on a :stopping entry to the terminal :stopped, so a deliberate stop never reads as ✗ failed (#13).
# File 'lib/rubino/tools/background_tasks.rb', line 201
def request_stop(id)
  @mutex.synchronize do
    entry = @entries[id]
    return false unless entry && live_status?(entry.status)

    entry.status = :stopping
    true
  end
end
#reserve(subagent:, prompt:, owner_subagent_id: nil, depth: 0) ⇒ Object
Reserves a slot and registers a `running` entry, returning it. The caller then attaches the worker thread + runner via #attach.
owner_subagent_id is the `sa_*` id of the SPAWNING subagent (nil ⇒ the human / top-level agent spawned this child). depth is the caller's hint for a human-spawned child (0); for an owner-spawned child the depth is recomputed here from the owner entry (owner.depth + 1) so a stale hint can't smuggle a child past the depth cap.
Returns nil — so TaskTool can surface a clear message instead of spawning unbounded work — when ANY of the three nesting caps is hit. The reason is available via #last_refusal_reason for the caller to phrase the message:
:depth — depth >= max_depth (no deeper nesting allowed)
:per_owner — this owner already has max_children_per_node live kids
:global — total live subagents across the tree >= max total
This is the SINGLE enforcement point for every nesting limit.
# File 'lib/rubino/tools/background_tasks.rb', line 152
def reserve(subagent:, prompt:, owner_subagent_id: nil, depth: 0)
  @mutex.synchronize do
    owner = owner_subagent_id ? @entries[owner_subagent_id] : nil
    effective_depth = owner ? owner.depth.to_i + 1 : depth.to_i

    @last_refusal_reason = refusal_reason(owner_subagent_id, effective_depth)
    return nil if @last_refusal_reason

    entry = Entry.new(
      id: new_id,
      subagent: subagent.to_s,
      prompt: prompt.to_s,
      status: :running,
      started_at: Time.now,
      tool_count: 0,
      activity_log: [],
      # Every background child gets its OWN steering queue at reserve time
      # so the parent can `/agents <id> steer "..."` it the instant it is
      # listed — no separate wiring step, no nil window.
      steer_queue: Interaction::InputQueue.new,
      owner_subagent_id: owner_subagent_id,
      depth: effective_depth
    )
    @entries[entry.id] = entry
    entry
  end
end
#running ⇒ Object
Live (still-running) children — used by the parent stop path to cancel orphans, and to enforce the concurrency cap. A child parked on a human approval (:needs_approval) is STILL live (its thread is alive, holding a slot), so it counts as running here.
# File 'lib/rubino/tools/background_tasks.rb', line 475
def running
  @mutex.synchronize { @entries.values.select { |e| live_status?(e.status) } }
end
#steer(id, text) ⇒ Object
Records a parent->child steer note (the `/agents <id> steer "…"` affordance). Pushes the text onto the child's steering queue, which the child Loop drains at its next iteration boundary (Loop#inject_steered_input): between turns, never between a tool_use and its results. Best-effort: returns false (and pushes nothing) when the entry is gone, has no queue, or has ALREADY reached a terminal state (the child finished, so there is no further turn to fold the note into); returns true when the note was queued.
H5 — the push happens UNDER the registry mutex, gated on a non-terminal status, so it is serialised against #complete (which flips the status to terminal AND drains the queue under that SAME mutex). Either this push wins the lock first (the note is queued and will be drained — by the child at its next turn, or by #complete into the undelivered report) or #complete wins first (status is terminal and this returns false). There is no window in which a note is pushed onto a queue nobody will drain yet reported delivered. Pushing inside the mutex is safe: InputQueue#push has its own lock and never calls back into the registry, so no lock cycle.
# File 'lib/rubino/tools/background_tasks.rb', line 344
def steer(id, text)
  @mutex.synchronize do
    entry = @entries[id]
    return false unless entry&.steer_queue
    return false if terminal_status?(entry.status)

    entry.steer_queue.push(text)
    true
  end
end
#stop_entry(entry) ⇒ Object
The ONE per-entry stop body, shared by every stop path (the human /agents <id> --stop, the model-callable task_stop, and the parent-teardown #cancel_all). Marks the stop so the unwind records as :stopped (not ✗ failed) and the list shows ◌ stopping, then wakes the entry no matter HOW it is blocked: a child parked on its OWN approval or ask gate (cancel those → Interrupted → clean unwind), any descendant parked on a blocking ask (the stop-cascade), and the runner's CancelToken for a child between checkpoints. Idempotent and safe on an already-stopped or never-blocked entry (each cancel! is one-shot; request_stop no-ops on a non-live status), so #cancel_all can call it across the whole registry.
# File 'lib/rubino/tools/background_tasks.rb', line 550
def stop_entry(entry)
  return unless entry

  request_stop(entry.id)
  entry.approval_gate&.cancel!
  entry.ask_gate&.cancel!
  cancel_descendant_ask_gates(entry.id)
  entry.runner&.cancel!
end