Class: Rubino::Tools::BackgroundTasks

Inherits:
Object
Defined in:
lib/rubino/tools/background_tasks.rb

Overview

Process-wide registry for subagents started by the `task` tool in the BACKGROUND (the default). Mirrors ShellRegistry (the in-repo precedent for “fire-and-forget + poll later + kill”), but the unit of work is a nested Agent::Runner thread instead of a detached OS process.

Each entry owns:

- the worker Thread running the child Runner#run!,
- the child Runner (so a stop, via #stop_entry, can flip its CancelToken: exactly the
  mechanism Run::Executor's stop-watcher uses for top-level runs),
- the terminal status/result/error captured in the worker's `ensure`.

The registry lives only as long as a single CLI/server process: like ShellRegistry it is intentionally NOT persisted, so background subagents die with the process.

Concurrency cap (mirrors the reference _DEFAULT_MAX_CONCURRENT_CHILDREN = 3): a background subagent is a full LLM run = real cost, so #spawn refuses past MAX_CONCURRENT live children rather than fanning out unbounded threads.

Defined Under Namespace

Classes: Entry

Constant Summary

MAX_CONCURRENT = 3
MAX_DEPTH = 2

Fallback caps for the nested-subagent tree, used when config is absent (e.g. a bare registry in a unit test with no Configuration wired). The live values come from config (tasks.max_depth / max_children_per_node / max_concurrent_total); these constants are the built-in defaults the config keys themselves default to. All three are enforced in #reserve.

MAX_CHILDREN_PER_NODE = 3
MAX_CONCURRENT_TOTAL = 8
ACTIVITY_LOG_MAX = 6

How many recent activity lines the drill-in shows (the live `recent:` ring).

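OUTPUT_TAIL_MAX = 6
OUTPUT_TAIL_LINE_MAX = 200

Bounds for the live output tail (#5): OUTPUT_TAIL_MAX is how many COMPLETE lines the drill-in's output: block shows (the buffer keeps one extra slot for the in-flight partial line), and OUTPUT_TAIL_LINE_MAX caps each buffered line so a newline-free stream can't grow a line unbounded. #record_tool_output truncates with String#[], which counts characters, so for UTF-8 output this is a character cap rather than a byte cap.
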
DENY_NOTE_PREFIX = "[approval denied by human] "

Prefix the human's “deny & tell the agent why” reason carries when handed to the child as a steer note (#Y1B). The note is ADVISORY (the approval gate is already denied regardless), so when the child finishes before folding it in, the still-queued copy drained by #complete must NOT raise the scary “steer note not delivered (task completed first)” alarm: the denial applied correctly and the explanation is moot. The completion paths filter this prefix out of the undelivered WARNING (a calm note instead). A genuine `/agents <id> steer` note never carries it, so its deliver-or-report invariant is intact.

LIVE_STATUSES = %i[running needs_approval stopping].freeze

The statuses under which a child still holds a concurrency slot because its worker thread is alive: actively running, parked on a human approval, or unwinding after a stop request. This is the SINGLE source of truth for “is this child still alive?”, shared by the registry itself (#running / #reserve cap) AND by every UI surface that lists live children (the footer cards, the attached switcher, the navigable picker), so they can never drift apart and silently drop a live-but-quiet child from one surface while another still shows it (R1). Any new parked state added to the lifecycle is made visible everywhere by editing this one set.

Constructor Details

#initialize ⇒ BackgroundTasks

Returns a new instance of BackgroundTasks.



# File 'lib/rubino/tools/background_tasks.rb', line 154

def initialize
  @entries = {}
  @mutex   = Mutex.new
  # Monotonic source for approval_seq — the FIFO order of the approval
  # modal queue. Bumped under @mutex on every begin_approval so two
  # children that park "at once" still get a deterministic, stable order
  # (the one whose begin_approval won the lock first is the head).
  @approval_seq = 0
end

Instance Attribute Details

#last_refusal_reason ⇒ Object (readonly)

Why the most recent #reserve returned nil (one of :depth / :per_owner / :global), or nil when the last reserve succeeded. Read by TaskTool to phrase a reason-specific at-capacity message.



# File 'lib/rubino/tools/background_tasks.rb', line 211

def last_refusal_reason
  @last_refusal_reason
end

Class Method Details

.instance ⇒ Object



# File 'lib/rubino/tools/background_tasks.rb', line 136

def instance
  @instance ||= new
end

.live_status?(status) ⇒ Boolean

The shared liveness oracle (see LIVE_STATUSES). Public so the UI surfaces that format a registry snapshot (SubagentCards, AgentMenu) filter by the EXACT same rule the registry uses, with no duplicated status list to fall out of sync.

Returns:

  • (Boolean)


# File 'lib/rubino/tools/background_tasks.rb', line 149

def live_status?(status)
  LIVE_STATUSES.include?(status)
end
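Because the UI surfaces call this class-level oracle rather than keeping their own status list, adding a parked state only means editing LIVE_STATUSES. A minimal sketch of that sharing, assuming a hypothetical `Registry` module standing in for BackgroundTasks and a hypothetical `Card` struct and `footer_cards` helper standing in for the real SubagentCards/AgentMenu code:

```ruby
# Hypothetical stand-in for BackgroundTasks' class-level oracle.
module Registry
  LIVE_STATUSES = %i[running needs_approval stopping].freeze

  # The single liveness rule, shared by the registry and every UI surface.
  def self.live_status?(status)
    LIVE_STATUSES.include?(status)
  end
end

# Hypothetical snapshot row that a UI surface formats.
Card = Struct.new(:id, :status)

# Hypothetical UI surface: filters with the registry's oracle, never a copy
# of the status list, so it cannot drift from the registry's own rule.
def footer_cards(snapshot)
  snapshot.select { |c| Registry.live_status?(c.status) }.map(&:id)
end

snapshot = [
  Card.new("sa_1", :running),
  Card.new("sa_2", :needs_approval), # live but quiet: must not vanish
  Card.new("sa_3", :stopping),
  Card.new("sa_4", :failed),
  Card.new("sa_5", :stopped)
]
p footer_cards(snapshot) # => ["sa_1", "sa_2", "sa_3"]
```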

.reset! ⇒ Object

Test seam: drop all state between examples.



# File 'lib/rubino/tools/background_tasks.rb', line 141

def reset!
  @instance = nil
end

Instance Method Details

#attach(entry, thread:, runner:) ⇒ Object

Binds the live worker thread + child runner to a reserved entry so the registry can later cancel it. Done after reserve so the entry exists in the map before the thread starts (no race on completion writing back).



# File 'lib/rubino/tools/background_tasks.rb', line 216

def attach(entry, thread:, runner:)
  @mutex.synchronize do
    entry.thread = thread
    entry.runner = runner
  end
end

#awaiting_approval ⇒ Object

Entries currently parked on a human approval — surfaced on their card and answerable via /agents <id>. Ordered OLDEST-FIRST (by the moment the child blocked, approval_seq) so the modal queue is FIFO: when two children raise an approval at once only ONE modal is presented at a time (the head of this list — auto_resolve_pending takes #first), the rest are the “(N more queued)” backlog shown on the active modal, and they dequeue in the order they parked. Ties fall back to started_at for a stable order.



# File 'lib/rubino/tools/background_tasks.rb', line 409

def awaiting_approval
  @mutex.synchronize do
    @entries.values.select { |e| e.status == :needs_approval }
                   .sort_by { |e| [e.approval_seq.to_i, e.started_at] }
  end
end

#begin_approval(id, gate:, approval_id:, question:, command:, budget: false) ⇒ Object

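Flips an entry into the :needs_approval state and stores the gate plus the question/command the card surfaces (Option 2). The child thread then parks on `gate.await(approval_id)`; the user resolves it via /agents <id>. Returns :needs_approval (the value of the block's final assignment), or nil for an unknown id; it does not hand back the previous status, and it is #end_approval that returns the entry to :running.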



# File 'lib/rubino/tools/background_tasks.rb', line 327

def begin_approval(id, gate:, approval_id:, question:, command:, budget: false)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.approval_gate     = gate
    entry.approval_id       = approval_id
    entry.approval_question = question.to_s
    entry.approval_command  = command.to_s
    entry.budget_request    = budget ? true : false
    entry.approval_seq      = (@approval_seq += 1)
    entry.status            = :needs_approval
  end
end

#cancel_all ⇒ Object

Structured-concurrency teardown seam: cancels EVERY live subagent so the process never leaves a child parked. This is the required fix for the parent-death deadlock (#XXX). When the PARENT dies or is interrupted (REPL break, HUP/TERM, clean quit, an aborted turn), a child parked on its approval gate would otherwise stay parked for the full approval timeout, because nothing cancels its gate: the per-id stop paths only fire on an explicit /agents <id> --stop or task_stop.

Calling this from each parent-death edge wakes every blocked child SYNCHRONOUSLY (cancel! pushes its sentinel; the gate's await observes it within one WAKE_TICK), so each unwinds via the existing `rescue Rubino::Interrupted` with the clean “parent question was cancelled” message instead of hanging until the timeout. It is a no-op when there are no live children and is idempotent (because #stop_entry is), so it is safe to invoke from a teardown `ensure` and from a signal-trap path. Note that Ruby raises ThreadError for Mutex#synchronize inside a trap handler itself, so a trap should defer this call to a regular thread rather than run it inline. It snapshots #running first (outside the per-entry work) so the registry mutex is not held across the gate/runner cancels.



# File 'lib/rubino/tools/background_tasks.rb', line 479

def cancel_all
  live = running
  live.each { |entry| stop_entry(entry) }
  # Logical cancel alone (above) only flips cancel tokens and trusts each
  # child THREAD to observe the token and reap its own shell within a wake
  # tick — but on parent-DEATH the process exits before the thread reaches
  # that checkpoint, so any shell a child spawned (its own pgid) reparents
  # to init as an orphan (MED-2). Reap the tracked shell process groups
  # SYNCHRONOUSLY here so the same parent-death edges that call cancel_all
  # (clean quit, HUP/TERM trap, REPL break) leave no surviving shell.
  ShellRegistry.instance.kill_all_groups
end
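The wake-up that #cancel_all relies on (cancel!, then the parked await noticing within one tick and raising) can be sketched with a Mutex and ConditionVariable. This is a minimal sketch under stated assumptions: `SketchGate`, its `resolve` method, the `Interrupted` class and the 0.05 s tick are all hypothetical stand-ins, since the real gate and WAKE_TICK are not shown on this page.

```ruby
# Hypothetical stand-ins: the real gate, Interrupted class and tick value
# are not shown on this page.
class Interrupted < StandardError; end

class SketchGate
  WAKE_TICK = 0.05 # seconds between re-checks (assumed value)

  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @decisions = {}
    @cancelled = false
  end

  # Parks the calling (child) thread until a decision or a cancel arrives.
  def await(approval_id, timeout: 5.0)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      loop do
        raise Interrupted, "parent question was cancelled" if @cancelled
        return @decisions.delete(approval_id) if @decisions.key?(approval_id)
        raise Interrupted, "approval timed out" if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

        @cond.wait(@mutex, WAKE_TICK) # re-check at least once per tick
      end
    end
  end

  # The human's answer for one approval id.
  def resolve(approval_id, decision)
    @mutex.synchronize do
      @decisions[approval_id] = decision
      @cond.broadcast
    end
  end

  # Idempotent: calling it again leaves the gate cancelled.
  def cancel!
    @mutex.synchronize do
      @cancelled = true
      @cond.broadcast
    end
  end
end

gate = SketchGate.new
child = Thread.new do
  gate.await("ap_1")
rescue Interrupted => e
  "unwound: #{e.message}"
end
sleep 0.1    # let the child park
gate.cancel! # what cancel_all does for every parked child
puts child.value # prints "unwound: parent question was cancelled"
```

Checking the cancel flag before the decision table means a cancel that lands before the child even parks still unwinds it, so the teardown does not depend on timing.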

#complete(entry, status:, result: nil, error: nil) ⇒ Object

Records terminal state when the worker finishes (called from its `ensure`). Single writer per entry, but guarded so #find/#list readers see a consistent snapshot. A failure landing on a :stopping entry is a USER-REQUESTED stop unwinding (Interrupted at the next checkpoint), so it is recorded as :stopped, distinct from a genuine :failed (#108/#13).

H5: closes the drain↔complete race. The final drain of the child's steer_queue happens HERE, under the SAME registry mutex that flips the status to terminal, and #steer refuses to push onto a terminal entry under that SAME mutex. So a steer/answer arriving concurrently is serialised against this finalize: it is EITHER pushed before the status flips (and drained right here into the returned `undelivered` notes) OR rejected by #steer (which then honestly reports not-delivered). The earlier shape (drain under the InputQueue lock, then complete under the registry lock: two locks with a gap) let an answer land on a now-dead queue: dropped, omitted from `undelivered`, yet reported delivered. Returns the notes that were still queued at finalize time (never delivered to the child), so the caller can surface them as undelivered.



# File 'lib/rubino/tools/background_tasks.rb', line 257

def complete(entry, status:, result: nil, error: nil)
  @mutex.synchronize do
    status            = :stopped if entry.status == :stopping && status == :failed
    entry.status      = status
    entry.result      = result
    entry.error       = error
    entry.finished_at = Time.now
    # Drain UNDER the mutex: anything still here is undelivered (the child
    # has no further turn to fold it in), and once status is terminal no
    # new note can arrive — #steer rejects it.
    entry.steer_queue&.drain || []
  end
end
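The notes #complete returns are where DENY_NOTE_PREFIX comes into play. A minimal sketch of how a completion path might split them into the loud WARNING and the calm note, assuming a hypothetical `split_undelivered` helper and hypothetical message wording; only the prefix string is taken from this page.

```ruby
DENY_NOTE_PREFIX = "[approval denied by human] "

# Hypothetical helper: split the notes drained at completion into genuine
# steer notes (warn: they were never delivered) and advisory denial notes
# (moot once the child finished, since the gate was already denied).
def split_undelivered(notes)
  denied, genuine = notes.partition { |n| n.to_s.start_with?(DENY_NOTE_PREFIX) }
  { warn: genuine, calm: denied }
end

drained = [
  "#{DENY_NOTE_PREFIX}don't touch prod",
  "also check the README"
]
parts = split_undelivered(drained)
parts[:warn].each { |n| puts "WARNING: steer note not delivered (task completed first): #{n}" }
parts[:calm].each { |_n| puts "note: denial reason not folded in (the approval was denied anyway)" }
```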

#end_approval(id) ⇒ Object

Clears the approval state and returns the entry to :running once a decision has been delivered (or the child unwinds).



# File 'lib/rubino/tools/background_tasks.rb', line 344

def end_approval(id)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.approval_gate     = nil
    entry.approval_id       = nil
    entry.approval_question = nil
    entry.approval_command  = nil
    entry.budget_request    = false
    entry.approval_seq      = nil
    entry.status            = :running if entry.status == :needs_approval
  end
end

#find(id) ⇒ Object



# File 'lib/rubino/tools/background_tasks.rb', line 426

def find(id)
  @mutex.synchronize { @entries[id] }
end

#list ⇒ Object

All entries, newest first, for a `task` listing (the /tasks analogue).



# File 'lib/rubino/tools/background_tasks.rb', line 431

def list
  @mutex.synchronize { @entries.values.sort_by(&:started_at).reverse }
end

#owned_by?(parent_id, child_id) ⇒ Boolean

True iff `child_id`'s direct owner is `parent_id` (the ownership predicate that steer/probe AUTHORIZATION checks build on).

Returns:

  • (Boolean)


# File 'lib/rubino/tools/background_tasks.rb', line 507

def owned_by?(parent_id, child_id)
  @mutex.synchronize do
    child = @entries[child_id]
    !child.nil? && child.owner_subagent_id == parent_id
  end
end

#queued_approval_count ⇒ Object

How many children are parked on an approval BEHIND the head — i.e. the backlog the active modal advertises as “(N more queued)”. Only ONE approval modal is presented at a time (awaiting_approval.first); this is everyone else still :needs_approval. Zero when at most one child is parked. The active modal reads this so the user knows more are waiting and that resolving the current one dequeues the next.



# File 'lib/rubino/tools/background_tasks.rb', line 422

def queued_approval_count
  [awaiting_approval.size - 1, 0].max
end
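The FIFO rule behind #awaiting_approval and #queued_approval_count can be exercised without threads. A minimal sketch, assuming a hypothetical `Entry` struct with only the fields the ordering reads; the mutex is omitted because this runs on a single thread.

```ruby
# Hypothetical stand-in for the registry Entry; only the fields the
# ordering reads are modelled.
Entry = Struct.new(:id, :status, :approval_seq, :started_at, keyword_init: true)

# Same rule as #awaiting_approval: parked entries, oldest approval first,
# ties broken by started_at.
def awaiting_approval(entries)
  entries.select { |e| e.status == :needs_approval }
         .sort_by { |e| [e.approval_seq.to_i, e.started_at] }
end

# Same rule as #queued_approval_count: everyone behind the head.
def queued_approval_count(entries)
  [awaiting_approval(entries).size - 1, 0].max
end

t0 = Time.at(0)
entries = [
  Entry.new(id: "sa_b", status: :needs_approval, approval_seq: 2, started_at: t0),
  Entry.new(id: "sa_a", status: :needs_approval, approval_seq: 1, started_at: t0 + 5),
  Entry.new(id: "sa_c", status: :running, approval_seq: nil, started_at: t0),
  Entry.new(id: "sa_d", status: :needs_approval, approval_seq: 3, started_at: t0)
]

head = awaiting_approval(entries).first
puts "modal: #{head.id} (#{queued_approval_count(entries)} more queued)"
# prints "modal: sa_a (2 more queued)"
```

Note that sa_a heads the queue even though it started later: the order is by when each child parked (approval_seq), not by when it was spawned.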

#record_live_probe(id) ⇒ Object

Records a BILLED live probe against a child (S3): bumps probe_count under the mutex (the owner runs this on its own thread while the parent renderer may read the entry). Returns the new count, or nil for an unknown id. Free snapshot probes (live:false) never call this; only `probe(live:true)` does, after the budget check passes.



# File 'lib/rubino/tools/background_tasks.rb', line 392

def record_live_probe(id)
  @mutex.synchronize do
    entry = @entries[id]
    return nil unless entry

    entry.probe_count = entry.probe_count.to_i + 1
    entry.probe_count
  end
end

#record_tool_finished(id, line) ⇒ Object

Records a child tool FINISHING: appends a terse line to the bounded activity ring the live drill-in (#71) tails. Keeps the last ACTIVITY_LOG_MAX entries so the ring never grows unbounded for a read-heavy child. Also wipes the live output tail — it belongs to the tool that just finished, so the drill-in’s output: block clears (#5).



# File 'lib/rubino/tools/background_tasks.rb', line 292

def record_tool_finished(id, line)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    log = (entry.activity_log ||= [])
    log << line.to_s
    log.shift while log.size > ACTIVITY_LOG_MAX
    entry.output_tail = nil
  end
end

#record_tool_output(id, chunk) ⇒ Object

Records a streamed chunk of the CURRENTLY RUNNING tool’s output (#5): splits on newlines into a bounded line buffer whose LAST slot carries the in-flight partial line, so the /agents drill-in can tail it live. Called from a subagent’s UI::CLI#tool_chunk on the CHILD thread, so it MUST take the mutex like the other record_* writers. No-op for an unknown id.



# File 'lib/rubino/tools/background_tasks.rb', line 309

def record_tool_output(id, chunk)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    tail = (entry.output_tail ||= [""])
    chunk.to_s.each_line do |line|
      tail[-1] = "#{tail[-1]}#{line.chomp}"[0, OUTPUT_TAIL_LINE_MAX]
      tail << "" if line.end_with?("\n")
    end
    tail.shift while tail.size > OUTPUT_TAIL_MAX + 1
  end
end
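The buffer rule above (the last slot holds the in-flight partial line, older complete lines fall off the front) is easiest to see on chunks that split a line. A minimal sketch that reuses the same splitting logic as a plain function over an array; `append_output` is a hypothetical name and the mutex is left out.

```ruby
OUTPUT_TAIL_MAX = 6
OUTPUT_TAIL_LINE_MAX = 200

# Same splitting rule as #record_tool_output, extracted into a plain
# function over an array so it can run without the registry.
def append_output(tail, chunk)
  tail ||= [""]
  chunk.to_s.each_line do |line|
    # Extend the partial last slot, capped at OUTPUT_TAIL_LINE_MAX characters.
    tail[-1] = "#{tail[-1]}#{line.chomp}"[0, OUTPUT_TAIL_LINE_MAX]
    # A newline completes the slot; open a fresh partial one.
    tail << "" if line.end_with?("\n")
  end
  # Keep OUTPUT_TAIL_MAX complete lines plus the partial slot.
  tail.shift while tail.size > OUTPUT_TAIL_MAX + 1
  tail
end

tail = append_output(nil, "compiling a")
tail = append_output(tail, "b.rb\ncompiling c") # chunk boundary fell mid-line
p tail # => ["compiling ab.rb", "compiling c"]

many = append_output(nil, (1..10).map { |i| "line #{i}\n" }.join)
p many # => ["line 5", "line 6", "line 7", "line 8", "line 9", "line 10", ""]
```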

#record_tool_started(id, activity) ⇒ Object

Records a child tool STARTING: bumps the tool counter and sets the last-activity string the card/list show so concurrent tasks stay distinguishable (#124/#127). Called from a subagent’s UI::CLI#tool_started, which runs on the CHILD thread, so it MUST take the mutex (the parent renderer reads these fields concurrently). No-op for an unknown id (a late event after #remove).



# File 'lib/rubino/tools/background_tasks.rb', line 277

def record_tool_started(id, activity)
  @mutex.synchronize do
    entry = @entries[id]
    return unless entry

    entry.tool_count = entry.tool_count.to_i + 1
    entry.last_activity = activity.to_s
  end
end

#remove(id) ⇒ Object



# File 'lib/rubino/tools/background_tasks.rb', line 443

def remove(id)
  @mutex.synchronize { @entries.delete(id) }
end

#request_stop(id) ⇒ Object

Marks a stop REQUEST (the /agents <id> --stop / task_stop path) on a live entry so the list/cards immediately show ◌ stopping instead of a stale ● running while the child unwinds at its next checkpoint (#108). Returns true when the entry flipped, false for an unknown or already non-live entry. #complete then maps a failure on a :stopping entry to the terminal :stopped, so a deliberate stop never reads as ✗ failed (#13).



# File 'lib/rubino/tools/background_tasks.rb', line 229

def request_stop(id)
  @mutex.synchronize do
    entry = @entries[id]
    return false unless entry && live_status?(entry.status)

    entry.status = :stopping
    true
  end
end
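How #request_stop and #complete cooperate so that a deliberate stop never reads as ✗ failed can be shown in a few lines. A minimal sketch, assuming a hypothetical two-field `Entry` struct and top-level `request_stop`/`complete` functions that copy the status rules above; locking is omitted.

```ruby
LIVE_STATUSES = %i[running needs_approval stopping].freeze

# Hypothetical minimal entry; the real Entry has many more fields.
Entry = Struct.new(:status, :finished_at)

# Mirrors #request_stop: only a live entry flips to :stopping.
def request_stop(entry)
  return false unless entry && LIVE_STATUSES.include?(entry.status)

  entry.status = :stopping
  true
end

# Mirrors the status rule at the top of #complete.
def complete(entry, status:)
  status = :stopped if entry.status == :stopping && status == :failed
  entry.status = status
  entry.finished_at = Time.now
  status
end

stopped = Entry.new(:running)
request_stop(stopped)              # the user ran --stop
complete(stopped, status: :failed) # Interrupted surfaced as a failure
p stopped.status # => :stopped

crashed = Entry.new(:running)
complete(crashed, status: :failed) # nobody asked it to stop
p crashed.status # => :failed

p request_stop(stopped) # => false (already terminal)
```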

#reserve(subagent:, prompt:, owner_subagent_id: nil, depth: 0) ⇒ Object

Reserves a slot and registers a `running` entry, returning it. The caller then attaches the worker thread + runner via #attach.

owner_subagent_id is the `sa_*` id of the SPAWNING subagent (nil ⇒ the human / top-level agent spawned this child). depth is the caller's hint for a human-spawned child (0); for an owner-spawned child the depth is recomputed here from the owner entry (owner.depth + 1) so a stale hint can't smuggle a child past the depth cap.

Returns nil — so TaskTool can surface a clear message instead of spawning unbounded work — when ANY of the three nesting caps is hit. The reason is available via #last_refusal_reason for the caller to phrase the message:

:depth          — depth >= max_depth (no deeper nesting allowed)
:per_owner      — this owner already has max_children_per_node live kids
:global         — total live subagents across the tree >= max total

This is the SINGLE enforcement point for every nesting limit.



# File 'lib/rubino/tools/background_tasks.rb', line 180

def reserve(subagent:, prompt:, owner_subagent_id: nil, depth: 0)
  @mutex.synchronize do
    owner = owner_subagent_id ? @entries[owner_subagent_id] : nil
    effective_depth = owner ? owner.depth.to_i + 1 : depth.to_i

    @last_refusal_reason = refusal_reason(owner_subagent_id, effective_depth)
    return nil if @last_refusal_reason

    entry = Entry.new(
      id: new_id,
      subagent: subagent.to_s,
      prompt: prompt.to_s,
      status: :running,
      started_at: Time.now,
      tool_count: 0,
      activity_log: [],
      # Every background child gets its OWN steering queue at reserve time
      # so the parent can `/agents <id> steer "..."` it the instant it is
      # listed — no separate wiring step, no nil window.
      steer_queue: Interaction::InputQueue.new,
      owner_subagent_id: owner_subagent_id,
      depth: effective_depth
    )
    @entries[entry.id] = entry
    entry
  end
end
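The private refusal_reason helper that #reserve calls is not shown on this page. A hypothetical sketch of a check that matches the three documented caps, assuming the fallback constant values, that the checks run in the order the list above gives them, and that the human/top-level "owner" (nil) is counted like any other node; the real helper reads its caps from config and runs under the registry mutex.

```ruby
# Fallback cap values from this page; the real ones come from config.
MAX_DEPTH = 2
MAX_CHILDREN_PER_NODE = 3
MAX_CONCURRENT_TOTAL = 8
LIVE_STATUSES = %i[running needs_approval stopping].freeze

Entry = Struct.new(:id, :status, :owner_subagent_id, :depth)

# Hypothetical: returns :depth, :per_owner or :global for the first cap
# hit, else nil. Only live children hold a slot.
def refusal_reason(entries, owner_id, depth)
  live = entries.select { |e| LIVE_STATUSES.include?(e.status) }
  return :depth if depth >= MAX_DEPTH
  return :per_owner if live.count { |e| e.owner_subagent_id == owner_id } >= MAX_CHILDREN_PER_NODE
  return :global if live.size >= MAX_CONCURRENT_TOTAL

  nil
end

# Mirrors #reserve's depth rule: an owner-spawned child is owner.depth + 1.
def try_reserve(entries, owner_id)
  owner = owner_id && entries.find { |e| e.id == owner_id }
  depth = owner ? owner.depth + 1 : 0
  refusal_reason(entries, owner_id, depth)
end

tree = [
  Entry.new("sa_1", :running, nil, 0),   # spawned by the human
  Entry.new("sa_2", :running, "sa_1", 1) # spawned by sa_1
]
p try_reserve(tree, nil)    # => nil
p try_reserve(tree, "sa_2") # => :depth

busy = tree + [Entry.new("sa_3", :running, "sa_1", 1),
               Entry.new("sa_4", :needs_approval, "sa_1", 1)]
p try_reserve(busy, "sa_1") # => :per_owner (a parked child still holds its slot)
```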

#running ⇒ Object

Live children: every entry whose status is in LIVE_STATUSES (:running, :needs_approval, :stopping). Used by the parent stop path to cancel orphans, and to enforce the concurrency cap. A child parked on a human approval (:needs_approval) or unwinding after a stop request (:stopping) is STILL live (its thread is alive, holding a slot), so it counts as running here.



# File 'lib/rubino/tools/background_tasks.rb', line 439

def running
  @mutex.synchronize { @entries.values.select { |e| live_status?(e.status) } }
end

#shutdown!(grace: 1.0) ⇒ Object

Process-exit teardown: first runs the cooperative #cancel_all, then gives child threads a short grace period to finish, and finally kills non-cooperative survivors. Background subagents are Ruby threads, not OS child processes; if a child is stuck in a provider read that never observes its cancel token, a plain #cancel_all leaves the process alive waiting on that non-daemon thread. This method is for chat shutdown only, not normal per-task stops.



# File 'lib/rubino/tools/background_tasks.rb', line 499

def shutdown!(grace: 1.0)
  live = running
  cancel_all
  join_or_kill_threads(live, grace: grace)
end
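The private join_or_kill_threads helper that #shutdown! calls is not shown on this page. A hypothetical sketch of one way to implement the "join within a grace period, then kill" step, assuming entries expose their worker via a `thread` field and that grace is a total budget in seconds shared by all children (so shutdown time does not grow with the number of children):

```ruby
# Hypothetical minimal entry carrying only the worker thread.
Entry = Struct.new(:id, :thread)

# Hypothetical sketch: wait up to `grace` seconds in total for all threads,
# then kill whatever is still alive. Returns how many were killed.
def join_or_kill_threads(entries, grace:)
  threads = entries.map(&:thread).compact
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + grace
  threads.each do |t|
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    begin
      t.join([remaining, 0].max) # returns nil if the limit expires first
    rescue StandardError
      # Thread#join re-raises a thread's exception; that thread has finished.
    end
  end
  survivors = threads.select(&:alive?)
  survivors.each(&:kill)
  survivors.each(&:join) # wait until the kill (and ensure blocks) complete
  survivors.size
end

cooperative = Thread.new { sleep 0.05 } # observes its cancel and finishes
stuck       = Thread.new { sleep }      # never observes a cancel
killed = join_or_kill_threads([Entry.new("sa_1", cooperative), Entry.new("sa_2", stuck)], grace: 0.5)
p killed # => 1
```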

#steer(id, text) ⇒ Object

Records a parent->child steer note (the `/agents <id> steer "…"` affordance). Pushes the text onto the child's steering queue, which the child Loop drains at its next iteration boundary (Loop#inject_steered_input): between turns, never between a tool_use and its results. Best-effort: returns false (and pushes nothing) when the entry is gone, has no queue, or has ALREADY reached a terminal state (the child finished, so there is no further turn to fold the note into); true when the note was queued.

H5 — the push happens UNDER the registry mutex, gated on a non-terminal status, so it is serialised against #complete (which flips the status to terminal AND drains the queue under that SAME mutex). Either this push wins the lock first (the note is queued and will be drained — by the child at its next turn, or by #complete into the undelivered report) or #complete wins first (status is terminal and this returns false). There is no window in which a note is pushed onto a queue nobody will drain yet reported delivered. Pushing inside the mutex is safe: InputQueue#push has its own lock and never calls back into the registry, so no lock cycle.



# File 'lib/rubino/tools/background_tasks.rb', line 376

def steer(id, text)
  @mutex.synchronize do
    entry = @entries[id]
    return false unless entry&.steer_queue
    return false if terminal_status?(entry.status)

    entry.steer_queue.push(text)
    true
  end
end
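The H5 guarantee (every note #steer accepts is either folded in by the child or reported by #complete) can be checked with a small single-entry model. This is a hypothetical sketch: `SteerModel` collapses the registry to one entry, uses a plain Array for the InputQueue, treats every status outside LIVE_STATUSES as terminal (an assumption about terminal_status?), and lets the child drain under the same mutex for simplicity, whereas the real child drains through the InputQueue's own lock.

```ruby
# Hypothetical single-entry model of the H5 rule: one mutex guards both the
# terminal-status flip + drain and the "refuse if terminal" check in steer.
class SteerModel
  LIVE_STATUSES = %i[running needs_approval stopping].freeze

  def initialize
    @mutex = Mutex.new
    @status = :running
    @queue = []
  end

  # true when queued, false once the child has finished.
  def steer(text)
    @mutex.synchronize do
      return false unless LIVE_STATUSES.include?(@status)

      @queue << text
      true
    end
  end

  # The child folds queued notes in at its next turn boundary.
  def child_turn_boundary
    @mutex.synchronize { take_all }
  end

  # Flip to terminal AND drain in one critical section; leftovers were
  # never seen by the child, so they are returned as undelivered.
  def complete(status)
    @mutex.synchronize do
      @status = status
      take_all
    end
  end

  private

  def take_all
    drained = @queue.dup
    @queue.clear
    drained
  end
end

model = SteerModel.new
model.steer("focus on the failing spec")  # true: child still running
folded = model.child_turn_boundary         # the child folds it in
model.steer("also update the changelog")   # true, but no further turn happens
undelivered = model.complete(:failed)      # reported as undelivered
late = model.steer("too late")             # false: honestly rejected
p folded, undelivered, late
```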

#stop_entry(entry) ⇒ Object

The ONE per-entry stop body, shared by every stop path (the human /agents <id> --stop, the model-callable task_stop, and the parent-teardown #cancel_all). Marks the stop so the unwind records as :stopped (not ✗ failed) and the list shows ◌ stopping, then wakes the entry no matter HOW it is blocked: it cancels the approval gate of a child parked on one (→ Interrupted → clean unwind) and the runner's CancelToken for a child between checkpoints. Idempotent and safe on an already-stopped or never-blocked entry (each cancel! is one-shot; request_stop no-ops on a non-live status), so #cancel_all can call it across the whole registry.



# File 'lib/rubino/tools/background_tasks.rb', line 456

def stop_entry(entry)
  return unless entry

  request_stop(entry.id)
  entry.approval_gate&.cancel!
  entry.runner&.cancel!
end
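The idempotency claim can be checked by running the stop body twice over a mixed set of entries. A minimal sketch, assuming a hypothetical `OneShot` class standing in for both the approval gate and the runner's CancelToken, and a hypothetical three-field `Entry`; request_stop is inlined as its status rule.

```ruby
# Hypothetical one-shot cancel flag standing in for both the approval gate
# and the runner's CancelToken. `fired` counts calls that had an effect.
class OneShot
  attr_reader :fired

  def initialize
    @mutex = Mutex.new
    @done = false
    @fired = 0
  end

  # Only the first call has an effect; later calls are no-ops.
  def cancel!
    @mutex.synchronize do
      next false if @done

      @done = true
      @fired += 1
      true
    end
  end
end

LIVE_STATUSES = %i[running needs_approval stopping].freeze
Entry = Struct.new(:status, :approval_gate, :runner)

def stop_entry(entry)
  return unless entry

  entry.status = :stopping if LIVE_STATUSES.include?(entry.status) # request_stop
  entry.approval_gate&.cancel! # wakes a child parked on an approval
  entry.runner&.cancel!        # flips the token for a child between checkpoints
end

parked = Entry.new(:needs_approval, OneShot.new, OneShot.new)
busy   = Entry.new(:running, nil, OneShot.new) # not waiting on a human
done   = Entry.new(:stopped, nil, nil)         # already finished

[parked, busy, done, nil].each { |e| stop_entry(e) }
[parked, busy, done].each { |e| stop_entry(e) } # a second teardown pass is harmless
p [parked.status, busy.status, done.status] # => [:stopping, :stopping, :stopped]
p parked.approval_gate.fired                 # => 1
```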