Class: Rubino::Agent::Loop
- Inherits:
- Object
- Object
- Rubino::Agent::Loop
- Defined in:
- lib/rubino/agent/loop.rb
Overview
The core agent loop that handles LLM calls and tool execution cycles. Runs until the LLM produces a final text response or budget is exhausted.
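As a rough illustration of the cycle described above, the sketch below calls a model until it answers without tool calls or an iteration cap is reached. It is not Rubino's implementation: `Reply`, `scripted_model`, and `run_agent_loop` are hypothetical names, and the model is a scripted stand-in rather than a real LLM adapter.

```ruby
# Hypothetical sketch of an agent loop; none of these names are Rubino APIs.
Reply = Struct.new(:text, :tool_calls, keyword_init: true)

# Returns a callable that replays canned replies, standing in for an LLM.
def scripted_model(replies)
  queue = replies.dup
  ->(_messages) { queue.shift }
end

# Calls the model until it answers in plain text or the cap is reached.
def run_agent_loop(model:, tools:, messages:, max_iterations: 5)
  max_iterations.times do
    reply = model.call(messages)
    # No tool calls means the model produced its final text response.
    return reply.text if reply.tool_calls.empty?

    # Record the assistant turn first so tool results follow their request.
    messages << { role: "assistant", tool_calls: reply.tool_calls }
    reply.tool_calls.each do |call|
      output = tools.fetch(call[:name]).call(**call[:args])
      messages << { role: "tool", name: call[:name], content: output.to_s }
    end
  end
  # The cap was reached without a final text response.
  :budget_exhausted
end

model = scripted_model([
  Reply.new(text: nil, tool_calls: [{ name: "add", args: { a: 2, b: 3 } }]),
  Reply.new(text: "The sum is 5.", tool_calls: [])
])
tools = { "add" => ->(a:, b:) { a + b } }
history = [{ role: "user", content: "What is 2 + 3?" }]
answer = run_agent_loop(model: model, tools: tools, messages: history)
puts answer
```

The real class layers cancellation, streaming, budget extension, and guard checks on top of this shape; the sketch keeps only the two exit conditions named in the overview.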
Constant Summary
- HARNESS_CONTROL_MARKER =
Trusted-harness control marker (#75). Runtime control messages the harness injects mid-turn (continuation prompt, budget-exhaustion summary nudge) are appended as role: "user" content for provider compatibility, but they are NOT user input; they are the harness speaking. Without a marker, an injection-aware model (MiniMax-M3) reads a count-/instruction-bearing "user" message ("you ran N tool calls … do not claim nothing was done") as a prompt-injection attempt, announces it's ignoring it, and derails. The system prompt (build.txt [Runtime control]) declares this prefix TRUSTED so the model obeys it instead of defending against it. Mirrors the existing [harness note] / [background notices] convention.
"[harness control]"
- MAX_ITERATIONS_SUMMARY_NUDGE =
Nudge issued on the final, toolless model call when the iteration/budget ceiling is hit. Mirrors the reference handle_max_iterations summary request — ask the model to wrap up in prose instead of ending the turn with nothing. Carries the trusted-harness marker (#75) so it reads as runtime control, not as suspect user input.
"#{HARNESS_CONTROL_MARKER} You've reached the maximum number of " \
"tool-calling iterations allowed. " \
"Please provide a final response summarizing what you've found and " \
"accomplished so far, without calling any more tools.".freeze
- NOTICES_PREAMBLE =
Framing for turn-start background notices (#148): tells the model the notices are secondary to the user message that follows them.
"[background notices — acknowledge briefly; the user's message AFTER " \
"these notices is the instruction to act on]"
- STREAM_CONTINUATION_MAX =
Stream-recovery (Hermes parity): a stream that ends with no finish signal is RECOVERED, not failed. If text was already shown, persist the partial and ask the model to CONTINUE from exactly where it stopped (no restart, no repeat) — up to STREAM_CONTINUATION_MAX rounds. If nothing was shown yet, the call is simply retried (discard-and-restart), bounded by agent.empty_response_max_retries.
3
- STREAM_CONTINUE_PROMPT =
"#{HARNESS_CONTROL_MARKER} Continue exactly where you left off. " \
"Do not restart or repeat any prior text.".freeze
- BLOCKED_TOOL_REMINDER =
Anti-confabulation reinforcement (#583), injected on the model input ONCE per turn in which a tool was actually blocked/denied (gated on over-firing harness-note regression). Carries the trusted-harness marker so an injection-aware model reads it as runtime control, not user input, and wraps the field-standard `<system-reminder>` phrasing that the is_error tool_result (Lever 1) reinforces.
"#{HARNESS_CONTROL_MARKER} <system-reminder>One or more tool calls were " \
"blocked and produced NO output. Treat any blocked tool as having " \
"returned nothing — never state or imply a blocked tool's result. If you " \
"needed it, report that the task is blocked pending approval." \
"</system-reminder>".freeze
Instance Method Summary
-
#initialize(session:, llm_adapter:, tool_executor:, message_store:, budget:, ui:, event_bus:, config:, cancel_token: nil, initial_image_paths: [], input_queue: nil) ⇒ Loop
constructor
A new instance of Loop.
-
#run(messages:, tools:) ⇒ Object
Runs the agent loop, returning the final assistant response content.
Constructor Details
#initialize(session:, llm_adapter:, tool_executor:, message_store:, budget:, ui:, event_bus:, config:, cancel_token: nil, initial_image_paths: [], input_queue: nil) ⇒ Loop
Returns a new instance of Loop.
# File 'lib/rubino/agent/loop.rb', line 62

def initialize(session:, llm_adapter:, tool_executor:, message_store:, budget:, ui:, event_bus:, config:,
               cancel_token: nil, initial_image_paths: [], input_queue: nil)
  @session = session
  @llm = llm_adapter
  @tool_executor = tool_executor
  @message_store =
  @budget = budget
  @ui = ui
  @event_bus = event_bus
  @config = config
  @cancel_token = cancel_token
  # Optional steering hand-off (Interaction::InputQueue). When present,
  # text the user typed mid-turn is drained at the top of each loop
  # iteration and injected as a user message. Nil for the API/server path
  # and nested subagent runs — they get no injection and behave exactly
  # as before.
  @input_queue = input_queue
  # Consumed once on the first iteration. After the first model call
  # subsequent iterations are tool-result follow-ups — no user input,
  # nothing to re-attach.
  @pending_image_paths = Array(initial_image_paths)
  # Provider/model fallback chain (Slice 7). Primary at index 0; rotates to
  # the next configured backend when the primary keeps failing, and is
  # restored at the top of each turn (#run). With no agent.fallback_models
  # configured the chain holds only the primary and is an inert pass-through,
  # so single-provider setups behave exactly as before.
  @fallback_chain = FallbackChain.new(
    primary_adapter: llm_adapter, config: config, ui: ui, event_bus: event_bus,
    tool_executor: tool_executor, cancel_token: cancel_token
  )
  # Owns the inner retry loop (call → validate → classify → backoff →
  # return/raise). The Loop builds each LLM::Request and hands it to the
  # runner, which returns a validated response or raises (empty-exhausted →
  # EmptyModelResponseError; transient-exhausted/permanent → the classified
  # error). The error-classification + backoff retries that used to live in
  # the adapter's with_retries now live here — single owner, no double-retry.
  # The runner issues calls against the chain's CURRENT adapter and can
  # rotate it via the chain on a fallback-worthy failure.
  @model_call_runner = ModelCallRunner.new(
    llm: llm_adapter, fallback_chain: @fallback_chain, config: config,
    ui: ui, event_bus: event_bus, cancel_token: cancel_token
  )
  # Single count + persist sink for tool results. The executor invokes it
  # for every tool on BOTH paths: the streaming path (ruby_llm runs the
  # tool mid-stream via ToolBridge → ToolExecutor#execute, never returning
  # through #execute_tool_calls) and the non-streaming path. Registered
  # here rather than passed at construction because the executor is built
  # before the Loop (the adapter/ToolBridge share the same executor).
  @tool_executor.on_result = method(:handle_tool_result) if @tool_executor.respond_to?(:on_result=)
end
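The late registration of the result sink at the end of the constructor can be pictured with a small stand-alone sketch. `SketchExecutor` and `SketchLoop` are hypothetical classes written only for this illustration; the assumption is that the executor exposes a writable `on_result` callback and invokes it once per tool, as the comment in the listing describes.

```ruby
# Hypothetical sketch; these classes are not part of Rubino.
class SketchExecutor
  # Callback slot, assigned after the executor has already been built.
  attr_writer :on_result

  def execute(tool_name)
    result = { tool: tool_name, ok: true }
    # Report every result to the sink, if one was registered.
    @on_result&.call(result)
    result
  end
end

class SketchLoop
  attr_reader :tool_count

  def initialize(tool_executor:)
    @tool_count = 0
    @tool_executor = tool_executor
    # Register only when the executor supports a sink, so stub executors
    # without the writer keep working.
    @tool_executor.on_result = method(:handle_tool_result) if @tool_executor.respond_to?(:on_result=)
  end

  private

  # Single place where tool results are counted.
  def handle_tool_result(_result)
    @tool_count += 1
  end
end

executor = SketchExecutor.new
agent = SketchLoop.new(tool_executor: executor)
executor.execute("read_file")
executor.execute("write_file")
puts agent.tool_count
```

Passing a bound `Method` object keeps the counting state on the loop instance while letting the executor call it from any code path, which appears to be why instance variables rather than locals hold the counters.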
Instance Method Details
#run(messages:, tools:) ⇒ Object
Runs the agent loop, returning the final assistant response content.
# File 'lib/rubino/agent/loop.rb', line 123

def run(messages:, tools:) # rubocop:disable Metrics/PerceivedComplexity,Metrics/CyclomaticComplexity
  # Stash the resolved toolset so #streaming? can decide, per run, whether
  # this turn might block on a human (clarify/approval). When it might, we
  # run NON-STREAMING so the LLM HTTP request completes and CLOSES before
  # any tool fires — leaving no upstream socket held open during the gate
  # wait (the wait can now be effectively unbounded; see ApprovalGate).
  @turn_tools = Array(tools)
  iteration = 0
  turn_started_at = monotonic_now
  # Reflect-guard against fabricated "done" (the #1 trust-killer): a
  # toolless turn whose prose claims an action it never carried out. Built
  # once per turn from the toolset actually on offer; counts its own
  # corrective re-prompts so it can stop honestly at the cap.
  @action_guard = ActionClaimGuard.new(exposed_tool_names: @turn_tools.map { |t| tool_name_of(t) })
  @reflection_count = 0
  # The user request driving this turn, captured from the OPENING transcript
  # (before any guard reflection note is appended) — the guard consults it
  # to skip challenging a NO-ACTION (plan/explain/"don't run tools") turn the
  # user explicitly asked for (#353a).
  @turn_user_request = originating_user_request()
  # If a previous turn rotated to a fallback, restore the primary backend
  # so this turn gets a fresh attempt with the preferred model
  # (conversation_loop.py:427). No-op when we never left the primary.
  @fallback_chain.restore_primary!
  # Mutated by the ToolExecutor's on_result sink (see #handle_tool_result),
  # which fires for EVERY tool regardless of streaming mode — including the
  # streaming path where ruby_llm runs the tool mid-stream via ToolBridge
  # and never returns through #execute_tool_calls below. Instance vars (not
  # locals) so the sink closure can update them.
  @tool_count = 0
  @denied_count = 0
  # Tools that ERRORED / were blocked (e.g. a write refused by the
  # workspace jail).
  # They neither "ran" nor mutated, so they stay out of
  # @tool_count/@edit_count and never trip the #381 "review uncommitted
  # changes" note (S7 F1) — tracked separately for the footer/diagnostics.
  @errored_count = 0
  # Of the tools that RAN, how many were MUTATING (edit/write/patch). Lets
  # the pessimistic-summary reconciliation (#381) say "N tool calls (M edits
  # — review uncommitted changes)" so a developer is pointed at real,
  # possibly-uncommitted disk changes when the model claims it did nothing.
  @edit_count = 0
  # Round-trips ruby_llm ran INSIDE a single streaming ask() this turn
  # (#355a). ruby_llm drives the whole model↔tool loop within one
  # chat.ask, so the outer `iteration` counter above stays at 1 for the
  # entire streaming turn and never re-consults the budget between the
  # intermediate round-trips. The adapter calls #note_stream_round_trip
  # once per round-trip (via on_round_trip), and #stream_budget_exhausted?
  # reads this count so ToolBridge can Halt the in-ask loop once the
  # iteration/time budget is spent. Reset per turn.
  @stream_round_trips = 0
  # Accumulates the content streamed to the screen this turn so that an
  # interrupt mid-stream can persist EXACTLY what the user saw, marked
  # interrupted (#338b). Reset per turn — a one-shot CancelToken plus a
  # fresh buffer means a stale partial can never attach to a later turn.
  @interrupt_partial = +""
  # Stream-recovery budgets (Hermes parity) — reset per turn. A no-finish
  # stream end is retried (empty partial → discard-and-restart) or continued
  # (partial shown → keep-and-continue) instead of failing the turn.
  @stream_retry_count = 0
  @continuation_count = 0
  # True once any denial this turn was a headless fail-closed block ("needs
  # approval but no interactive session", #260) — lets the binding guard
  # point at `--yolo` (F2) instead of "approve it" in the honest message.
  @noninteractive_block = false
  # One-shot latch (#583): the blocked-tool <system-reminder> is injected at
  # most once per turn, only after a real block, and reset here so a fresh
  # turn never inherits a prior turn's reminder.
  @blocked_reminder_emitted = false
  token_total = 0
  loop do
    iteration += 1
    @cancel_token&.check!
    # Mid-turn steering boundary. SAFE point: the cancel check has passed
    # and any prior assistant(tool_use) + tool(result) messages from the
    # previous iteration are already appended, so adding a USER message
    # here can never split a tool_use from its results (no orphan pair on
    # strict providers). On iteration 1 the initial user input is already
    # the user turn, so only parked background NOTICES fold in (#13);
    # typed lines stay queued for their own turns.
    inject_steered_input(, iteration)
    inject_blocked_tool_reminder()
    unless @budget.can_continue?(iteration)
      @ui.warning("Iteration budget exhausted (#{iteration} turns)")
      outcome = handle_budget_exhausted(, iteration, turn_started_at, token_total)
      # :continue → the user (interactively) granted more budget; the
      # iteration cap was raised and we re-enter the SAME turn with full
      # context (no re-summary, no truncation). Anything else is the final
      # assistant text (force-summary / abort).
      next if outcome == :continue
      return outcome
    end
    @event_bus.emit(Interaction::Events::MODEL_CALL_STARTED, iteration: iteration)
    # Show a transient "thinking…" indicator during TTFB. The UI erases
    # it the moment the first chunk lands (any type). Skipped in
    # non-streaming mode — the response arrives in one shot, indicator
    # would flash uselessly.
    @ui.thinking_started if streaming?
    begin
      response = call_model(, tools, iteration)
    rescue Rubino::Interrupted
      # The streaming callback (or the per-iteration check above)
      # observed cancellation. Persist EXACTLY the partial that was shown
      # on screen — flagged interrupted in metadata — so storage matches
      # the screen and the transcript stays truthful & resumable (#338b).
      # Without this, the on-screen `⎿ interrupted` partial was absent from
      # the messages table and resume/compaction/memory diverged from what
      # the user saw. Then close any open stream box (commits the partial
      # answer streamed so far) and bail out — the standardized
      # `⎿ interrupted` marker is appended once by the Runner's rescue,
      # right after this kept partial. The upstream stream is already
      # cancelled: raising out of the per-chunk callback unwinds Faraday's
      # net-http read loop, which closes the socket (no drain) — verified
      # against ruby_llm 1.x's Streaming#stream_response, where the block
      # we raise from runs inside the on_data handler.
      persist_interrupted_partial
      @ui.stream_end if streaming?
      raise
    end
    @event_bus.emit(Interaction::Events::MODEL_CALL_FINISHED,
                    tokens: response.total_tokens,
                    input_tokens: response.input_tokens,
                    output_tokens: response.output_tokens,
                    stop_reason: response.stop_reason,
                    model_id: response.model_id,
                    has_tool_calls: response.has_tool_calls?)
    token_total += response.total_tokens.to_i
    # #355a: the streaming round-trip loop was cut short mid-flight because
    # this turn's iteration/time budget was spent (ToolBridge returned
    # Tool::Halt). ruby_llm already added a valid trailing tool message, so
    # the history is well-formed — hand off to the same budget-exhausted
    # summary the outer-loop cap uses. `iteration` is still 1 for a
    # streaming turn, so pass the round-trip count as the iteration reached.
    if response.halted?
      outcome = handle_budget_exhausted(, @stream_round_trips, turn_started_at, token_total)
      # :continue → budget extended; the next ask() picks up the
      # well-formed post-Halt history (ruby_llm already appended the
      # trailing tool message) and resumes the in-ask round-trip loop
      # against the now-larger budget. No tool_bridge change needed.
      next if outcome == :continue
      return outcome
    end
    if response.interrupted?
      # The upstream stream was cut before a clean completion (no
      # finish_reason / [DONE]).
      # Rather than failing the turn, RECOVER it the
      # way Hermes does (chat_completion_helpers.py:2394-2452 + the
      # conversation-loop continuation) — split on whether any text was shown:
      finalize_stream(response) # close the partial stream box (shown live)
      if response.content.to_s.empty?
        # (B) Nothing streamed yet — DISCARD and re-call the model. A slow or
        # flaky provider (large-context TTFT past its stream idle timeout)
        # usually succeeds on a fresh attempt, and since nothing was shown a
        # retry can't duplicate output. Only fail once the budget is spent.
        if @stream_retry_count < stream_recovery_retries
          @stream_retry_count += 1
          @ui.warning("the model stream ended before any output — " \
                      "retrying (#{@stream_retry_count}/#{stream_recovery_retries})")
          next
        end
        emit_turn_summary(turn_started_at, token_total)
        raise Rubino::StreamInterruptedError,
              "stream ended before completion with no output after " \
              "#{@stream_retry_count} retr#{@stream_retry_count == 1 ? "y" : "ies"} — " \
              "the provider kept closing the stream before the first token."
      end
      # (A) Text was already streamed/shown — KEEP it and ask the model to
      # CONTINUE exactly where it left off (no restart, no duplication). The
      # partial is persisted as an interim assistant turn so the next call
      # sees what it already said; capped at STREAM_CONTINUATION_MAX rounds.
      (response)
      if @continuation_count < STREAM_CONTINUATION_MAX
        @continuation_count += 1
        << { role: "assistant", content: response.content.to_s }
        << { role: "user", content: STREAM_CONTINUE_PROMPT }
        next
      end
      # Continuations exhausted — hand back the recovered partial as the
      # (truncated) final answer: truthful and resumable, not a hard failure.
      emit_turn_summary(turn_started_at, token_total)
      return response.content
    end
    if response.text_only?
      # Fabricated-"done" gate: the structured tool-call channel is the
      # ONLY thing that advances state.
      # If this toolless turn's prose
      # asserts an action against a tool we expose (or claims a `cd` we
      # cannot do), DON'T let that reach the user as a completed answer.
      guard = guard_text_only_turn(response, )
      # A corrective user message was appended; loop again so the model
      # either calls the tool or owns up. iteration/token_total carry on.
      next if guard == :reflected
      # cd: the claim can never be true, so we replaced the fabricated
      # final answer with an honest message (how to actually change the
      # workspace). Surface that, not the model's no-op claim.
      final = guard.is_a?(String) ? guard : response.content
      # PESSIMISTIC reconciliation (#381/#84) on the NORMAL closing summary.
      # #evaluate above returns nil the moment tools ran this turn, so a
      # CONTINUE-path closing answer (the user accepted "Continue (+N)", the
      # turn ran more tools/edits, then ended with an ordinary text answer —
      # NOT the force-summary call) never reached the ledger guard. If that
      # closing summary pessimistically calls real, on-disk work "not done /
      # not started / queued but unstarted" while @tool_count shows tools ran
      # (and @edit_count shows the files were edited), reconcile it with the
      # same harness ledger note the force-summary path uses. Routed to
      # stderr/event, never spliced into the answer (#418). nil when the guard
      # already replaced the answer (no model summary to reconcile) or no
      # tools ran.
      if guard.nil?
        note = @action_guard.pessimistic_summary_note(
          content: final, tool_count: @tool_count, edit_count: @edit_count
        )
        emit_harness_note(note) if note
      end
      persist_final_text(response, final)
      finalize_stream_text(response, final)
      emit_turn_summary(turn_started_at, token_total)
      # The ANSWER returned to the caller is the LAST text block only
      # (#core-F1): on a streaming turn whose final round-trip used a tool,
      # `response.content` is every text block of the turn concatenated
      # (pre-tool narration + post-tool answer, no delimiter), which a
      # headless `OUT=$(rubino prompt …)` would capture as one run-on string.
      # The full text was already streamed live and persisted via #final
      # above (transcript/render keep the narration, #261); the value we
      # HAND BACK is the post-final-tool answer in isolation. A guard
      # replacement is a synthesized string with no narration to strip, so it
      # passes through unchanged.
      return guard.is_a?(String) ? guard : response.final_text_block
    end
    if response.has_tool_calls?
      (response)
      close_intermediate_stream(response)
      # Bedrock (and other providers) require the assistant turn with the
      # toolUse block to appear in the conversation history before the
      # toolResult turn. Append it now so the next LLM call sees the
      # correct sequence: user → assistant(toolUse) → user(toolResult).
      << (response)
      # NOTE: counting and `tool` message persistence happen in the
      # ToolExecutor's on_result sink (#handle_tool_result), which fires
      # for BOTH this non-streaming path and the streaming path (where
      # ruby_llm runs tools mid-stream and never returns here). We only
      # build the conversation-history messages for the next iteration.
      execute_tool_calls(response.tool_calls).each { |result| << result }
    else
      # Unreachable in practice: the ModelCallRunner either returns a
      # response with text or tool calls, or raises EmptyModelResponseError.
      # Kept as a defensive backstop so a future response shape can never
      # silently complete an empty turn.
      emit_turn_summary(turn_started_at, token_total)
      raise Rubino::EmptyModelResponseError
    end
  end
end
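The stream-recovery branch inside `#run` reduces to a four-way decision, which the following sketch isolates as a pure function. `StreamRecoverySketch`, `action`, and `continuation_messages` are hypothetical names; only the marker string, the prompt text, and the cap of 3 are taken from the constants documented above, and `max_retries` stands in for the configured `agent.empty_response_max_retries` value.

```ruby
# Hypothetical sketch of the recovery decision; not Rubino's implementation.
module StreamRecoverySketch
  MARKER = "[harness control]"
  CONTINUATION_MAX = 3
  CONTINUE_PROMPT = "#{MARKER} Continue exactly where you left off. " \
                    "Do not restart or repeat any prior text."

  # Decides what to do after a stream ends without a finish signal.
  def self.action(content:, retries_used:, max_retries:, continuations_used:)
    if content.to_s.empty?
      # Nothing was shown, so a fresh call cannot duplicate output.
      retries_used < max_retries ? :retry : :fail
    elsif continuations_used < CONTINUATION_MAX
      # Text was shown: keep it and ask the model to carry on.
      :continue
    else
      # Continuations exhausted: the partial becomes the final answer.
      :return_partial
    end
  end

  # Messages appended before a continuation call: the partial as an interim
  # assistant turn, then the trusted continue prompt as a role "user" message.
  def self.continuation_messages(partial)
    [
      { role: "assistant", content: partial.to_s },
      { role: "user", content: CONTINUE_PROMPT }
    ]
  end
end

puts StreamRecoverySketch.action(content: "", retries_used: 0, max_retries: 2, continuations_used: 0)
puts StreamRecoverySketch.action(content: "Half an ans", retries_used: 0, max_retries: 2, continuations_used: 3)
```

Keeping the decision separate from the side effects (warnings, persistence, raising) makes each of the four outcomes easy to check in isolation.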