Class: Rubino::Tools::BackgroundTasks
- Inherits: Object
  - Object
  - Rubino::Tools::BackgroundTasks
- Defined in: lib/rubino/tools/background_tasks.rb
Overview
Process-wide registry for subagents started by the `task` tool in the BACKGROUND (the default). Mirrors ShellRegistry, the in-repo precedent for “fire-and-forget + poll later + kill”, but the unit of work is a nested Agent::Runner thread instead of a detached OS process.
Each entry owns:
- the worker Thread running the child Runner#run!,
- the child Runner (so #cancel can flip its CancelToken, exactly the mechanism Run::Executor's stop-watcher uses for top-level runs),
- the terminal status/result/error captured in the worker's `ensure`.
The registry lives only as long as a single CLI/server process; like ShellRegistry, it is intentionally NOT persisted. Background subagents die with the process.
Concurrency cap (mirrors the reference _DEFAULT_MAX_CONCURRENT_CHILDREN = 3): a background subagent is a full LLM run = real cost, so #spawn refuses past MAX_CONCURRENT live children rather than fanning out unbounded threads.
Defined Under Namespace
Classes: Entry
Constant Summary
- MAX_CONCURRENT = 3
- MAX_DEPTH = 2
  Fallback caps for the nested-subagent tree, used when config is absent (e.g. a bare registry in a unit test with no Configuration wired). The live values come from config (tasks.max_depth / max_children_per_node / max_concurrent_total); these constants are the built-in defaults the config keys themselves default to. All three are enforced in #reserve.
- MAX_CHILDREN_PER_NODE = 3
- MAX_CONCURRENT_TOTAL = 8
- ACTIVITY_LOG_MAX = 6
  How many recent activity lines the drill-in shows (the live `recent:` ring).
- OUTPUT_TAIL_MAX = 6
  Bounds for the live output tail (#5): how many COMPLETE lines the drill-in’s output: block shows (the buffer keeps one extra slot for the in-flight partial line), and the byte cap per buffered line so a newline-free stream can’t grow a line unbounded.
- OUTPUT_TAIL_LINE_MAX = 200
Instance Attribute Summary
- #last_refusal_reason ⇒ Object (readonly)
  Why the most recent #reserve returned nil (one of :depth / :per_owner / :global), or nil when the last reserve succeeded.
Class Method Summary
- .instance ⇒ Object
- .reset! ⇒ Object
  Test seam: drop all state between examples.
Instance Method Summary
- #ancestors_of(id) ⇒ Object
  The chain of ancestors of `id`, nearest parent first, walking owner_subagent_id up to the human/top-level root.
- #attach(entry, thread:, runner:) ⇒ Object
  Binds the live worker thread + child runner to a reserved entry so the registry can later cancel it.
- #awaiting_approval ⇒ Object
  Entries currently parked on a human approval, surfaced on their card and answerable via /agents <id>.
- #awaiting_human ⇒ Object
  Entries parked on an escalated ask_parent, waiting on THE HUMAN: the source of the persistent "⛔ N subagent waiting on you" marker, answerable via /reply <id>.
- #begin_approval(id, gate:, approval_id:, question:, command:) ⇒ Object
  Flips an entry into the :needs_approval state and stores the gate + question/command the card surfaces (Option 2).
- #begin_ask(id, gate:, ask_id:, question:, blocking:, owner_id: nil) ⇒ Object
  Flips an entry into the :blocked_on_human state for an escalated ask_parent: stores the gate + question + blocking flag the card/banner surface (mirror of #begin_approval, but for a child->parent question that the parent couldn’t answer and escalated to the human).
- #cancel_descendant_ask_gates(id) ⇒ Object
  Stop-cascade (S5a): when a node is stopped, cancel the ask-gates of ALL its descendants so a blocking ask anywhere in the subtree unwinds at once (Run::ApprovalGate#cancel! wakes the parked child thread with Interrupted) instead of leaving an orphaned grandchild parked until its bound elapses.
- #children_of(id) ⇒ Object
  Direct children of `id`: entries whose owner_subagent_id == id.
- #complete(entry, status:, result: nil, error: nil) ⇒ Object
  Records terminal state when the worker finishes (called from its `ensure`).
- #deliver_answer(id, answer) ⇒ Object
  The ONE shared answer wire for an escalated ask_parent, used by BOTH the human /reply path (Commands::Executor#deliver_reply) and the model-callable `answer_child` tool: route the answer back DOWN to the asking child by (1) deciding its ask gate, which unblocks a BLOCKING ask with the answer as its tool result, and (2) pushing the answer onto its steer queue so a NON-BLOCKING ask folds it in at its next turn boundary; then clear the blocked state (#end_ask).
- #descendants_of(id) ⇒ Object
  All transitive descendants of `id` (BFS over owner_subagent_id), in breadth order.
- #end_approval(id) ⇒ Object
  Clears the approval state and returns the entry to :running once a decision has been delivered (or the child unwinds).
- #end_ask(id) ⇒ Object
  Clears the ask state and returns the entry to :running once the question has been answered (by the human via /reply, or the agent-parent via answer_child), or the child unwinds / is stopped.
- #find(id) ⇒ Object
- #initialize ⇒ BackgroundTasks (constructor)
  A new instance of BackgroundTasks.
- #list ⇒ Object
  All entries, newest first, for a `task` listing (the /tasks analogue).
- #owned_by?(parent_id, child_id) ⇒ Boolean
  True iff `child_id`’s direct owner is `parent_id` (the ownership predicate later slices’ steer/probe/answer_child AUTHORIZATION checks will build on).
- #record_live_probe(id) ⇒ Object
  Records a BILLED live probe against a child (S3): bumps probe_count and stamps last_probe_at, under the mutex (the owner runs this on its own thread while the parent renderer may read the entry).
- #record_tool_finished(id, line) ⇒ Object
  Records a child tool FINISHING: appends a terse line to the bounded activity ring the live drill-in (#71) tails.
- #record_tool_output(id, chunk) ⇒ Object
  Records a streamed chunk of the CURRENTLY RUNNING tool’s output (#5): splits on newlines into a bounded line buffer whose LAST slot carries the in-flight partial line, so the /agents drill-in can tail it live.
- #record_tool_started(id, activity) ⇒ Object
  Records a child tool STARTING: bumps the tool counter and sets the last-activity string the card/list show so concurrent tasks stay distinguishable (#124/#127).
- #remove(id) ⇒ Object
- #request_stop(id) ⇒ Object
  Marks a stop REQUEST (the /agents <id> --stop / task_stop path) on a live entry so the list/cards immediately show ◌ stopping instead of a stale ● running while the child unwinds at its next checkpoint (#108).
- #reserve(subagent:, prompt:, owner_subagent_id: nil, depth: 0) ⇒ Object
  Reserves a slot and registers a `running` entry, returning it.
- #running ⇒ Object
  Live (still-running) children, used by the parent stop path to cancel orphans and to enforce the concurrency cap.
- #steer(id, text) ⇒ Object
  Records a parent->child steer note (the `/agents <id> steer "…"` affordance).
Constructor Details
#initialize ⇒ BackgroundTasks
Returns a new instance of BackgroundTasks.
# File 'lib/rubino/tools/background_tasks.rb', line 118
def initialize
  @entries = {}
  @mutex = Mutex.new
end
Instance Attribute Details
#last_refusal_reason ⇒ Object (readonly)
Why the most recent #reserve returned nil (one of :depth / :per_owner / :global), or nil when the last reserve succeeded. Read by TaskTool to phrase a reason-specific at-capacity message.
# File 'lib/rubino/tools/background_tasks.rb', line 170
def last_refusal_reason
  @last_refusal_reason
end
Class Method Details
.instance ⇒ Object
# File 'lib/rubino/tools/background_tasks.rb', line 108
def instance
  @instance ||= new
end
.reset! ⇒ Object
Test seam: drop all state between examples.
# File 'lib/rubino/tools/background_tasks.rb', line 113
def reset!
  @instance = nil
end
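The pairing of .instance and .reset! can be illustrated in isolation. The following is a minimal sketch, not the registry itself: MiniRegistry is a hypothetical stand-in that models only the memoized singleton and its test seam.

```ruby
# MiniRegistry is a hypothetical, stripped-down stand-in; it only models the
# memoized class-level instance and the reset seam described above.
class MiniRegistry
  class << self
    # Lazily builds the process-wide instance on first use.
    def instance
      @instance ||= new
    end

    # Drops the memoized instance so the next .instance call builds a fresh one.
    def reset!
      @instance = nil
    end
  end
end

before = MiniRegistry.instance
MiniRegistry.reset!
after = MiniRegistry.instance
puts before.equal?(after) # => false
```

In a test suite, a call like this would typically sit in a per-example setup hook so state from one example cannot leak into the next.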
Instance Method Details
#ancestors_of(id) ⇒ Object
The chain of ancestors of `id`, nearest parent first, walking owner_subagent_id up to the human/top-level root. Cycle-safe.
# File 'lib/rubino/tools/background_tasks.rb', line 460
def ancestors_of(id)
  @mutex.synchronize do
    out = []
    seen = { id => true }
    cur = @entries[id]&.owner_subagent_id
    while cur && (entry = @entries[cur]) && !seen[cur]
      seen[cur] = true
      out << entry
      cur = entry.owner_subagent_id
    end
    out
  end
end
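To see the ordering and the cycle guard at work, here is a small standalone sketch of the same walk. OwnerNode and the hash-based helper are hypothetical stand-ins for Entry and the registry's internal map; locking is omitted.

```ruby
# OwnerNode is a hypothetical stand-in for Entry; only the two fields the
# walk needs are modelled.
OwnerNode = Struct.new(:id, :owner_subagent_id)

# Same loop as above, taking the id => entry map as an argument.
def ancestors_in(entries, id)
  out = []
  seen = { id => true }
  cur = entries[id]&.owner_subagent_id
  while cur && (entry = entries[cur]) && !seen[cur]
    seen[cur] = true
    out << entry
    cur = entry.owner_subagent_id
  end
  out
end

CHAIN = {
  "sa_1" => OwnerNode.new("sa_1", nil),
  "sa_2" => OwnerNode.new("sa_2", "sa_1"),
  "sa_3" => OwnerNode.new("sa_3", "sa_2")
}.freeze

# A deliberately corrupt map where two entries own each other.
LOOPED = {
  "sa_a" => OwnerNode.new("sa_a", "sa_b"),
  "sa_b" => OwnerNode.new("sa_b", "sa_a")
}.freeze

p ancestors_in(CHAIN, "sa_3").map(&:id)  # => ["sa_2", "sa_1"]
p ancestors_in(LOOPED, "sa_a").map(&:id) # => ["sa_b"]
```

Seeding `seen` with the starting id is what stops the walk in the looped case once it arrives back at the start.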
#attach(entry, thread:, runner:) ⇒ Object
Binds the live worker thread + child runner to a reserved entry so the registry can later cancel it. Done after reserve so the entry exists in the map before the thread starts (no race on completion writing back).
# File 'lib/rubino/tools/background_tasks.rb', line 175
def attach(entry, thread:, runner:)
  @mutex.synchronize do
    entry.thread = thread
    entry.runner = runner
  end
end
#awaiting_approval ⇒ Object
Entries currently parked on a human approval — surfaced on their card and answerable via /agents <id>.
# File 'lib/rubino/tools/background_tasks.rb', line 403
def awaiting_approval
  @mutex.synchronize { @entries.values.select { |e| e.status == :needs_approval } }
end
#awaiting_human ⇒ Object
Entries parked on an escalated ask_parent, waiting on THE HUMAN: the source of the persistent "⛔ N subagent waiting on you" marker, answerable via /reply <id>. Counts ONLY :blocked_on_human: a :blocked_on_parent child is its agent-parent’s job (answer_child), not the human’s, so it must NOT inflate the human’s “waiting on you” count.
# File 'lib/rubino/tools/background_tasks.rb', line 397
def awaiting_human
  @mutex.synchronize { @entries.values.select { |e| e.status == :blocked_on_human } }
end
#begin_approval(id, gate:, approval_id:, question:, command:) ⇒ Object
Flips an entry into the :needs_approval state and stores the gate + question/command the card surfaces (Option 2). The child thread then parks on `gate.await(approval_id)`; the user resolves it via /agents <id>. As written, the method returns the newly assigned :needs_approval (the value of the final assignment), or nil for an unknown id; it does not return the previous status.
# File 'lib/rubino/tools/background_tasks.rb', line 269
def begin_approval(id, gate:, approval_id:, question:, command:)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.approval_gate = gate
    entry.approval_id = approval_id
    entry.approval_question = question.to_s
    entry.approval_command = command.to_s
    entry.status = :needs_approval
  end
end
#begin_ask(id, gate:, ask_id:, question:, blocking:, owner_id: nil) ⇒ Object
Flips an entry into a blocked state for an ask_parent question and stores the gate + question + blocking flag the card/banner surface (mirror of #begin_approval, but for a child->parent question). The child thread then parks on `ask_gate.await(ask_id)` (blocking ask) until the gate is decided, or keeps working (non-blocking ask) with the answer delivered later via the steer queue. A child in this state still holds a concurrency slot (its thread is alive, or it is awaiting an answer), so it counts as live.
The status depends on WHO owns the asking child (S4):
- owner_id present (an agent-parent) → :blocked_on_parent. The parent MODEL answers via answer_child; the question was pushed onto the owner’s steer_queue and is NOT the human’s job.
- owner_id nil (the human / top-level) → :blocked_on_human. The question was escalated to the human, who answers via /reply <id>.
# File 'lib/rubino/tools/background_tasks.rb', line 344
def begin_ask(id, gate:, ask_id:, question:, blocking:, owner_id: nil)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.ask_gate = gate
    entry.ask_id = ask_id
    entry.ask_question = question.to_s
    entry.ask_blocking = blocking ? true : false
    entry.status = owner_id ? :blocked_on_parent : :blocked_on_human
  end
end
#cancel_descendant_ask_gates(id) ⇒ Object
Stop-cascade (S5a): when a node is stopped, cancel the ask-gates of ALL its descendants so a blocking ask anywhere in the subtree unwinds at once (Run::ApprovalGate#cancel! wakes the parked child thread with Interrupted) instead of leaving an orphaned grandchild parked until its bound elapses. The descendant runners’ CancelTokens are flipped by the caller’s cancel! of the node; this just makes the gate-parked ones wake immediately. Safe to call on a node with no descendants or no blocked descendants.
# File 'lib/rubino/tools/background_tasks.rb', line 481
def cancel_descendant_ask_gates(id)
  descendants_of(id).each { |e| e.ask_gate&.cancel! }
end
#children_of(id) ⇒ Object
Direct children of `id`: entries whose owner_subagent_id == id. Pass nil for the human/top-level node’s direct children.
# File 'lib/rubino/tools/background_tasks.rb', line 432
def children_of(id)
  @mutex.synchronize { @entries.values.select { |e| e.owner_subagent_id == id } }
end
#complete(entry, status:, result: nil, error: nil) ⇒ Object
Records terminal state when the worker finishes (called from its `ensure`). Single writer per entry, but guarded so #find/#list readers see a consistent snapshot. A failure landing on a :stopping entry is a USER-REQUESTED stop unwinding (Interrupted at the next checkpoint), so it is recorded as :stopped, distinct from a genuine :failed (#108/#13).
# File 'lib/rubino/tools/background_tasks.rb', line 203
def complete(entry, status:, result: nil, error: nil)
  @mutex.synchronize do
    status = :stopped if entry.status == :stopping && status == :failed
    entry.status = status
    entry.result = result
    entry.error = error
    entry.finished_at = Time.now
  end
end
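The status rewrite in #complete is easiest to check as a pure function. A minimal sketch follows; terminal_status is a hypothetical helper name, and :completed is a placeholder for whatever success status the worker reports (it is not named on this page).

```ruby
# terminal_status is a hypothetical helper that isolates the one-line mapping
# in #complete: a failure reported while the entry is :stopping becomes :stopped.
def terminal_status(current, reported)
  current == :stopping && reported == :failed ? :stopped : reported
end

p terminal_status(:stopping, :failed)    # => :stopped
p terminal_status(:running, :failed)     # => :failed
p terminal_status(:stopping, :completed) # => :completed (placeholder success status)
```

Only the :stopping plus :failed combination is rewritten; every other reported status passes through unchanged.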
#deliver_answer(id, answer) ⇒ Object
The ONE shared answer wire for an escalated ask_parent, used by BOTH the human /reply path (Commands::Executor#deliver_reply) and the model-callable `answer_child` tool: route the answer back DOWN to the asking child by (1) deciding its ask gate, which unblocks a BLOCKING ask with the answer as its tool result, and (2) pushing the answer onto its steer queue so a NON-BLOCKING ask folds it in at its next turn boundary; then clear the blocked state (#end_ask). Either way the answer PERSISTS in the child’s context. No-op (returns false) for an unknown id or one not awaiting an answer (no ask_gate); true when the answer was routed.
# File 'lib/rubino/tools/background_tasks.rb', line 382
def deliver_answer(id, answer)
  entry = find(id)
  return false unless entry&.ask_gate

  entry.ask_gate.decide(entry.ask_id, answer)
  steer(entry.id, "[parent answer] #{answer}")
  end_ask(entry.id)
  true
end
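The two-channel delivery can be traced with plain objects. This is only a sketch under stated assumptions: RecordingGate stands in for Run::ApprovalGate (only a `decide` call is modelled), AskingChild stands in for Entry, and an Array stands in for the steer queue.

```ruby
# RecordingGate is a hypothetical stand-in for the real gate; it just records
# which ask id was decided with which answer.
class RecordingGate
  attr_reader :decisions

  def initialize
    @decisions = {}
  end

  def decide(ask_id, answer)
    @decisions[ask_id] = answer
  end
end

# Hypothetical stand-in for Entry, limited to the fields this flow touches.
AskingChild = Struct.new(:id, :ask_gate, :ask_id, :steer_queue, :status)

def route_answer(child, answer)
  return false unless child&.ask_gate

  child.ask_gate.decide(child.ask_id, answer)      # (1) wakes a blocking ask
  child.steer_queue << "[parent answer] #{answer}" # (2) persists for a non-blocking ask
  child.ask_gate = nil                             # mirrors #end_ask
  child.status = :running
  true
end

def demo_route_answer
  gate = RecordingGate.new
  child = AskingChild.new("sa_1", gate, "ask_1", [], :blocked_on_human)
  first = route_answer(child, "use the staging DB")
  second = route_answer(child, "too late") # no gate any more, so a no-op
  [first, second, gate.decisions, child.steer_queue, child.status]
end

p demo_route_answer
```

The second call returns false because the first one cleared the gate, which matches the documented no-op for an entry that is not awaiting an answer.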
#descendants_of(id) ⇒ Object
All transitive descendants of `id` (BFS over owner_subagent_id), in breadth order. Cycle-safe (an id is visited at most once).
# File 'lib/rubino/tools/background_tasks.rb', line 438
def descendants_of(id)
  @mutex.synchronize do
    out = []
    seen = {}
    frontier = @entries.values.select { |e| e.owner_subagent_id == id }
    until frontier.empty?
      nxt = []
      frontier.each do |e|
        next if seen[e.id]

        seen[e.id] = true
        out << e
        nxt.concat(@entries.values.select { |c| c.owner_subagent_id == e.id })
      end
      frontier = nxt
    end
    out
  end
end
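A small worked example may help confirm the breadth order. This sketch runs the same level-by-level walk over a plain array; SubtreeNode is a hypothetical stand-in for Entry, and locking is left out.

```ruby
# SubtreeNode is a hypothetical stand-in for Entry.
SubtreeNode = Struct.new(:id, :owner_subagent_id)

# Same frontier-based BFS as above, over an array of nodes.
def descendants_in(entries, id)
  out = []
  seen = {}
  frontier = entries.select { |e| e.owner_subagent_id == id }
  until frontier.empty?
    nxt = []
    frontier.each do |e|
      next if seen[e.id]

      seen[e.id] = true
      out << e
      nxt.concat(entries.select { |c| c.owner_subagent_id == e.id })
    end
    frontier = nxt
  end
  out
end

FOREST = [
  SubtreeNode.new("sa_a", nil),
  SubtreeNode.new("sa_b", "sa_a"),
  SubtreeNode.new("sa_c", "sa_a"),
  SubtreeNode.new("sa_d", "sa_b")
].freeze

p descendants_in(FOREST, "sa_a").map(&:id) # => ["sa_b", "sa_c", "sa_d"]
p descendants_in(FOREST, nil).map(&:id)    # => ["sa_a", "sa_b", "sa_c", "sa_d"]
```

Children always precede grandchildren in the result, and passing nil starts from the top-level node's direct children.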
#end_approval(id) ⇒ Object
Clears the approval state and returns the entry to :running once a decision has been delivered (or the child unwinds).
# File 'lib/rubino/tools/background_tasks.rb', line 284
def end_approval(id)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.approval_gate = nil
    entry.approval_id = nil
    entry.approval_question = nil
    entry.approval_command = nil
    entry.status = :running if entry.status == :needs_approval
  end
end
#end_ask(id) ⇒ Object
Clears the ask state and returns the entry to :running once the question has been answered (by the human via /reply, or the agent-parent via answer_child), or the child unwinds / is stopped.
# File 'lib/rubino/tools/background_tasks.rb', line 360
def end_ask(id)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.ask_gate = nil
    entry.ask_id = nil
    entry.ask_question = nil
    entry.ask_blocking = nil
    entry.status = :running if %i[blocked_on_human blocked_on_parent].include?(entry.status)
  end
end
#find(id) ⇒ Object
# File 'lib/rubino/tools/background_tasks.rb', line 407
def find(id)
  @mutex.synchronize { @entries[id] }
end
#list ⇒ Object
All entries, newest first, for a `task` listing (the /tasks analogue).
# File 'lib/rubino/tools/background_tasks.rb', line 412
def list
  @mutex.synchronize { @entries.values.sort_by(&:started_at).reverse }
end
#owned_by?(parent_id, child_id) ⇒ Boolean
True iff `child_id`’s direct owner is `parent_id` (the ownership predicate later slices’ steer/probe/answer_child AUTHORIZATION checks will build on).
# File 'lib/rubino/tools/background_tasks.rb', line 487
def owned_by?(parent_id, child_id)
  @mutex.synchronize do
    child = @entries[child_id]
    !child.nil? && child.owner_subagent_id == parent_id
  end
end
#record_live_probe(id) ⇒ Object
Records a BILLED live probe against a child (S3): bumps probe_count and stamps last_probe_at, under the mutex (the owner runs this on its own thread while the parent renderer may read the entry). Returns the new count, or nil for an unknown id. Free snapshot probes (live:false) never call this; only `probe(live:true)` does, after the budget check passes.
# File 'lib/rubino/tools/background_tasks.rb', line 319
def record_live_probe(id)
  @mutex.synchronize do
    entry = @entries[id]
    return nil unless entry

    entry.probe_count = entry.probe_count.to_i + 1
    entry.last_probe_at = Time.now
    entry.probe_count
  end
end
#record_tool_finished(id, line) ⇒ Object
Records a child tool FINISHING: appends a terse line to the bounded activity ring the live drill-in (#71) tails. Keeps the last ACTIVITY_LOG_MAX entries so the ring never grows unbounded for a read-heavy child. Also wipes the live output tail — it belongs to the tool that just finished, so the drill-in’s output: block clears (#5).
# File 'lib/rubino/tools/background_tasks.rb', line 234
def record_tool_finished(id, line)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    log = (entry.activity_log ||= [])
    log << line.to_s
    log.shift while log.size > ACTIVITY_LOG_MAX
    entry.output_tail = nil
  end
end
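The bounded ring is just an append followed by trimming from the front. A minimal standalone sketch, with the cap taken from the Constant Summary and push_activity as a hypothetical helper name:

```ruby
ACTIVITY_RING_MAX = 6 # same value as ACTIVITY_LOG_MAX in the Constant Summary

# Appends one line and drops the oldest lines beyond the cap.
def push_activity(log, line)
  log << line.to_s
  log.shift while log.size > ACTIVITY_RING_MAX
  log
end

def demo_activity_ring
  log = []
  (1..8).each { |i| push_activity(log, "read file_#{i}.rb") }
  log
end

p demo_activity_ring.first # => "read file_3.rb"
p demo_activity_ring.size  # => 6
```

After eight pushes only the six newest lines remain, so a read-heavy child cannot grow the ring without bound.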
#record_tool_output(id, chunk) ⇒ Object
Records a streamed chunk of the CURRENTLY RUNNING tool’s output (#5): splits on newlines into a bounded line buffer whose LAST slot carries the in-flight partial line, so the /agents drill-in can tail it live. Called from UI::SubagentView#tool_chunk on the CHILD thread, so it MUST take the mutex like the other record_* writers. No-op for an unknown id.
# File 'lib/rubino/tools/background_tasks.rb', line 251
def record_tool_output(id, chunk)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    tail = (entry.output_tail ||= [""])
    chunk.to_s.each_line do |line|
      tail[-1] = "#{tail[-1]}#{line.chomp}"[0, OUTPUT_TAIL_LINE_MAX]
      tail << "" if line.end_with?("\n")
    end
    tail.shift while tail.size > OUTPUT_TAIL_MAX + 1
  end
end
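The partial-line handling is the subtle part, so here is a standalone sketch of the same buffering step fed with chunks that split a line in the middle. The constants copy the values from the Constant Summary; append_chunk is a hypothetical helper name, and locking is omitted. Note that String#[] with a start and length counts characters, so in this sketch the per-line cap is measured in characters.

```ruby
TAIL_LINES_MAX = 6      # same value as OUTPUT_TAIL_MAX
TAIL_LINE_WIDTH = 200   # same value as OUTPUT_TAIL_LINE_MAX

# `tail` is the line buffer; its last element is always the in-flight
# (possibly empty) partial line.
def append_chunk(tail, chunk)
  chunk.to_s.each_line do |line|
    tail[-1] = "#{tail[-1]}#{line.chomp}"[0, TAIL_LINE_WIDTH]
    tail << "" if line.end_with?("\n") # line finished: open a new partial slot
  end
  tail.shift while tail.size > TAIL_LINES_MAX + 1
  tail
end

def demo_split_line
  tail = [""]
  append_chunk(tail, "compiling a.rb\ncompil") # second line arrives incomplete
  append_chunk(tail, "ing b.rb\n")             # next chunk completes it
  tail
end

def demo_overflow
  append_chunk([""], (1..10).map { |i| "l#{i}\n" }.join)
end

p demo_split_line # => ["compiling a.rb", "compiling b.rb", ""]
p demo_overflow   # => ["l5", "l6", "l7", "l8", "l9", "l10", ""]
```

The trailing empty string is the open slot for the next partial line, which is why the buffer is allowed one element more than the number of complete lines shown.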
#record_tool_started(id, activity) ⇒ Object
Records a child tool STARTING: bumps the tool counter and sets the last-activity string the card/list show so concurrent tasks stay distinguishable (#124/#127). Called from UI::SubagentView#tool_started, which runs on the CHILD thread, so it MUST take the mutex (the parent renderer reads these fields concurrently). No-op for an unknown id (a late event after #remove).
# File 'lib/rubino/tools/background_tasks.rb', line 219
def record_tool_started(id, activity)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.tool_count = entry.tool_count.to_i + 1
    entry.last_activity = activity.to_s
  end
end
#remove(id) ⇒ Object
# File 'lib/rubino/tools/background_tasks.rb', line 424
def remove(id)
  @mutex.synchronize { @entries.delete(id) }
end
#request_stop(id) ⇒ Object
Marks a stop REQUEST (the /agents <id> --stop / task_stop path) on a live entry so the list/cards immediately show ◌ stopping instead of a stale ● running while the child unwinds at its next checkpoint (#108). Returns true when the entry flipped. #complete then maps a failure on a :stopping entry to the terminal :stopped, so a deliberate stop never reads as ✗ failed (#13).
# File 'lib/rubino/tools/background_tasks.rb', line 188
def request_stop(id)
  @mutex.synchronize do
    entry = @entries[id]
    return false unless entry && live_status?(entry.status)

    entry.status = :stopping
    true
  end
end
#reserve(subagent:, prompt:, owner_subagent_id: nil, depth: 0) ⇒ Object
Reserves a slot and registers a `running` entry, returning it. The caller then attaches the worker thread + runner via #attach.
owner_subagent_id is the `sa_*` id of the SPAWNING subagent (nil ⇒ the human / top-level agent spawned this child). depth is the caller’s hint for a human-spawned child (0); for an owner-spawned child the depth is recomputed here from the owner entry (owner.depth + 1) so a stale hint can’t smuggle a child past the depth cap.
Returns nil — so TaskTool can surface a clear message instead of spawning unbounded work — when ANY of the three nesting caps is hit. The reason is available via #last_refusal_reason for the caller to phrase the message:
:depth — depth >= max_depth (no deeper nesting allowed)
:per_owner — this owner already has max_children_per_node live kids
:global — total live subagents across the tree >= max total
This is the SINGLE enforcement point for every nesting limit.
# File 'lib/rubino/tools/background_tasks.rb', line 139
def reserve(subagent:, prompt:, owner_subagent_id: nil, depth: 0)
  @mutex.synchronize do
    owner = owner_subagent_id ? @entries[owner_subagent_id] : nil
    effective_depth = owner ? owner.depth.to_i + 1 : depth.to_i
    @last_refusal_reason = refusal_reason(owner_subagent_id, effective_depth)
    return nil if @last_refusal_reason

    entry = Entry.new(
      id: new_id,
      subagent: subagent.to_s,
      prompt: prompt.to_s,
      status: :running,
      started_at: Time.now,
      tool_count: 0,
      activity_log: [],
      # Every background child gets its OWN steering queue at reserve time
      # so the parent can `/agents <id> steer "..."` it the instant it is
      # listed, with no separate wiring step and no nil window.
      steer_queue: Interaction::InputQueue.new,
      owner_subagent_id: owner_subagent_id,
      depth: effective_depth
    )
    @entries[entry.id] = entry
    entry
  end
end
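The private refusal_reason helper is not shown on this page, so the following is only a sketch of how the three documented caps could be checked. LiveChild, refusal_reason_for, the order of the checks, and the choice to apply the per-owner cap to the top-level (nil) owner are all assumptions; the limit values are the fallback constants listed above.

```ruby
# Hypothetical stand-ins: the real Entry class and the private refusal_reason
# helper are not shown on this page.
LiveChild = Struct.new(:id, :owner_subagent_id, :depth)

DEFAULT_LIMITS = { max_depth: 2, max_children_per_node: 3, max_concurrent_total: 8 }.freeze

# Returns :depth, :per_owner, :global, or nil when a slot is available.
# `live` holds only the entries that still occupy a slot.
def refusal_reason_for(live, owner_id, depth, limits = DEFAULT_LIMITS)
  return :depth if depth >= limits[:max_depth]

  siblings = live.count { |e| e.owner_subagent_id == owner_id }
  return :per_owner if siblings >= limits[:max_children_per_node]
  return :global if live.size >= limits[:max_concurrent_total]

  nil
end

# Three live children spawned from the top level (owner nil, depth 0).
LIVE_CHILDREN = Array.new(3) { |i| LiveChild.new("sa_#{i}", nil, 0) }.freeze

p refusal_reason_for(LIVE_CHILDREN, nil, 0)    # => :per_owner
p refusal_reason_for(LIVE_CHILDREN, "sa_0", 2) # => :depth
p refusal_reason_for(LIVE_CHILDREN, "sa_0", 1) # => nil
```

Returning a symbol rather than a boolean is what lets a caller such as TaskTool phrase a reason-specific message after a refused reserve.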
#running ⇒ Object
Live (still-running) children — used by the parent stop path to cancel orphans, and to enforce the concurrency cap. A child parked on a human approval (:needs_approval) is STILL live (its thread is alive, holding a slot), so it counts as running here.
# File 'lib/rubino/tools/background_tasks.rb', line 420
def running
  @mutex.synchronize { @entries.values.select { |e| live_status?(e.status) } }
end
#steer(id, text) ⇒ Object
Records a parent->child steer note (the `/agents <id> steer "…"` affordance). Pushes the text onto the child’s steering queue, which the child Loop drains at its next iteration boundary (Loop#inject_steered_input): between turns, never between a tool_use and its results. Best-effort: returns false (and pushes nothing) when the entry is gone or has no queue (e.g. a finished child), true when the note was queued.
# File 'lib/rubino/tools/background_tasks.rb', line 303
def steer(id, text)
  queue = @mutex.synchronize do
    entry = @entries[id]
    entry&.steer_queue
  end
  return false unless queue

  queue.push(text)
  true
end
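The push-now, drain-at-a-boundary pattern can be sketched with Ruby's core Queue. This is an illustration only: Queue stands in for Interaction::InputQueue (whose API is not shown on this page), and drain_steers is a hypothetical version of what a child loop might do between turns.

```ruby
# Empties the queue without blocking and returns the notes in arrival order.
# The empty? check followed by a non-blocking pop is safe here because this
# sketch has a single consumer.
def drain_steers(queue)
  notes = []
  notes << queue.pop(true) until queue.empty?
  notes
end

def demo_steer_queue
  queue = Queue.new # stand-in for the child's steering queue
  queue.push("focus on the failing spec")
  queue.push("skip the linter")
  first_drain = drain_steers(queue)  # what the child would fold in at a boundary
  second_drain = drain_steers(queue) # nothing new arrived since
  [first_drain, second_drain]
end

p demo_steer_queue
# => [["focus on the failing spec", "skip the linter"], []]
```

Because the producer only pushes and the consumer drains at a point of its own choosing, a steer note can never land between a tool call and its result.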