Class: Rubino::Agent::Loop

Inherits:
Object
Defined in:
lib/rubino/agent/loop.rb

Overview

The core agent loop that drives LLM calls and tool-execution cycles. It runs until the LLM produces a final text response or the iteration budget is exhausted, in which case it makes one last, toolless call asking the model for a summary.
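A minimal sketch of the loop's two normal exits, assuming a scripted model. `Response` and `run_loop` are hypothetical stand-ins, not Rubino classes, and the sketch leaves out cancellation, interrupted streams, empty responses, and the final summary call.

```ruby
# Hypothetical stand-ins, not Rubino classes: they only model the loop's
# two normal exits (a text-only reply, or the iteration budget running out).
Response = Struct.new(:content, :tool_calls) do
  # A reply with text and no tool calls ends the turn.
  def text_only?
    tool_calls.empty? && !content.to_s.empty?
  end
end

def run_loop(model, max_iterations:)
  messages = []
  calls = 0
  loop do
    # Budget ceiling: Rubino would request a final summary at this point.
    return [:budget_exhausted, calls] if calls >= max_iterations

    response = model.call(messages)
    calls += 1
    return [:final, response.content] if response.text_only?

    # Tool round: record fake results so the next call can see them.
    response.tool_calls.each { |call| messages << "result of #{call}" }
  end
end

script = [Response.new("", ["ls"]), Response.new("done", [])]
scripted = ->(_messages) { script.shift }
p run_loop(scripted, max_iterations: 5) # => [:final, "done"]
```

A model that keeps asking for tools never produces a text-only reply, so only the budget check stops it.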

Constant Summary

MAX_ITERATIONS_SUMMARY_NUDGE =

Nudge sent on the final, toolless model call when the iteration budget ceiling is hit. It mirrors the summary request made by the reference handle_max_iterations: instead of ending the turn with nothing, the model is asked to wrap up in prose.

"You've reached the maximum number of tool-calling iterations allowed. " \
"Please provide a final response summarizing what you've found and " \
"accomplished so far, without calling any more tools."
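One way the final, toolless request could be shaped, assuming messages are hashes with `:role` and `:content` and that the nudge is sent as a user message. `build_summary_request` is a hypothetical helper; the real `summarize_on_budget_exhausted` is not shown on this page and may build its request differently.

```ruby
MAX_ITERATIONS_SUMMARY_NUDGE =
  "You've reached the maximum number of tool-calling iterations allowed. " \
  "Please provide a final response summarizing what you've found and " \
  "accomplished so far, without calling any more tools."

# Hypothetical helper: builds the last request once the budget is spent.
def build_summary_request(messages)
  {
    # Non-destructive append, so the caller's history is left untouched.
    messages: messages + [{ role: :user, content: MAX_ITERATIONS_SUMMARY_NUDGE }],
    # No tools are offered, so the model can only answer in prose.
    tools: []
  }
end

history = [{ role: :user, content: "Audit the repo" }]
request = build_summary_request(history)
request[:messages].size # => 2
history.size            # => 1
```

Offering an empty toolset, rather than relying on the wording alone, is what makes the call "toolless" in this sketch.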

NOTICES_PREAMBLE =

Framing for turn-start background notices (#148): tells the model the notices are secondary to the user message that follows them.

"[background notices — acknowledge briefly; the user's message AFTER " \
"these notices is the instruction to act on]"
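A sketch of the ordering this preamble asks for: preamble first, then the notices, then the user's actual instruction last. `frame_turn_input` and the bullet layout are hypothetical; in Rubino the preamble would be `NOTICES_PREAMBLE`, and how notices are actually merged into the message list is not shown here.

```ruby
# Hypothetical helper: puts the preamble and notices BEFORE the user's text,
# so the instruction the model should act on comes last.
def frame_turn_input(user_text, notices, preamble:)
  return user_text if notices.empty? # nothing to frame

  lines = [preamble]
  notices.each { |notice| lines << "- #{notice}" }
  lines << "" << user_text # blank line, then the real instruction
  lines.join("\n")
end

puts frame_turn_input("Fix the failing spec", ["build finished"], preamble: "[notices]")
# [notices]
# - build finished
#
# Fix the failing spec
```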

Instance Method Summary

Constructor Details

#initialize(session:, llm_adapter:, tool_executor:, message_store:, budget:, ui:, event_bus:, config:, cancel_token: nil, initial_image_paths: [], input_queue: nil) ⇒ Loop

Returns a new instance of Loop.



# File 'lib/rubino/agent/loop.rb', line 23

def initialize(session:, llm_adapter:, tool_executor:, message_store:,
               budget:, ui:, event_bus:, config:, cancel_token: nil,
               initial_image_paths: [], input_queue: nil)
  @session             = session
  @llm                 = llm_adapter
  @tool_executor       = tool_executor
  @message_store       = message_store
  @budget              = budget
  @ui                  = ui
  @event_bus           = event_bus
  @config              = config
  @cancel_token        = cancel_token
  # Optional steering hand-off (Interaction::InputQueue). When present,
  # text the user typed mid-turn is drained at the top of each loop
  # iteration and injected as a user message. Nil for the API/server path
  # and nested subagent runs — they get no injection and behave exactly
  # as before.
  @input_queue         = input_queue
  # Consumed once on the first iteration. After the first model call
  # subsequent iterations are tool-result follow-ups — no user input,
  # nothing to re-attach.
  @pending_image_paths = Array(initial_image_paths)
  # Provider/model fallback chain (Slice 7). Primary at index 0; rotates to
  # the next configured backend when the primary keeps failing, and is
  # restored at the top of each turn (#run). With no agent.fallback_models
  # configured the chain holds only the primary and is an inert pass-through,
  # so single-provider setups behave exactly as before.
  @fallback_chain      = FallbackChain.new(
    primary_adapter: llm_adapter,
    config: config,
    ui: ui,
    event_bus: event_bus,
    tool_executor: tool_executor,
    cancel_token: cancel_token
  )
  # Owns the inner retry loop (call → validate → classify → backoff →
  # return/raise). The Loop builds each LLM::Request and hands it to the
  # runner, which returns a validated response or raises (empty-exhausted →
  # EmptyModelResponseError; transient-exhausted/permanent → the classified
  # error). The error-classification + backoff retries that used to live in
  # the adapter's with_retries now live here — single owner, no double-retry.
  # The runner issues calls against the chain's CURRENT adapter and can
  # rotate it via the chain on a fallback-worthy failure.
  @model_call_runner = ModelCallRunner.new(
    llm: llm_adapter,
    fallback_chain: @fallback_chain,
    config: config,
    ui: ui,
    event_bus: event_bus,
    cancel_token: cancel_token
  )
  # Single count + persist sink for tool results. The executor invokes it
  # for every tool on BOTH paths: the streaming path (ruby_llm runs the
  # tool mid-stream via ToolBridge → ToolExecutor#execute, never returning
  # through #execute_tool_calls) and the non-streaming path. Registered
  # here rather than passed at construction because the executor is built
  # before the Loop (the adapter/ToolBridge share the same executor).
  @tool_executor.on_result = method(:handle_tool_result) if @tool_executor.respond_to?(:on_result=)
end
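The fallback behavior described in the constructor comments (primary at index 0, rotate to the next backend on failure, restore the primary at the top of each turn, inert with a single backend) can be sketched as follows. `SimpleFallbackChain` and `call_with_fallback` are hypothetical; unlike the real ModelCallRunner, this version rotates on any `StandardError` and does no retries, backoff, or error classification.

```ruby
# Hypothetical sketch, not Rubino's FallbackChain; method names are assumptions.
class SimpleFallbackChain
  attr_reader :index

  def initialize(primary, fallbacks = [])
    @adapters = [primary, *fallbacks] # primary is always index 0
    @index = 0
  end

  def current
    @adapters[@index]
  end

  # Advance to the next backend; false when none remain. With no fallbacks
  # configured this is always false, so the chain is a pass-through.
  def rotate!
    return false if @index + 1 >= @adapters.size

    @index += 1
    true
  end

  # Called at the top of each turn so it first retries the preferred backend.
  def restore_primary!
    @index = 0
  end
end

# Hypothetical caller: try the current adapter, rotating on failure and
# re-raising the last error once every backend has been tried.
def call_with_fallback(chain, request)
  loop do
    begin
      return chain.current.call(request)
    rescue StandardError
      raise unless chain.rotate!
    end
  end
end

primary = ->(_request) { raise "503 from primary" }
backup  = ->(request) { "ok: #{request}" }
chain = SimpleFallbackChain.new(primary, [backup])
call_with_fallback(chain, "hi") # => "ok: hi"
chain.restore_primary!          # next turn starts on the primary again
```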

Instance Method Details

#run(messages:, tools:) ⇒ Object

Runs the agent loop, returning the final assistant response content.
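`#run` keeps its tool counters in instance variables because the executor's on_result sink, not `#execute_tool_calls`, is what sees every tool on both the streaming and non-streaming paths. A minimal sketch of that registration pattern, assuming a stub executor and a hypothetical `:denied` flag on results (`StubExecutor` and `CountingLoop` are illustrative, not Rubino classes):

```ruby
# Hypothetical stand-in for a ToolExecutor: it calls the registered sink for
# every tool it runs, whichever code path triggered the execution.
class StubExecutor
  attr_accessor :on_result

  def execute(name, denied: false)
    result = { tool: name, denied: denied }
    on_result&.call(result)
    result
  end
end

class CountingLoop
  attr_reader :tool_count, :denied_count

  def initialize(executor)
    start_turn
    # Same guard the constructor uses: only register when the executor supports it.
    executor.on_result = method(:handle_tool_result) if executor.respond_to?(:on_result=)
  end

  # Reset per turn, as #run does; instance variables so the sink can update them.
  def start_turn
    @tool_count = 0
    @denied_count = 0
  end

  private

  def handle_tool_result(result)
    @tool_count += 1
    @denied_count += 1 if result[:denied]
  end
end

executor = StubExecutor.new
agent_loop = CountingLoop.new(executor)
executor.execute("read_file")
executor.execute("rm_rf", denied: true)
agent_loop.tool_count   # => 2
agent_loop.denied_count # => 1
```

Because the sink is a bound `Method`, the counters update no matter who calls `execute`, which is the property the streaming path depends on.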



# File 'lib/rubino/agent/loop.rb', line 84

def run(messages:, tools:)
  # Stash the resolved toolset so #streaming? can decide, per run, whether
  # this turn might block on a human (clarify/approval). When it might, we
  # run NON-STREAMING so the LLM HTTP request completes and CLOSES before
  # any tool fires — leaving no upstream socket held open during the gate
  # wait (the wait can now be effectively unbounded; see ApprovalGate).
  @turn_tools     = Array(tools)
  iteration       = 0
  turn_started_at = monotonic_now

  # If a previous turn rotated to a fallback, restore the primary backend
  # so this turn gets a fresh attempt with the preferred model
  # (conversation_loop.py:427). No-op when we never left the primary.
  @fallback_chain.restore_primary!

  # Mutated by the ToolExecutor's on_result sink (see #handle_tool_result),
  # which fires for EVERY tool regardless of streaming mode — including the
  # streaming path where ruby_llm runs the tool mid-stream via ToolBridge
  # and never returns through #execute_tool_calls below. Instance vars (not
  # locals) so the sink closure can update them.
  @tool_count     = 0
  @denied_count   = 0
  token_total     = 0

  loop do
    iteration += 1
    @cancel_token&.check!

    # Mid-turn steering boundary. SAFE point: the cancel check has passed
    # and any prior assistant(tool_use) + tool(result) messages from the
    # previous iteration are already appended, so adding a USER message
    # here can never split a tool_use from its results (no orphan pair on
    # strict providers). On iteration 1 the initial user input is already
    # the user turn, so only parked background NOTICES fold in (#13);
    # typed lines stay queued for their own turns.
    inject_steered_input(messages, iteration)

    unless @budget.can_continue?(iteration)
      @ui.warning("Iteration budget exhausted (#{iteration} turns)")
      return summarize_on_budget_exhausted(messages, iteration,
                                           turn_started_at, token_total)
    end

    @event_bus.emit(Interaction::Events::MODEL_CALL_STARTED, iteration: iteration)
    # Show a transient "thinking…" indicator during TTFB. The UI erases
    # it the moment the first chunk lands (any type). Skipped in
    # non-streaming mode — the response arrives in one shot, indicator
    # would flash uselessly.
    @ui.thinking_started if streaming?
    begin
      response = call_model(messages, tools, iteration)
    rescue Rubino::Interrupted
      # The streaming callback (or the per-iteration check above)
      # observed cancellation. Close any open stream box on the UI
      # (commits the partial answer streamed so far) and bail out — the
      # standardized `⎿ interrupted` marker is appended once by the Runner's
      # rescue, right after this kept partial. Lifecycle will not persist a
      # turn that never completed, but the user already saw the partial.
      @ui.stream_end if streaming?
      raise
    end
    @event_bus.emit(Interaction::Events::MODEL_CALL_FINISHED,
                    tokens: response.total_tokens,
                    has_tool_calls: response.has_tool_calls?)

    token_total += response.total_tokens.to_i

    if response.interrupted?
      # The upstream stream was cut before a clean completion (no
      # finish_reason / [DONE]); `response` carries only a buffered partial
      # with no tool call. Returning it would end the run as "completed"
      # with truncated/empty output — the silent-completion bug. Persist
      # whatever streamed so the transcript keeps it, close the stream box,
      # then raise: Lifecycle maps this to INTERACTION_FAILED → run.failed,
      # the same path every other turn error already takes.
      persist_assistant_message(response) unless response.content.to_s.empty?
      finalize_stream(response)
      emit_turn_summary(turn_started_at, token_total)
      raise Rubino::StreamInterruptedError,
            "stream ended before completion after " \
            "#{response.content.to_s.bytesize} buffered byte(s) with no finish signal — " \
            "the model did not finish (run marked failed, not completed). " \
            "Often caused by a very large context pushing time-to-first-token past the " \
            "provider's stream idle timeout."
    end

    if response.text_only?
      persist_assistant_message(response)
      finalize_stream(response)
      emit_turn_summary(turn_started_at, token_total)
      return response.content
    end

    if response.has_tool_calls?
      persist_assistant_message(response)
      close_intermediate_stream(response)

      # Bedrock (and other providers) require the assistant turn with the
      # toolUse block to appear in the conversation history before the
      # toolResult turn. Append it now so the next LLM call sees the
      # correct sequence: user → assistant(toolUse) → user(toolResult).
      messages << build_assistant_tool_use_message(response)

      # NOTE: counting and `tool` message persistence happen in the
      # ToolExecutor's on_result sink (#handle_tool_result), which fires
      # for BOTH this non-streaming path and the streaming path (where
      # ruby_llm runs tools mid-stream and never returns here). We only
      # build the conversation-history messages for the next iteration.
      execute_tool_calls(response.tool_calls).each { |result| messages << result }
    else
      # Unreachable in practice: the ModelCallRunner either returns a
      # response with text or tool calls, or raises EmptyModelResponseError.
      # Kept as a defensive backstop so a future response shape can never
      # silently complete an empty turn.
      emit_turn_summary(turn_started_at, token_total)
      raise Rubino::EmptyModelResponseError
    end
  end
end
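The ordering invariant behind the steering boundary can be checked with a small model of the history: each assistant tool-use message must be followed directly by all of its results, and typed input is injected only at the top of iteration 2 or later. The hash shapes, `run_iterations`, and `paired?` are hypothetical; real providers use their own role names (for example, Bedrock's toolUse/toolResult blocks).

```ruby
# Hypothetical model of the history; each entry in tool_rounds stands in for
# one model response that requested those tool calls.
def run_iterations(messages, tool_rounds, queue)
  tool_rounds.each_with_index do |calls, i|
    iteration = i + 1
    # Steering boundary: the previous tool_use and its results are already
    # appended, so a user message here cannot split a pair. On iteration 1
    # typed lines stay queued.
    if iteration > 1 && !queue.empty?
      typed = queue.dup
      queue.clear
      messages << { role: :user, content: typed.join("\n") }
    end

    messages << { role: :assistant, tool_use: calls }
    calls.each { |call| messages << { role: :tool, result_for: call } }
  end
  messages
end

# True when every assistant tool_use is followed directly by all its results.
def paired?(messages)
  messages.each_with_index.all? do |message, i|
    next true unless message[:tool_use]

    following = Array(messages[i + 1, message[:tool_use].size])
    following.map { |m| m[:result_for] } == message[:tool_use]
  end
end

queue = ["also check the tests"]
history = run_iterations([{ role: :user, content: "fix bug" }], [["read"], ["edit", "test"]], queue)
history.map { |m| m[:role] }
# => [:user, :assistant, :tool, :user, :assistant, :tool, :tool]
paired?(history) # => true
```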