Class: Rubino::Tools::BackgroundTasks

Inherits:
Object
Defined in:
lib/rubino/tools/background_tasks.rb

Overview

Process-wide registry for subagents started by the `task` tool in the BACKGROUND (the default). It mirrors ShellRegistry, the in-repo precedent for "fire-and-forget + poll later + kill", but the unit of work is a nested Agent::Runner thread instead of a detached OS process.

Each entry owns:

- the worker Thread running the child Runner#run!,
- the child Runner (so #cancel can flip its CancelToken — exactly the
  mechanism Run::Executor's stop-watcher uses for top-level runs),
- the terminal status/result/error captured in the worker's `ensure`.

The registry lives for the lifetime of a single CLI/server process. Like ShellRegistry it is intentionally NOT persisted, so background subagents die with the process.

Concurrency cap (mirrors the reference _DEFAULT_MAX_CONCURRENT_CHILDREN = 3): a background subagent is a full LLM run = real cost, so #spawn refuses past MAX_CONCURRENT live children rather than fanning out unbounded threads.
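To make the reserve, worker thread, and `ensure` write-back cycle concrete, here is a stripped-down sketch of a registry with a live-children cap. It is not Rubino's code: `MiniRegistry`, `start`, the `:done` status, and the block standing in for a nested Agent::Runner are all hypothetical, and there is no CancelToken or config-driven cap.

```ruby
# Hypothetical sketch of the fire-and-forget + poll later pattern; not
# Rubino's implementation. A block stands in for the nested Agent::Runner.
class MiniRegistry
  Entry = Struct.new(:id, :status, :result, :error, :thread, keyword_init: true)

  MAX_CONCURRENT = 3

  def initialize
    @entries = {}
    @mutex   = Mutex.new
    @seq     = 0
  end

  # Reserves a slot and starts a worker; nil once MAX_CONCURRENT are live.
  def start(&work)
    entry = @mutex.synchronize do
      return nil if @entries.values.count { |e| e.status == :running } >= MAX_CONCURRENT

      @seq += 1
      e = Entry.new(id: "sa_#{@seq}", status: :running)
      @entries[e.id] = e # registered BEFORE the worker thread exists
      e
    end

    thread = Thread.new do
      status = :failed
      result = error = nil
      begin
        result = work.call
        status = :done
      rescue StandardError => ex
        error = ex
      ensure
        finish(entry, status: status, result: result, error: error)
      end
    end
    @mutex.synchronize { entry.thread = thread } # the #attach step
    entry
  end

  def find(id)
    @mutex.synchronize { @entries[id] }
  end

  private

  # Terminal write from the worker's ensure; guarded for concurrent readers.
  def finish(entry, status:, result:, error:)
    @mutex.synchronize do
      entry.status = status
      entry.result = result
      entry.error  = error
    end
  end
end

reg  = MiniRegistry.new
gate = Queue.new
busy = Array.new(3) { reg.start { gate.pop } } # three workers park on the queue
p(reg.start { :extra })                        # prints nil: cap reached
3.times { gate << :ok }
busy.each { |e| e.thread.join }
p reg.find(busy.first.id).status               # prints :done
```

Registering the entry under the mutex before `Thread.new` is what makes the `ensure` write-back safe: the entry is already in the map even if the worker finishes instantly.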

Defined Under Namespace

Classes: Entry

Constant Summary

MAX_CONCURRENT = 3

MAX_DEPTH = 2
MAX_CHILDREN_PER_NODE = 3
MAX_CONCURRENT_TOTAL = 8

Fallback caps for the nested-subagent tree, used when config is absent (e.g. a bare registry in a unit test with no Configuration wired). The live values come from config (tasks.max_depth / max_children_per_node / max_concurrent_total); these constants are the built-in defaults the config keys themselves default to. All three are enforced in #reserve.

ACTIVITY_LOG_MAX = 6

How many recent activity lines the drill-in shows (the live `recent:` ring).

Constructor Details

#initialize ⇒ BackgroundTasks

Returns a new instance of BackgroundTasks.



# File 'lib/rubino/tools/background_tasks.rb', line 108

def initialize
  @entries = {}
  @mutex   = Mutex.new
end

Instance Attribute Details

#last_refusal_reason ⇒ Object (readonly)

Why the most recent #reserve returned nil (one of :depth / :per_owner / :global), or nil when the last reserve succeeded. Read by TaskTool to phrase a reason-specific at-capacity message.



# File 'lib/rubino/tools/background_tasks.rb', line 160

def last_refusal_reason
  @last_refusal_reason
end

Class Method Details

.instance ⇒ Object



# File 'lib/rubino/tools/background_tasks.rb', line 98

def instance
  @instance ||= new
end

.reset! ⇒ Object

Test seam: drop all state between examples.



# File 'lib/rubino/tools/background_tasks.rb', line 103

def reset!
  @instance = nil
end
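The listing shows `def instance` and `def reset!` without their enclosing scope. A sketch of the usual memoized-singleton shape, assuming both sit in a `class << self` block; `Registry` and `entries` are illustrative names, not Rubino's:

```ruby
# Hypothetical stand-in showing the class-level memoized singleton and the
# reset! test seam; `Registry` and `entries` are illustrative names.
class Registry
  class << self
    def instance
      @instance ||= new
    end

    # Test seam: the next .instance call builds a fresh, empty registry.
    def reset!
      @instance = nil
    end
  end

  attr_reader :entries

  def initialize
    @entries = {}
  end
end

first = Registry.instance
first.entries[:sa_1] = :running
Registry.instance.equal?(first) # => true, same object until reset!
Registry.reset!
Registry.instance.entries       # => {}
```

`@instance ||= new` is not an atomic check-and-set, so two threads racing the very first call could in principle build two registries; touching `.instance` once during boot, or guarding it with a class-level Mutex, avoids that.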

Instance Method Details

#ancestors_of(id) ⇒ Object

Returns the ancestor entries of `id`, nearest parent first, walking owner_subagent_id up to the human/top-level root; the walk also stops at an unknown id. Cycle-safe: each id is visited at most once.



# File 'lib/rubino/tools/background_tasks.rb', line 429

def ancestors_of(id)
  @mutex.synchronize do
    out  = []
    seen = { id => true }
    cur  = @entries[id]&.owner_subagent_id
    while cur && (entry = @entries[cur]) && !seen[cur]
      seen[cur] = true
      out << entry
      cur = entry.owner_subagent_id
    end
    out
  end
end
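A lock-free restatement of the #ancestors_of walk, runnable on plain data, shows both the nearest-first order and the cycle guard. `AncNode` and `ancestors` are illustrative names; the real method runs the same loop under the registry mutex and returns Entry objects.

```ruby
AncNode = Struct.new(:id, :owner_subagent_id)

# Same loop as #ancestors_of without the mutex: follow owner_subagent_id
# upward, nearest parent first, stopping at the root (nil owner), at an
# unknown id, or at an id already seen (a corrupt cycle).
def ancestors(entries, id)
  out  = []
  seen = { id => true }
  cur  = entries[id]&.owner_subagent_id
  while cur && (node = entries[cur]) && !seen[cur]
    seen[cur] = true
    out << node
    cur = node.owner_subagent_id
  end
  out
end

tree = {
  "sa_1" => AncNode.new("sa_1", nil),    # spawned by the human
  "sa_2" => AncNode.new("sa_2", "sa_1"),
  "sa_3" => AncNode.new("sa_3", "sa_2")
}
ancestors(tree, "sa_3").map(&:id) # => ["sa_2", "sa_1"]

cycle = { "a" => AncNode.new("a", "b"), "b" => AncNode.new("b", "a") }
ancestors(cycle, "a").map(&:id)   # => ["b"]
```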

#attach(entry, thread:, runner:) ⇒ Object

Binds the live worker thread and child runner to an entry returned by #reserve, so the registry can later cancel it. Attaching happens after #reserve, so the entry is already in the map before the thread starts: a worker that finishes immediately never writes its completion back to an entry that is not yet registered.



# File 'lib/rubino/tools/background_tasks.rb', line 165

def attach(entry, thread:, runner:)
  @mutex.synchronize do
    entry.thread = thread
    entry.runner = runner
  end
end

#awaiting_approval ⇒ Object

Entries currently parked on a human approval — surfaced on their card and answerable via /agents <id>.



# File 'lib/rubino/tools/background_tasks.rb', line 372

def awaiting_approval
  @mutex.synchronize { @entries.values.select { |e| e.status == :needs_approval } }
end

#awaiting_human ⇒ Object

Entries parked on an escalated ask_parent and waiting on THE HUMAN. This is the source of the persistent "⛔ N subagent waiting on you" marker; each entry is answerable via /reply <id>. It counts ONLY :blocked_on_human: a :blocked_on_parent child is its agent-parent's job (answer_child), not the human's, so it must NOT inflate the human's "waiting on you" count.



# File 'lib/rubino/tools/background_tasks.rb', line 366

def awaiting_human
  @mutex.synchronize { @entries.values.select { |e| e.status == :blocked_on_human } }
end

#begin_approval(id, gate:, approval_id:, question:, command:) ⇒ Object

Flips an entry into the :needs_approval state and stores the gate plus the question/command the card surfaces (Option 2). The child thread then parks on `gate.await(approval_id)`; the user resolves it via /agents <id>. Returns :needs_approval (the value of the final assignment), or nil for an unknown id; #end_approval later returns the entry to :running.



# File 'lib/rubino/tools/background_tasks.rb', line 238

def begin_approval(id, gate:, approval_id:, question:, command:)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.approval_gate     = gate
    entry.approval_id       = approval_id
    entry.approval_question = question.to_s
    entry.approval_command  = command.to_s
    entry.status            = :needs_approval
  end
end

#begin_ask(id, gate:, ask_id:, question:, blocking:, owner_id: nil) ⇒ Object

Flips an entry into a blocked state for an escalated ask_parent and stores the gate, question, and blocking flag the card/banner surface. It mirrors #begin_approval, but for a child->parent question. The child thread then parks on `ask_gate.await(ask_id)` (blocking ask) until the answer is delivered via #deliver_answer (/reply <id> or answer_child), or keeps working (non-blocking ask) and receives the answer later via the steer queue. A child in this state still holds a concurrency slot (its thread is alive, or it is awaiting an answer), so it counts as live.

The status depends on WHO owns the asking child (S4):

- owner_id present (an agent-parent) → :blocked_on_parent. The parent MODEL answers via answer_child; the question was pushed onto the owner's steer_queue, so it is NOT the human's job.
- owner_id nil (the human / top-level) → :blocked_on_human. The human answers via /reply <id>.



# File 'lib/rubino/tools/background_tasks.rb', line 313

def begin_ask(id, gate:, ask_id:, question:, blocking:, owner_id: nil)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.ask_gate     = gate
    entry.ask_id       = ask_id
    entry.ask_question = question.to_s
    entry.ask_blocking = blocking ? true : false
    entry.status       = owner_id ? :blocked_on_parent : :blocked_on_human
  end
end
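The ownership rule (S4) and the #end_ask reset reduce to two small status functions. This sketch uses hypothetical helpers (`status_for_ask`, `status_after_answer`, `Card`) with no gates or mutex, only to show why a :blocked_on_parent child never reaches the human's "waiting on you" count.

```ruby
# Hypothetical pure-function restatement of the begin_ask / end_ask status
# transitions and the #awaiting_human filter (no gates, no mutex).
BLOCKED = %i[blocked_on_human blocked_on_parent].freeze

def status_for_ask(owner_id)
  owner_id ? :blocked_on_parent : :blocked_on_human
end

# end_ask only un-blocks; any other status (e.g. :stopping) is left alone.
def status_after_answer(status)
  BLOCKED.include?(status) ? :running : status
end

Card = Struct.new(:id, :status)
cards = [
  Card.new("sa_1", status_for_ask(nil)),    # top-level child asked the human
  Card.new("sa_2", status_for_ask("sa_1")), # nested child asked its agent-parent
  Card.new("sa_3", :running)
]
cards.select { |c| c.status == :blocked_on_human }.map(&:id) # => ["sa_1"]
status_after_answer(:blocked_on_parent) # => :running
status_after_answer(:stopping)          # => :stopping
```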

#cancel_descendant_ask_gates(id) ⇒ Object

Stop-cascade (S5a): when a node is stopped, cancel the ask-gates of ALL its descendants so a blocking ask anywhere in the subtree unwinds at once (Run::ApprovalGate#cancel! wakes the parked child thread with Interrupted) instead of leaving an orphaned grandchild parked until its bound elapses. The descendant runners’ CancelTokens are flipped by the caller’s cancel! of the node; this just makes the gate-parked ones wake immediately. Safe to call on a node with no descendants or no blocked descendants.



# File 'lib/rubino/tools/background_tasks.rb', line 450

def cancel_descendant_ask_gates(id)
  descendants_of(id).each { |e| e.ask_gate&.cancel! }
end
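The parking behaviour that #begin_approval, #begin_ask, and this stop-cascade rely on can be sketched as a one-shot gate built from a Mutex and a ConditionVariable. This is an assumption about the general shape of Run::ApprovalGate, not its source: `SketchGate`, its `timeout:` keyword, and its nested `Interrupted` error are hypothetical; only the `await` / `decide` / `cancel!` verbs come from this page.

```ruby
# Hypothetical one-shot gate: await parks the calling thread until decide
# supplies a value or cancel! aborts every waiter with Interrupted.
class SketchGate
  class Interrupted < StandardError; end

  def initialize
    @mutex     = Mutex.new
    @cond      = ConditionVariable.new
    @decisions = {}
    @cancelled = false
  end

  # Blocks until a decision for +id+ exists, the gate is cancelled, or
  # +timeout+ seconds elapse (returns nil on timeout).
  def await(id, timeout: nil)
    deadline = timeout && Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      loop do
        raise Interrupted, "gate cancelled" if @cancelled
        return @decisions.delete(id) if @decisions.key?(id)

        remaining = deadline && deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining && remaining <= 0

        @cond.wait(@mutex, remaining) # nil remaining waits indefinitely
      end
    end
  end

  def decide(id, value)
    @mutex.synchronize do
      @decisions[id] = value
      @cond.broadcast
    end
  end

  def cancel!
    @mutex.synchronize do
      @cancelled = true
      @cond.broadcast
    end
  end
end

gate  = SketchGate.new
child = Thread.new do
  gate.await("ask_1")
rescue SketchGate::Interrupted
  :unwound
end
gate.cancel!  # the per-descendant step of cancel_descendant_ask_gates
child.value   # => :unwound

gate2 = SketchGate.new
asker = Thread.new { gate2.await("ask_2") }
gate2.decide("ask_2", "use the v2 endpoint")
asker.value   # => "use the v2 endpoint"
```

Because the waiter re-checks state in a loop, the outcome does not depend on whether `cancel!` or `decide` runs before or after the child reaches `await`.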

#children_of(id) ⇒ Object

Direct children of `id`: entries whose owner_subagent_id == id. Pass nil for the human/top-level node's direct children.



# File 'lib/rubino/tools/background_tasks.rb', line 401

def children_of(id)
  @mutex.synchronize { @entries.values.select { |e| e.owner_subagent_id == id } }
end

#complete(entry, status:, result: nil, error: nil) ⇒ Object

Records terminal state when the worker finishes (called from its `ensure`). There is a single writer per entry, but the write is guarded so #find/#list readers see a consistent snapshot. A failure landing on a :stopping entry is a USER-REQUESTED stop unwinding (Interrupted at the next checkpoint), so it is recorded as :stopped, distinct from a genuine :failed (#108/#13).



# File 'lib/rubino/tools/background_tasks.rb', line 193

def complete(entry, status:, result: nil, error: nil)
  @mutex.synchronize do
    status            = :stopped if entry.status == :stopping && status == :failed
    entry.status      = status
    entry.result      = result
    entry.error       = error
    entry.finished_at = Time.now
  end
end
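The interplay between #request_stop and #complete is a two-step status mapping. A minimal sketch on a plain Struct, assuming a particular set of live statuses (the real private `live_status?` helper is not shown in this listing, and `Job` is an illustrative name):

```ruby
Job = Struct.new(:status, :finished_at)

# ASSUMPTION: these are the statuses that count as live; the real private
# live_status? helper may differ.
LIVE_STATUSES = %i[running needs_approval blocked_on_human blocked_on_parent].freeze

# Like #request_stop: only a live job flips to :stopping.
def request_stop(job)
  return false unless LIVE_STATUSES.include?(job.status)

  job.status = :stopping
  true
end

# Like #complete: a :failed outcome on a :stopping job is the user's stop
# unwinding, so it is recorded as :stopped; other outcomes pass through.
def complete(job, status:)
  status = :stopped if job.status == :stopping && status == :failed
  job.status      = status
  job.finished_at = Time.now
end

stopped = Job.new(:running)
request_stop(stopped)              # => true (card shows "stopping")
complete(stopped, status: :failed) # Interrupted surfaces as a failure
stopped.status                     # => :stopped

crashed = Job.new(:running)
complete(crashed, status: :failed)
crashed.status                     # => :failed
```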

#deliver_answer(id, answer) ⇒ Object

The ONE shared answer wire for an escalated ask_parent, used by BOTH the human /reply path (Commands::Executor#deliver_reply) and the model-callable `answer_child` tool. It routes the answer back DOWN to the asking child by (1) deciding its ask gate, which unblocks a BLOCKING ask with the answer as its tool result, and (2) pushing the answer onto its steer queue, so a NON-BLOCKING ask folds it in at its next turn boundary; it then clears the blocked state (#end_ask). Either way the answer PERSISTS in the child's context; a blocking child therefore receives it twice, as its tool result and again as a `[parent answer]` steer note. Returns false (a no-op) for an unknown id or one not awaiting an answer (no ask_gate), and true when the answer was routed.



# File 'lib/rubino/tools/background_tasks.rb', line 351

def deliver_answer(id, answer)
  entry = find(id)
  return false unless entry&.ask_gate

  entry.ask_gate.decide(entry.ask_id, answer)
  steer(entry.id, "[parent answer] #{answer}")
  end_ask(entry.id)
  true
end

#descendants_of(id) ⇒ Object

All transitive descendants of `id` (BFS over owner_subagent_id), in breadth order. Cycle-safe (an id is visited at most once).



# File 'lib/rubino/tools/background_tasks.rb', line 407

def descendants_of(id)
  @mutex.synchronize do
    out     = []
    seen    = {}
    frontier = @entries.values.select { |e| e.owner_subagent_id == id }
    until frontier.empty?
      nxt = []
      frontier.each do |e|
        next if seen[e.id]

        seen[e.id] = true
        out << e
        nxt.concat(@entries.values.select { |c| c.owner_subagent_id == e.id })
      end
      frontier = nxt
    end
    out
  end
end
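#descendants_of rescans every entry for each node it visits, which is simple and adequate while the registry stays small. If it grew larger, one alternative is to build a parent → children index once and run the same cycle-safe BFS over it. A sketch of that variant on plain data (`Row` and `descendants` are hypothetical names; this is not how Rubino implements it):

```ruby
Row = Struct.new(:id, :owner_subagent_id)

# BFS over owner_subagent_id using a prebuilt parent -> children index, so
# each entry is scanned once instead of once per visited node.
def descendants(entries, id)
  children = Hash.new { |h, k| h[k] = [] }
  entries.each_value { |e| children[e.owner_subagent_id] << e }

  out      = []
  seen     = {}
  frontier = children[id]
  until frontier.empty?
    nxt = []
    frontier.each do |e|
      next if seen[e.id] # cycle guard: visit each id at most once

      seen[e.id] = true
      out << e
      nxt.concat(children[e.id])
    end
    frontier = nxt
  end
  out
end

rows = [
  Row.new("sa_1", nil),
  Row.new("sa_2", "sa_1"),
  Row.new("sa_3", "sa_1"),
  Row.new("sa_4", "sa_2")
].to_h { |r| [r.id, r] }

descendants(rows, "sa_1").map(&:id) # => ["sa_2", "sa_3", "sa_4"]
descendants(rows, nil).map(&:id)    # => ["sa_1", "sa_2", "sa_3", "sa_4"]
```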

#end_approval(id) ⇒ Object

Clears the approval state and returns the entry to :running once a decision has been delivered (or the child unwinds).



# File 'lib/rubino/tools/background_tasks.rb', line 253

def end_approval(id)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.approval_gate     = nil
    entry.approval_id       = nil
    entry.approval_question = nil
    entry.approval_command  = nil
    entry.status            = :running if entry.status == :needs_approval
  end
end

#end_ask(id) ⇒ Object

Clears the ask state and returns the entry to :running once the question has been answered (by the human via /reply, or the agent-parent via answer_child), or the child unwinds / is stopped.



# File 'lib/rubino/tools/background_tasks.rb', line 329

def end_ask(id)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.ask_gate     = nil
    entry.ask_id       = nil
    entry.ask_question = nil
    entry.ask_blocking = nil
    entry.status       = :running if %i[blocked_on_human blocked_on_parent].include?(entry.status)
  end
end

#find(id) ⇒ Object



# File 'lib/rubino/tools/background_tasks.rb', line 376

def find(id)
  @mutex.synchronize { @entries[id] }
end

#list ⇒ Object

All entries, newest first, for a `task` listing (the /tasks analogue).



# File 'lib/rubino/tools/background_tasks.rb', line 381

def list
  @mutex.synchronize { @entries.values.sort_by(&:started_at).reverse }
end

#owned_by?(parent_id, child_id) ⇒ Boolean

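True iff the direct owner of `child_id` is `parent_id`. This is the ownership predicate on which later slices' steer/probe/answer_child AUTHORIZATION checks will build. Passing nil for `parent_id` asks whether the child was spawned by the human / top level.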

Returns:

  • (Boolean)


# File 'lib/rubino/tools/background_tasks.rb', line 456

def owned_by?(parent_id, child_id)
  @mutex.synchronize do
    child = @entries[child_id]
    !child.nil? && child.owner_subagent_id == parent_id
  end
end

#record_live_probe(id) ⇒ Object

Records a BILLED live probe against a child (S3): bumps probe_count and stamps last_probe_at under the mutex (the owner runs this on its own thread while the parent renderer may read the entry). Returns the new count, or nil for an unknown id. Free snapshot probes (live:false) never call this; only `probe(live:true)` does, after the budget check passes.



# File 'lib/rubino/tools/background_tasks.rb', line 288

def record_live_probe(id)
  @mutex.synchronize do
    entry = @entries[id]
    return nil unless entry

    entry.probe_count   = entry.probe_count.to_i + 1
    entry.last_probe_at = Time.now
    entry.probe_count
  end
end

#record_tool_finished(id, line) ⇒ Object

Records a child tool FINISHING: appends a terse line to the bounded activity ring the live drill-in (#71) tails. Keeps the last ACTIVITY_LOG_MAX entries so the ring never grows unbounded for a read-heavy child.



# File 'lib/rubino/tools/background_tasks.rb', line 223

def record_tool_finished(id, line)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    log = (entry.activity_log ||= [])
    log << line.to_s
    log.shift while log.size > ACTIVITY_LOG_MAX
  end
end
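The append-then-trim rule in #record_tool_finished, run on a plain Array outside the registry (`push_activity` is an illustrative name):

```ruby
ACTIVITY_LOG_MAX = 6

# Append, then drop from the front until at most ACTIVITY_LOG_MAX remain,
# so the oldest lines fall off first.
def push_activity(log, line)
  log << line.to_s
  log.shift while log.size > ACTIVITY_LOG_MAX
  log
end

log = []
10.times { |i| push_activity(log, "read file_#{i}.rb") }
log.size  # => 6
log.first # => "read file_4.rb"
```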

#record_tool_started(id, activity) ⇒ Object

Records a child tool STARTING: bumps the tool counter and sets the last-activity string the card/list show so concurrent tasks stay distinguishable (#124/#127). Called from UI::SubagentView#tool_started, which runs on the CHILD thread, so it MUST take the mutex (the parent renderer reads these fields concurrently). No-op for an unknown id (a late event after #remove).



# File 'lib/rubino/tools/background_tasks.rb', line 209

def record_tool_started(id, activity)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.tool_count = entry.tool_count.to_i + 1
    entry.last_activity = activity.to_s
  end
end

#remove(id) ⇒ Object



# File 'lib/rubino/tools/background_tasks.rb', line 393

def remove(id)
  @mutex.synchronize { @entries.delete(id) }
end

#request_stop(id) ⇒ Object

Marks a stop REQUEST (the /agents <id> --stop / task_stop path) on a live entry, so the list and cards immediately show ◌ stopping instead of a stale ● running while the child unwinds at its next checkpoint (#108). Returns true when the entry flipped, and false when the id is unknown or the entry is not live. #complete then maps a failure on a :stopping entry to the terminal :stopped, so a deliberate stop never reads as ✗ failed (#13).



# File 'lib/rubino/tools/background_tasks.rb', line 178

def request_stop(id)
  @mutex.synchronize do
    entry = @entries[id]
    return false unless entry && live_status?(entry.status)

    entry.status = :stopping
    true
  end
end

#reserve(subagent:, prompt:, owner_subagent_id: nil, depth: 0) ⇒ Object

Reserves a slot and registers a `running` entry, returning it. The caller then attaches the worker thread and runner via #attach.

owner_subagent_id is the `sa_*` id of the SPAWNING subagent (nil means the human / top-level agent spawned this child). depth is the caller's hint for a human-spawned child (0); for an owner-spawned child the depth is recomputed here from the owner entry (owner.depth + 1), so a stale hint can't smuggle a child past the depth cap.

Returns nil — so TaskTool can surface a clear message instead of spawning unbounded work — when ANY of the three nesting caps is hit. The reason is available via #last_refusal_reason for the caller to phrase the message:

:depth          — depth >= max_depth (no deeper nesting allowed)
:per_owner      — this owner already has max_children_per_node live kids
:global         — total live subagents across the tree >= max total

This is the SINGLE enforcement point for every nesting limit.



# File 'lib/rubino/tools/background_tasks.rb', line 129

def reserve(subagent:, prompt:, owner_subagent_id: nil, depth: 0)
  @mutex.synchronize do
    owner = owner_subagent_id ? @entries[owner_subagent_id] : nil
    effective_depth = owner ? owner.depth.to_i + 1 : depth.to_i

    @last_refusal_reason = refusal_reason(owner_subagent_id, effective_depth)
    return nil if @last_refusal_reason

    entry = Entry.new(
      id: new_id,
      subagent: subagent.to_s,
      prompt: prompt.to_s,
      status: :running,
      started_at: Time.now,
      tool_count: 0,
      activity_log: [],
      # Every background child gets its OWN steering queue at reserve time
      # so the parent can `/agents <id> steer "..."` it the instant it is
      # listed — no separate wiring step, no nil window.
      steer_queue: Interaction::InputQueue.new,
      owner_subagent_id: owner_subagent_id,
      depth: effective_depth
    )
    @entries[entry.id] = entry
    entry
  end
end
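The private `refusal_reason` helper that #reserve calls is not shown in this listing. A sketch of what it could look like, based only on the three documented rules; the check order, the set of statuses that count as live, and every name here (`Slot`, `CAPS`, `LIVE`) are assumptions:

```ruby
Slot = Struct.new(:id, :owner_subagent_id, :status)

# Defaults matching the fallback constants (MAX_DEPTH, MAX_CHILDREN_PER_NODE,
# MAX_CONCURRENT_TOTAL); in Rubino the live values come from config.
CAPS = { max_depth: 2, max_children_per_node: 3, max_concurrent_total: 8 }.freeze

# ASSUMPTION: the non-terminal statuses below are the ones that hold a slot.
LIVE = %i[running stopping needs_approval blocked_on_human blocked_on_parent].freeze

# ASSUMPTION: checks run in this order and the first hit wins.
def refusal_reason(entries, owner_id, depth, caps = CAPS)
  return :depth if depth >= caps[:max_depth]

  live  = entries.values.select { |e| LIVE.include?(e.status) }
  owned = live.count { |e| e.owner_subagent_id == owner_id }
  return :per_owner if owned >= caps[:max_children_per_node]
  return :global if live.size >= caps[:max_concurrent_total]

  nil
end

entries = {
  "sa_1" => Slot.new("sa_1", nil, :running),
  "sa_2" => Slot.new("sa_2", "sa_1", :running),
  "sa_3" => Slot.new("sa_3", "sa_1", :needs_approval),
  "sa_4" => Slot.new("sa_4", "sa_1", :running)
}
refusal_reason(entries, "sa_1", 1) # => :per_owner (sa_1 already has 3 live kids)
refusal_reason(entries, "sa_2", 2) # => :depth
refusal_reason(entries, nil, 0)    # => nil
refusal_reason(entries, nil, 0, CAPS.merge(max_concurrent_total: 4)) # => :global
```

In the real #reserve, the depth passed in is already the effective depth (owner.depth + 1 for an owner-spawned child), and the returned symbol is what lands in #last_refusal_reason.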

#running ⇒ Object

Live (still-running) children — used by the parent stop path to cancel orphans, and to enforce the concurrency cap. A child parked on a human approval (:needs_approval) is STILL live (its thread is alive, holding a slot), so it counts as running here.



# File 'lib/rubino/tools/background_tasks.rb', line 389

def running
  @mutex.synchronize { @entries.values.select { |e| live_status?(e.status) } }
end

#steer(id, text) ⇒ Object

Records a parent->child steer note (the `/agents <id> steer "…"` affordance). Pushes the text onto the child's steering queue, which the child Loop drains at its next iteration boundary (Loop#inject_steered_input): between turns, never between a tool_use and its results. Best-effort: returns false (and pushes nothing) when the entry is gone or has no queue (e.g. a finished child), and true when the note was queued.



# File 'lib/rubino/tools/background_tasks.rb', line 272

def steer(id, text)
  queue = @mutex.synchronize do
    entry = @entries[id]
    entry&.steer_queue
  end
  return false unless queue

  queue.push(text)
  true
end
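The steer contract (push at any time, drain only at an iteration boundary) can be sketched with Ruby's `Thread::Queue` standing in for Interaction::InputQueue. `drain`, `run_turns`, and this `steer(queue, text)` signature are hypothetical; Loop#inject_steered_input is not reproduced here.

```ruby
# Hypothetical sketch: Thread::Queue stands in for Interaction::InputQueue and
# run_turns for the child Loop; neither is Rubino's code.

# Non-blocking drain: pop(true) raises ThreadError once the queue is empty.
def drain(queue)
  notes = []
  loop { notes << queue.pop(true) }
rescue ThreadError
  notes
end

# Steer notes are folded in only at the top of an iteration (a turn
# boundary), never in the middle of a turn.
def run_turns(queue, turns)
  transcript = []
  turns.times do |i|
    drain(queue).each { |note| transcript << "steer: #{note}" }
    transcript << "turn #{i}"
  end
  transcript
end

# Mirrors #steer: best-effort push, false when there is no queue to push to.
def steer(queue, text)
  return false unless queue

  queue.push(text)
  true
end

q = Queue.new
steer(q, "focus on the failing spec") # => true
steer(nil, "ignored")                 # => false
run_turns(q, 2) # => ["steer: focus on the failing spec", "turn 0", "turn 1"]
```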