Class: Rubino::Tools::BackgroundTasks
- Inherits: Object
  - Object
    - Rubino::Tools::BackgroundTasks
- Defined in: lib/rubino/tools/background_tasks.rb
Overview
Process-wide registry for subagents started by the `task` tool in the BACKGROUND (the default). It mirrors ShellRegistry, the in-repo precedent for “fire-and-forget + poll later + kill”, but the unit of work is a nested Agent::Runner thread instead of a detached OS process.
Each entry owns:
- the worker Thread running the child Runner#run!,
- the child Runner (so #cancel can flip its CancelToken, exactly the mechanism Run::Executor's stop-watcher uses for top-level runs),
- the terminal status/result/error captured in the worker's `ensure`.
The registry lives only as long as a single CLI/server process; like ShellRegistry, it is intentionally NOT persisted. Background subagents die with the process.
Concurrency cap (mirrors the reference _DEFAULT_MAX_CONCURRENT_CHILDREN = 3): a background subagent is a full LLM run with real cost, so spawning is refused (see #reserve) past MAX_CONCURRENT live children rather than fanning out unbounded threads.
Defined Under Namespace
Classes: Entry
Constant Summary
- MAX_CONCURRENT = 3
- MAX_DEPTH = 2
- MAX_CHILDREN_PER_NODE = 3
- MAX_CONCURRENT_TOTAL = 8
  MAX_DEPTH, MAX_CHILDREN_PER_NODE and MAX_CONCURRENT_TOTAL are the fallback caps for the nested-subagent tree, used when config is absent (e.g. a bare registry in a unit test with no Configuration wired). The live values come from config (tasks.max_depth / max_children_per_node / max_concurrent_total); these constants are the built-in defaults those config keys themselves default to. All three are enforced in #reserve.
- ACTIVITY_LOG_MAX = 6
  How many recent activity lines the drill-in shows (the live `recent:` ring).
Instance Attribute Summary
- #last_refusal_reason ⇒ Object (readonly)
  Why the most recent #reserve returned nil (one of :depth / :per_owner / :global), or nil when the last reserve succeeded.
Class Method Summary
- .instance ⇒ Object
- .reset! ⇒ Object
  Test seam: drop all state between examples.
Instance Method Summary
- #ancestors_of(id) ⇒ Object
  The chain of ancestors of `id`, nearest parent first, walking owner_subagent_id up to the human/top-level root.
- #attach(entry, thread:, runner:) ⇒ Object
  Binds the live worker thread + child runner to a reserved entry so the registry can later cancel it.
- #awaiting_approval ⇒ Object
  Entries currently parked on a human approval, surfaced on their card and answerable via /agents <id>.
- #awaiting_human ⇒ Object
  Entries parked on an escalated ask_parent, waiting on THE HUMAN: the source of the persistent "⛔ N subagent waiting on you" marker, answerable via /reply <id>.
- #begin_approval(id, gate:, approval_id:, question:, command:) ⇒ Object
  Flips an entry into the :needs_approval state and stores the gate + question/command the card surfaces (Option 2).
- #begin_ask(id, gate:, ask_id:, question:, blocking:, owner_id: nil) ⇒ Object
  Flips an entry into a blocked state for an escalated ask_parent (:blocked_on_human, or :blocked_on_parent when an agent-parent owns the child) and stores the gate + question + blocking flag the card/banner surface (mirror of #begin_approval, but for a child->parent question).
- #cancel_descendant_ask_gates(id) ⇒ Object
  Stop-cascade (S5a): when a node is stopped, cancel the ask-gates of ALL its descendants so a blocking ask anywhere in the subtree unwinds at once (Run::ApprovalGate#cancel! wakes the parked child thread with Interrupted) instead of leaving an orphaned grandchild parked until its bound elapses.
- #children_of(id) ⇒ Object
  Direct children of `id`: entries whose owner_subagent_id == id.
- #complete(entry, status:, result: nil, error: nil) ⇒ Object
  Records terminal state when the worker finishes (called from its `ensure`).
- #deliver_answer(id, answer) ⇒ Object
  The ONE shared answer wire for an escalated ask_parent, used by BOTH the human /reply path (Commands::Executor#deliver_reply) and the model-callable `answer_child` tool. It routes the answer back DOWN to the asking child by (1) deciding its ask gate, which unblocks a BLOCKING ask with the answer as its tool result, and (2) pushing the answer onto its steer queue so a NON-BLOCKING ask folds it in at its next turn boundary; it then clears the blocked state (#end_ask).
- #descendants_of(id) ⇒ Object
  All transitive descendants of `id` (BFS over owner_subagent_id), in breadth order.
- #end_approval(id) ⇒ Object
  Clears the approval state and returns the entry to :running once a decision has been delivered (or the child unwinds).
- #end_ask(id) ⇒ Object
  Clears the ask state and returns the entry to :running once the question has been answered (by the human via /reply, or the agent-parent via answer_child), or the child unwinds / is stopped.
- #find(id) ⇒ Object
- #initialize ⇒ BackgroundTasks (constructor)
  A new instance of BackgroundTasks.
- #list ⇒ Object
  All entries, newest first, for a `task` listing (the /tasks analogue).
- #owned_by?(parent_id, child_id) ⇒ Boolean
  True iff `child_id`'s direct owner is `parent_id` (the ownership predicate that later slices' steer/probe/answer_child AUTHORIZATION checks will build on).
- #record_live_probe(id) ⇒ Object
  Records a BILLED live probe against a child (S3): bumps probe_count and stamps last_probe_at under the mutex (the owner runs this on its own thread while the parent renderer may read the entry).
- #record_tool_finished(id, line) ⇒ Object
  Records a child tool FINISHING: appends a terse line to the bounded activity ring that the live drill-in (#71) tails.
- #record_tool_started(id, activity) ⇒ Object
  Records a child tool STARTING: bumps the tool counter and sets the last-activity string the card/list show so concurrent tasks stay distinguishable (#124/#127).
- #remove(id) ⇒ Object
- #request_stop(id) ⇒ Object
  Marks a stop REQUEST (the /agents <id> --stop / task_stop path) on a live entry so the list/cards immediately show ◌ stopping instead of a stale ● running while the child unwinds at its next checkpoint (#108).
- #reserve(subagent:, prompt:, owner_subagent_id: nil, depth: 0) ⇒ Object
  Reserves a slot and registers a `running` entry, returning it.
- #running ⇒ Object
  Live (still-running) children, used by the parent stop path to cancel orphans and to enforce the concurrency cap.
- #steer(id, text) ⇒ Object
  Records a parent->child steer note (the `/agents <id> steer "…"` affordance).
Constructor Details
#initialize ⇒ BackgroundTasks
Returns a new instance of BackgroundTasks.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 108

def initialize
  @entries = {}
  @mutex = Mutex.new
end
```
Instance Attribute Details
#last_refusal_reason ⇒ Object (readonly)
Why the most recent #reserve returned nil (one of :depth / :per_owner / :global), or nil when the last reserve succeeded. Read by TaskTool to phrase a reason-specific at-capacity message.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 160

def last_refusal_reason
  @last_refusal_reason
end
```
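A caller can branch on #last_refusal_reason to word its at-capacity message. The sketch below is a minimal illustration only: `refusal_message` is a hypothetical helper, its wording is made up, and its defaults simply reuse the fallback cap constants. It is not TaskTool's actual code.

```ruby
# Hypothetical helper: turns a refusal reason into a user-facing message.
# Defaults are the registry's fallback caps; the wording is illustrative.
def refusal_message(reason, max_depth: 2, max_children: 3, max_total: 8)
  case reason
  when nil        then nil # last reserve succeeded, nothing to report
  when :depth     then "Not spawned: nesting depth limit (#{max_depth}) reached."
  when :per_owner then "Not spawned: this agent already has #{max_children} live subagents."
  when :global    then "Not spawned: #{max_total} subagents are already running."
  else "Not spawned: capacity limit reached (#{reason})."
  end
end

refusal_message(:global) # => "Not spawned: 8 subagents are already running."
```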
Class Method Details
.instance ⇒ Object
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 98

def instance
  @instance ||= new
end
```
.reset! ⇒ Object
Test seam: drop all state between examples.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 103

def reset!
  @instance = nil
end
```
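The .instance / .reset! pair is the common memoized-singleton-plus-test-seam pattern. A minimal sketch of that pattern, using a hypothetical `Registry` class in place of BackgroundTasks:

```ruby
# Hypothetical stand-in for BackgroundTasks, reduced to the singleton seam.
class Registry
  class << self
    # Lazily builds one shared instance per process (never persisted).
    def instance
      @instance ||= new
    end

    # Test seam: forget the shared instance so the next call starts fresh.
    def reset!
      @instance = nil
    end
  end

  def initialize
    @entries = {}
    @mutex = Mutex.new
  end

  def add(id, value)
    @mutex.synchronize { @entries[id] = value }
  end

  def size
    @mutex.synchronize { @entries.size }
  end
end

Registry.instance.add("sa_1", :entry)
Registry.reset!          # e.g. in an RSpec `before` hook
Registry.instance.size   # => 0
```

Note that `@instance ||= new` is not itself guarded by a lock, so two threads racing on the very first call could each build an instance; a class-level Mutex around the memoization would close that window if it ever matters.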
Instance Method Details
#ancestors_of(id) ⇒ Object
The chain of ancestors of `id`, nearest parent first, walking owner_subagent_id up to the human/top-level root. Cycle-safe.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 429

def ancestors_of(id)
  @mutex.synchronize do
    out = []
    seen = { id => true }
    cur = @entries[id]&.owner_subagent_id
    while cur && (entry = @entries[cur]) && !seen[cur]
      seen[cur] = true
      out << entry
      cur = entry.owner_subagent_id
    end
    out
  end
end
```
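To see the cycle guard in action, the same walk can be run over a plain Hash. This sketch copies the loop from the listing but takes the map as an argument and drops the mutex; `OwnerLink` is a hypothetical stand-in for Entry carrying only the two fields the walk reads.

```ruby
# Hypothetical stand-in for Entry: only the fields the upward walk needs.
OwnerLink = Struct.new(:id, :owner_subagent_id)

# Walks owner links upward, nearest parent first. The `seen` hash stops the
# loop if a corrupted map ever contains an ownership cycle.
def ancestors_of(entries, id)
  out = []
  seen = { id => true }
  cur = entries[id]&.owner_subagent_id
  while cur && (entry = entries[cur]) && !seen[cur]
    seen[cur] = true
    out << entry
    cur = entry.owner_subagent_id
  end
  out
end

entries = {
  "sa_1" => OwnerLink.new("sa_1", nil),    # spawned by the human
  "sa_2" => OwnerLink.new("sa_2", "sa_1"),
  "sa_3" => OwnerLink.new("sa_3", "sa_2")
}
ancestors_of(entries, "sa_3").map(&:id) # => ["sa_2", "sa_1"]
```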
#attach(entry, thread:, runner:) ⇒ Object
Binds the live worker thread + child runner to a reserved entry so the registry can later cancel it. Done after reserve so the entry exists in the map before the thread starts (no race on completion writing back).
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 165

def attach(entry, thread:, runner:)
  @mutex.synchronize do
    entry.thread = thread
    entry.runner = runner
  end
end
```
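The reserve-then-attach ordering can be sketched with a hypothetical `MiniRegistry`: the row exists before the worker starts, so even a worker that finishes before #attach runs still has an entry for its `ensure` to write into. The `:completed` success status, the `Entry` struct and the string standing in for Runner#run! are assumptions for illustration.

```ruby
# Hypothetical entry shape: just enough fields for the lifecycle.
Entry = Struct.new(:id, :status, :thread, :result, :error, keyword_init: true)

class MiniRegistry
  def initialize
    @entries = {}
    @mutex = Mutex.new
    @seq = 0
  end

  # The row is in the map before any work starts.
  def reserve
    @mutex.synchronize do
      @seq += 1
      entry = Entry.new(id: "sa_#{@seq}", status: :running)
      @entries[entry.id] = entry
    end
  end

  def attach(entry, thread:)
    @mutex.synchronize { entry.thread = thread }
  end

  def complete(entry, status:, result: nil, error: nil)
    @mutex.synchronize do
      entry.status = status
      entry.result = result
      entry.error = error
    end
  end

  def find(id)
    @mutex.synchronize { @entries[id] }
  end
end

registry = MiniRegistry.new
entry = registry.reserve
worker = Thread.new do
  status = :failed
  result = nil
  error = nil
  begin
    result = "child output" # stands in for the child Runner#run!
    status = :completed     # hypothetical success status name
  rescue StandardError => e
    error = e
  ensure
    registry.complete(entry, status: status, result: result, error: error)
  end
end
registry.attach(entry, thread: worker) # safe even if the worker already finished
worker.join
```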
#awaiting_approval ⇒ Object
Entries currently parked on a human approval — surfaced on their card and answerable via /agents <id>.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 372

def awaiting_approval
  @mutex.synchronize { @entries.values.select { |e| e.status == :needs_approval } }
end
```
#awaiting_human ⇒ Object
Entries parked on an escalated ask_parent, waiting on THE HUMAN: the source of the persistent "⛔ N subagent waiting on you" marker, answerable via /reply <id>. Counts ONLY :blocked_on_human, because a :blocked_on_parent child is its agent-parent's job (answer_child), not the human's, so it must NOT inflate the human's “waiting on you” count.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 366

def awaiting_human
  @mutex.synchronize { @entries.values.select { |e| e.status == :blocked_on_human } }
end
```
#begin_approval(id, gate:, approval_id:, question:, command:) ⇒ Object
Flips an entry into the :needs_approval state and stores the gate + question/command the card surfaces (Option 2). The child thread then parks on `gate.await(approval_id)`; the user resolves it via /agents <id>. It is meant to return the previous status so the child can restore it, but as listed the body returns the value of its last assignment (:needs_approval), or nil for an unknown id.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 238

def begin_approval(id, gate:, approval_id:, question:, command:)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.approval_gate = gate
    entry.approval_id = approval_id
    entry.approval_question = question.to_s
    entry.approval_command = command.to_s
    entry.status = :needs_approval
  end
end
```
#begin_ask(id, gate:, ask_id:, question:, blocking:, owner_id: nil) ⇒ Object
Flips an entry into a blocked state for an escalated ask_parent and stores the gate + question + blocking flag the card/banner surface (mirror of #begin_approval, but for a child->parent question). The child thread then parks on `ask_gate.await(ask_id)` (blocking ask) until the gate is decided, or keeps working (non-blocking ask) with the answer delivered later via the steer queue. A child in this state still holds a concurrency slot (its thread is alive, or it is awaiting an answer), so it counts as live.
The status depends on WHO owns the asking child (S4):
- owner_id present (an agent-parent) → :blocked_on_parent. The parent MODEL answers via answer_child; the question was pushed onto the owner's steer_queue, so it is NOT the human's job.
- owner_id nil (the human / top-level) → :blocked_on_human. The human answers via /reply <id>.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 313

def begin_ask(id, gate:, ask_id:, question:, blocking:, owner_id: nil)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.ask_gate = gate
    entry.ask_id = ask_id
    entry.ask_question = question.to_s
    entry.ask_blocking = blocking ? true : false
    entry.status = owner_id ? :blocked_on_parent : :blocked_on_human
  end
end
```
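The owner-dependent status, and why only :blocked_on_human feeds the human's "waiting on you" count, fit in a few lines. `Ask`, `blocked_status_for` and `waiting_on_human_count` are hypothetical names for this sketch, which mirrors the ternary in #begin_ask and the filter in #awaiting_human.

```ruby
Ask = Struct.new(:id, :status)

# Same rule as begin_ask: an agent-parent owner answers via answer_child,
# the human (owner_id nil) answers via /reply.
def blocked_status_for(owner_id)
  owner_id ? :blocked_on_parent : :blocked_on_human
end

# Same filter as awaiting_human: only the human's own questions count.
def waiting_on_human_count(entries)
  entries.count { |e| e.status == :blocked_on_human }
end

entries = [
  Ask.new("sa_1", blocked_status_for(nil)),    # asked the human
  Ask.new("sa_2", blocked_status_for("sa_1")), # asked its agent-parent
  Ask.new("sa_3", :running)
]
waiting_on_human_count(entries) # => 1
```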
#cancel_descendant_ask_gates(id) ⇒ Object
Stop-cascade (S5a): when a node is stopped, cancel the ask-gates of ALL its descendants so a blocking ask anywhere in the subtree unwinds at once (Run::ApprovalGate#cancel! wakes the parked child thread with Interrupted) instead of leaving an orphaned grandchild parked until its bound elapses. The descendant runners’ CancelTokens are flipped by the caller’s cancel! of the node; this just makes the gate-parked ones wake immediately. Safe to call on a node with no descendants or no blocked descendants.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 450

def cancel_descendant_ask_gates(id)
  descendants_of(id).each { |e| e.ask_gate&.cancel! }
end
```
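The cascade relies on cancelling a gate waking every thread parked on it. The gate internals are not shown on this page, so this sketch uses a hypothetical `TinyGate` built on Mutex and ConditionVariable that only mimics the await / decide / cancel! names mentioned here, plus a locally defined `Interrupted` error.

```ruby
class Interrupted < StandardError; end

# Hypothetical gate: await parks the caller until decide or cancel! wakes it.
class TinyGate
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @answers = {}
    @cancelled = false
  end

  def await(id)
    @mutex.synchronize do
      # Re-check the predicate after every wakeup (spurious or real).
      @cond.wait(@mutex) until @answers.key?(id) || @cancelled
      raise Interrupted, "gate cancelled" if @cancelled

      @answers.delete(id)
    end
  end

  def decide(id, answer)
    @mutex.synchronize do
      @answers[id] = answer
      @cond.broadcast
    end
  end

  def cancel!
    @mutex.synchronize do
      @cancelled = true
      @cond.broadcast
    end
  end
end

# Stop-cascade: cancel each descendant's gate (nil = not blocked on an ask).
def cancel_gates(descendant_gates)
  descendant_gates.each { |g| g&.cancel! }
end

gates = [TinyGate.new, nil, TinyGate.new]
outcomes = Queue.new
threads = gates.compact.map do |gate|
  Thread.new do
    gate.await("ask_1")
    outcomes << :answered
  rescue Interrupted
    outcomes << :interrupted
  end
end
sleep 0.05 until threads.all? { |t| t.status == "sleep" } # both parked
cancel_gates(gates)
threads.each(&:join)
```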
#children_of(id) ⇒ Object
Direct children of `id`: entries whose owner_subagent_id == id. Pass nil for the human/top-level node's direct children.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 401

def children_of(id)
  @mutex.synchronize { @entries.values.select { |e| e.owner_subagent_id == id } }
end
```
#complete(entry, status:, result: nil, error: nil) ⇒ Object
Records terminal state when the worker finishes (called from its `ensure`). Single writer per entry, but guarded so #find/#list readers see a consistent snapshot. A failure landing on a :stopping entry is a USER-REQUESTED stop unwinding (Interrupted at the next checkpoint), so it is recorded as :stopped, distinct from a genuine :failed (#108/#13).
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 193

def complete(entry, status:, result: nil, error: nil)
  @mutex.synchronize do
    status = :stopped if entry.status == :stopping && status == :failed
    entry.status = status
    entry.result = result
    entry.error = error
    entry.finished_at = Time.now
  end
end
```
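The stop bookkeeping shared by #request_stop and #complete reduces to two small status transitions. This sketch assumes a set of live statuses (the private live_status? predicate is not listed on this page), and its helper names are hypothetical.

```ruby
# Assumed live set: the page says needs_approval and blocked children still
# hold a slot; whether :stopping counts as live is not stated, so it is left out.
LIVE_STATUSES = %i[running needs_approval blocked_on_human blocked_on_parent].freeze

# request_stop: only a live entry flips to :stopping.
def stop_requested(status)
  LIVE_STATUSES.include?(status) ? :stopping : status
end

# complete: a failure that lands on a :stopping entry is the requested
# unwind, so it is recorded as :stopped rather than :failed.
def terminal_status(current, reported)
  current == :stopping && reported == :failed ? :stopped : reported
end

terminal_status(stop_requested(:running), :failed) # => :stopped
terminal_status(:running, :failed)                 # => :failed
```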
#deliver_answer(id, answer) ⇒ Object
The ONE shared answer wire for an escalated ask_parent, used by BOTH the human /reply path (Commands::Executor#deliver_reply) and the model-callable `answer_child` tool. It routes the answer back DOWN to the asking child by (1) deciding its ask gate, which unblocks a BLOCKING ask with the answer as its tool result, and (2) pushing the answer onto its steer queue so a NON-BLOCKING ask folds it in at its next turn boundary; it then clears the blocked state (#end_ask). Either way the answer PERSISTS in the child's context. Returns false (a no-op) for an unknown id or one not awaiting an answer (no ask_gate), and true when the answer was routed.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 351

def deliver_answer(id, answer)
  entry = find(id)
  return false unless entry&.ask_gate

  entry.ask_gate.decide(entry.ask_id, answer)
  steer(entry.id, "[parent answer] #{answer}")
  end_ask(entry.id)
  true
end
```
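The two-wire delivery can be sketched with Thread::Queue standing in for both the ask gate (a blocking `pop` plays the parked `await`, a `push` plays `decide`) and the steer queue. `AskState` and this free-standing `deliver_answer` are hypothetical simplifications, not the registry's classes.

```ruby
AskState = Struct.new(:ask_gate, :steer_queue, keyword_init: true)

def deliver_answer(entry, answer)
  return false unless entry&.ask_gate

  entry.ask_gate.push(answer)                         # stands in for gate.decide(ask_id, answer)
  entry.steer_queue.push("[parent answer] #{answer}") # persists in the child's context
  entry.ask_gate = nil                                # stands in for end_ask
  true
end

entry = AskState.new(ask_gate: Queue.new, steer_queue: Queue.new)
gate = entry.ask_gate             # capture before deliver_answer clears it
child = Thread.new { gate.pop }   # a BLOCKING ask parked on its gate
delivered = deliver_answer(entry, "use the staging database")
child.value                       # => "use the staging database"
```

A second delivery returns false, because the gate has already been cleared.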
#descendants_of(id) ⇒ Object
All transitive descendants of `id` (BFS over owner_subagent_id), in breadth order. Cycle-safe (an id is visited at most once).
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 407

def descendants_of(id)
  @mutex.synchronize do
    out = []
    seen = {}
    frontier = @entries.values.select { |e| e.owner_subagent_id == id }
    until frontier.empty?
      nxt = []
      frontier.each do |e|
        next if seen[e.id]

        seen[e.id] = true
        out << e
        nxt.concat(@entries.values.select { |c| c.owner_subagent_id == e.id })
      end
      frontier = nxt
    end
    out
  end
end
```
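The listed BFS rescans every entry for each visited node, which is fine for a handful of subagents. As an alternative (not the registry's code), the children can be indexed by owner once with Enumerable#group_by before running the same cycle-safe traversal; `TreeNode` is a hypothetical stand-in for Entry.

```ruby
TreeNode = Struct.new(:id, :owner_subagent_id)

def descendants_of(entries, id)
  # One pass to index children by owner (nil key = the human's children).
  children = entries.values.group_by(&:owner_subagent_id)
  out = []
  seen = {}
  frontier = children.fetch(id, [])
  until frontier.empty?
    nxt = []
    frontier.each do |e|
      next if seen[e.id] # visit each id at most once

      seen[e.id] = true
      out << e
      nxt.concat(children.fetch(e.id, []))
    end
    frontier = nxt
  end
  out
end

entries = {
  "sa_1" => TreeNode.new("sa_1", nil),
  "sa_2" => TreeNode.new("sa_2", "sa_1"),
  "sa_3" => TreeNode.new("sa_3", "sa_1"),
  "sa_4" => TreeNode.new("sa_4", "sa_2")
}
descendants_of(entries, "sa_1").map(&:id) # => ["sa_2", "sa_3", "sa_4"]
```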
#end_approval(id) ⇒ Object
Clears the approval state and returns the entry to :running once a decision has been delivered (or the child unwinds).
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 253

def end_approval(id)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.approval_gate = nil
    entry.approval_id = nil
    entry.approval_question = nil
    entry.approval_command = nil
    entry.status = :running if entry.status == :needs_approval
  end
end
```
#end_ask(id) ⇒ Object
Clears the ask state and returns the entry to :running once the question has been answered (by the human via /reply, or the agent-parent via answer_child), or the child unwinds / is stopped.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 329

def end_ask(id)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.ask_gate = nil
    entry.ask_id = nil
    entry.ask_question = nil
    entry.ask_blocking = nil
    entry.status = :running if %i[blocked_on_human blocked_on_parent].include?(entry.status)
  end
end
```
#find(id) ⇒ Object
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 376

def find(id)
  @mutex.synchronize { @entries[id] }
end
```
#list ⇒ Object
All entries, newest first, for a `task` listing (the /tasks analogue).
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 381

def list
  @mutex.synchronize { @entries.values.sort_by(&:started_at).reverse }
end
```
#owned_by?(parent_id, child_id) ⇒ Boolean
True iff `child_id`'s direct owner is `parent_id` (the ownership predicate that later slices' steer/probe/answer_child AUTHORIZATION checks will build on).
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 456

def owned_by?(parent_id, child_id)
  @mutex.synchronize do
    child = @entries[child_id]
    !child.nil? && child.owner_subagent_id == parent_id
  end
end
```
#record_live_probe(id) ⇒ Object
Records a BILLED live probe against a child (S3): bumps probe_count and stamps last_probe_at under the mutex (the owner runs this on its own thread while the parent renderer may read the entry). Returns the new count, or nil for an unknown id. Free snapshot probes (live:false) never call this; only `probe(live:true)` does, after the budget check passes.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 288

def record_live_probe(id)
  @mutex.synchronize do
    entry = @entries[id]
    return nil unless entry

    entry.probe_count = entry.probe_count.to_i + 1
    entry.last_probe_at = Time.now
    entry.probe_count
  end
end
```
#record_tool_finished(id, line) ⇒ Object
Records a child tool FINISHING: appends a terse line to the bounded activity ring the live drill-in (#71) tails. Keeps the last ACTIVITY_LOG_MAX entries so the ring never grows unbounded for a read-heavy child.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 223

def record_tool_finished(id, line)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    log = (entry.activity_log ||= [])
    log << line.to_s
    log.shift while log.size > ACTIVITY_LOG_MAX
  end
end
```
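The bounded ring is an Array trimmed from the front after each append. A minimal sketch of that behavior, reusing the ACTIVITY_LOG_MAX value of 6 and a hypothetical `append_activity` helper:

```ruby
ACTIVITY_LOG_MAX = 6

# Append, then drop the oldest lines until the log fits the cap again.
def append_activity(log, line)
  log << line.to_s
  log.shift while log.size > ACTIVITY_LOG_MAX
  log
end

log = []
10.times { |i| append_activity(log, "read file_#{i}.rb") }
log.size  # => 6
log.first # => "read file_4.rb"
```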
#record_tool_started(id, activity) ⇒ Object
Records a child tool STARTING: bumps the tool counter and sets the last-activity string the card/list show so concurrent tasks stay distinguishable (#124/#127). Called from UI::SubagentView#tool_started, which runs on the CHILD thread, so it MUST take the mutex (the parent renderer reads these fields concurrently). No-op for an unknown id (a late event after #remove).
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 209

def record_tool_started(id, activity)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.tool_count = entry.tool_count.to_i + 1
    entry.last_activity = activity.to_s
  end
end
```
#remove(id) ⇒ Object
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 393

def remove(id)
  @mutex.synchronize { @entries.delete(id) }
end
```
#request_stop(id) ⇒ Object
Marks a stop REQUEST (the /agents <id> --stop / task_stop path) on a live entry so the list/cards immediately show ◌ stopping instead of a stale ● running while the child unwinds at its next checkpoint (#108). Returns true when the entry flipped, false for an unknown or no-longer-live entry. #complete then maps a failure on a :stopping entry to the terminal :stopped, so a deliberate stop never reads as ✗ failed (#13).
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 178

def request_stop(id)
  @mutex.synchronize do
    entry = @entries[id]
    return false unless entry && live_status?(entry.status)

    entry.status = :stopping
    true
  end
end
```
#reserve(subagent:, prompt:, owner_subagent_id: nil, depth: 0) ⇒ Object
Reserves a slot and registers a `running` entry, returning it. The caller then attaches the worker thread + runner via #attach.
owner_subagent_id is the `sa_*` id of the SPAWNING subagent (nil ⇒ the human / top-level agent spawned this child). depth is the caller's hint for a human-spawned child (0); for an owner-spawned child the depth is recomputed here from the owner entry (owner.depth + 1), so a stale hint can't smuggle a child past the depth cap.
Returns nil when ANY of the three nesting caps is hit, so TaskTool can surface a clear message instead of spawning unbounded work. The reason is available via #last_refusal_reason for the caller to phrase the message:
- :depth → depth >= max_depth (no deeper nesting allowed)
- :per_owner → this owner already has max_children_per_node live kids
- :global → total live subagents across the tree >= max total
This is the SINGLE enforcement point for every nesting limit.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 129

def reserve(subagent:, prompt:, owner_subagent_id: nil, depth: 0)
  @mutex.synchronize do
    owner = owner_subagent_id ? @entries[owner_subagent_id] : nil
    effective_depth = owner ? owner.depth.to_i + 1 : depth.to_i
    @last_refusal_reason = refusal_reason(owner_subagent_id, effective_depth)
    return nil if @last_refusal_reason

    entry = Entry.new(
      id: new_id,
      subagent: subagent.to_s,
      prompt: prompt.to_s,
      status: :running,
      started_at: Time.now,
      tool_count: 0,
      activity_log: [],
      # Every background child gets its OWN steering queue at reserve time
      # so the parent can `/agents <id> steer "..."` it the instant it is
      # listed — no separate wiring step, no nil window.
      steer_queue: Interaction::InputQueue.new,
      owner_subagent_id: owner_subagent_id,
      depth: effective_depth
    )
    @entries[entry.id] = entry
    entry
  end
end
```
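The refusal logic itself lives in a private refusal_reason helper that this page does not list. This sketch is a hypothetical reconstruction from the cap descriptions: the check order (depth, then per-owner, then global), the `Child` struct and the defaults borrowed from the fallback constants are all assumptions.

```ruby
# Hypothetical stand-in for an entry: who owns it and whether it is live.
Child = Struct.new(:owner_subagent_id, :live)

# Mirrors the depth recomputation in #reserve: an owner-spawned child is
# always one level below its owner, whatever the caller's hint says.
def effective_depth(owner_depth, hint)
  owner_depth ? owner_depth.to_i + 1 : hint.to_i
end

# Assumed shape of the private refusal_reason helper: returns the first
# cap that is hit, or nil when a new child may be reserved.
def refusal_reason(entries, owner_id, depth,
                   max_depth: 2, max_children_per_node: 3, max_concurrent_total: 8)
  live = entries.select(&:live)
  return :depth if depth >= max_depth
  return :per_owner if live.count { |e| e.owner_subagent_id == owner_id } >= max_children_per_node
  return :global if live.size >= max_concurrent_total

  nil
end

siblings = Array.new(3) { Child.new(nil, true) }
refusal_reason(siblings, nil, effective_depth(nil, 0)) # => :per_owner
```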
#running ⇒ Object
Live (still-running) children — used by the parent stop path to cancel orphans, and to enforce the concurrency cap. A child parked on a human approval (:needs_approval) is STILL live (its thread is alive, holding a slot), so it counts as running here.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 389

def running
  @mutex.synchronize { @entries.values.select { |e| live_status?(e.status) } }
end
```
#steer(id, text) ⇒ Object
Records a parent->child steer note (the `/agents <id> steer "…"` affordance). Pushes the text onto the child's steering queue, which the child Loop drains at its next iteration boundary (Loop#inject_steered_input): between turns, never between a tool_use and its results. Best-effort: returns false (and pushes nothing) when the entry is gone or has no queue (e.g. a finished child), true when the note was queued.
```ruby
# File 'lib/rubino/tools/background_tasks.rb', line 272

def steer(id, text)
  queue = @mutex.synchronize do
    entry = @entries[id]
    entry&.steer_queue
  end
  return false unless queue

  queue.push(text)
  true
end
```
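The steer contract (push at any time, drain only at an iteration boundary) can be sketched with Thread::Queue standing in for Interaction::InputQueue. `steer`, `drain` and the `run_turn` lambda are hypothetical simplifications of the registry method and Loop#inject_steered_input.

```ruby
# Parent side: best-effort push; false when there is no queue to push onto.
def steer(queue, text)
  return false unless queue

  queue.push(text)
  true
end

# Child side: take everything queued so far. Assumes a single consumer,
# so pop never blocks once empty? has returned false.
def drain(queue)
  notes = []
  notes << queue.pop until queue.empty?
  notes
end

steer_queue = Queue.new
transcript = []

# One loop iteration: pending steer notes are folded in only at the start,
# before the turn itself runs.
run_turn = lambda do |turn|
  drain(steer_queue).each { |note| transcript << "user (steer): #{note}" }
  transcript << "assistant: turn #{turn}"
end

run_turn.call(1)
steer(steer_queue, "focus on the failing spec first")
run_turn.call(2)
```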