Class: Rubino::Agent::ModelCallRunner

Inherits:
Object
Defined in:
lib/rubino/agent/model_call_runner.rb

Overview

The INNER retry loop of the conversation loop: a faithful port of the reference `while retry_count < max_retries` block (the invalid-response path and the error path).

ONE responsibility: issue a single model call against the LLM boundary and, when it comes back unusable or raises a transient error, retry it with backoff until it succeeds or the retry budget is exhausted. It OWNS the `retry_count`. The outer Loop hands it a built LLM::Request and gets back a validated AdapterResponse (or an exception).

Control flow per attempt:

call boundary
  → raises?  → ErrorClassifier.classify → retryable & budget left?
                 yes: backoff (error-path preset, honour Retry-After), retry
                 no : re-raise (permanent / budget exhausted)
  → returns? → ResponseValidator#valid?
                 valid          : return it
                 :empty_response: backoff (invalid-response preset), retry
                                  up to empty_response_max_retries, then
                                  raise EmptyModelResponseError
                 other invalid  : return as-is (nil / interrupted — the
                                  caller maps these to StreamInterruptedError;
                                  not the runner's job to retry)
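The per-attempt flow above can be sketched in a few lines of Ruby. This is a minimal illustration, not the runner's actual code: `run_once`, `TransientError`, the `sleeper` parameter, and the placeholder delays are all hypothetical, and a plain String stands in for an AdapterResponse (empty string = `:empty_response`, `nil` = other invalid).

```ruby
# Hypothetical stand-ins; none of these are the real Rubino classes.
class TransientError < StandardError; end
class EmptyModelResponseError < StandardError; end

# Runs one model call with two separate budgets, mirroring the diagram.
# `boundary` is any callable returning a String (or nil); `sleeper` is
# injected so the sketch runs without real delays.
def run_once(boundary, api_max_retries: 3, empty_max_retries: 2, sleeper: ->(_s) {})
  error_attempts = 0
  empty_attempts = 0

  loop do
    begin
      response = boundary.call
    rescue TransientError
      error_attempts += 1
      raise if error_attempts > api_max_retries # budget exhausted: re-raise
      sleeper.call(2 * error_attempts)          # error-path backoff (placeholder delay)
      next
    end

    return response if response.nil?            # other invalid: hand back as-is
    return response unless response.empty?      # valid: return it

    empty_attempts += 1
    if empty_attempts > empty_max_retries
      raise EmptyModelResponseError, "still empty after #{empty_max_retries} retries"
    end
    sleeper.call(5 * empty_attempts)            # invalid-response backoff (placeholder delay)
  end
end
```

Keeping the two counters separate means a burst of transient errors cannot consume the empty-response budget, and vice versa. Any error other than `TransientError` propagates untouched, which plays the role of the "permanent" branch.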

TWO backoff sites, two budgets, exactly as the reference:

* invalid/empty response  → BackoffPolicy::INVALID_RESPONSE (5s/120s),
                            empty_response_max_retries (small, default 2)
* transient API error     → BackoffPolicy::ERROR_PATH (2s/60s),
                            agent.api_max_retries
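As a rough illustration of how two presets like these could produce delays, here is a sketch that reads "5s/120s" and "2s/60s" as base delay / cap with doubling per attempt. That reading, the `BackoffSketch` struct, its `delay` method, and the choice to clamp a Retry-After value to the cap are assumptions; the real BackoffPolicy may differ (for example by adding jitter).

```ruby
# Hypothetical sketch: the real BackoffPolicy presets may be shaped differently.
BackoffSketch = Struct.new(:base, :cap) do
  # Delay in seconds before retry number `attempt` (1-based). Doubles each
  # attempt and never exceeds `cap`. A server-supplied retry_after wins when
  # it is larger, but is still clamped to `cap` (an assumption of this sketch).
  def delay(attempt, retry_after: nil)
    exponential = [base * (2**(attempt - 1)), cap].min
    return exponential if retry_after.nil?

    [[exponential, retry_after].max, cap].min
  end
end

INVALID_RESPONSE = BackoffSketch.new(5, 120) # empty/invalid response site
ERROR_PATH       = BackoffSketch.new(2, 60)  # transient API error site
```

For example, `ERROR_PATH.delay(2)` would wait 4 seconds, while `ERROR_PATH.delay(1, retry_after: 10)` would defer to the server's 10 seconds.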

The degenerate/empty-response path delegates to DegenerateResponseRecovery (Slice 5), the seven-rung ladder (partial-stream → prior-turn → post-tool nudge → thinking-only prefill ×2 → empty retry ×3 → fallback seam → terminal raise) ported from the reference conversation loop. See #apply_recovery!.

NOT in scope here (left as clear seams):

* eager fallback on an invalid response and fallback-on-max-retries
  (the reference _try_activate_fallback, which RESETS
  retry_count to 0) belong to Slice 7; see the `# SLICE-7` seam below. The
  counter is structured so a future fallback can reset it.

Constructor Details

#initialize(llm:, config:, ui:, event_bus:, cancel_token: nil, fallback_chain: nil, validator: ResponseValidator.new) ⇒ ModelCallRunner

Returns a new instance of ModelCallRunner.



# File 'lib/rubino/agent/model_call_runner.rb', line 47

def initialize(llm:, config:, ui:, event_bus:, cancel_token: nil,
               fallback_chain: nil, validator: ResponseValidator.new)
  @llm = llm
  # SLICE-7: the provider/model fallback chain. When present, the live
  # adapter for each attempt is the chain's CURRENT adapter (so a rotation
  # takes effect on the very next call), and a fallback-worthy failure
  # rotates it. Nil in tests/one-shot callers → behave as a fixed @llm.
  @fallback_chain = fallback_chain
  @config       = config
  @ui           = ui
  @event_bus    = event_bus
  @cancel_token = cancel_token
  @validator    = validator
end
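The SLICE-7 comment in the constructor describes resolving the live adapter per attempt. A minimal sketch of that idea follows; `FallbackChainSketch`, its `current` and `rotate!` methods, and `RunnerSketch` are hypothetical names, and the real fallback chain's interface is not shown in this page.

```ruby
# Hypothetical chain: holds adapters in priority order plus a cursor.
class FallbackChainSketch
  def initialize(adapters)
    @adapters = adapters
    @index = 0
  end

  # The adapter the next attempt should use.
  def current
    @adapters[@index]
  end

  # Moves to the next adapter; returns false when none is left.
  def rotate!
    return false if @index >= @adapters.size - 1

    @index += 1
    true
  end
end

class RunnerSketch
  def initialize(llm:, fallback_chain: nil)
    @llm = llm
    @fallback_chain = fallback_chain
  end

  # Resolved on every attempt, so a rotation is visible on the very next
  # call. With no chain, the runner behaves as a fixed @llm.
  def active_llm
    @fallback_chain ? @fallback_chain.current : @llm
  end
end
```

Looking the adapter up at call time, rather than caching it in the constructor, is what lets a rotation take effect without rebuilding the runner.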

Instance Method Details

#call!(request, iteration: nil) ⇒ Object

Run the inner retry loop for one model call. `request` is a built LLM::Request; an optional block forwards stream chunks straight through to the boundary (matching `@llm.call(request) { |chunk| }`). Returns a validated AdapterResponse, or raises EmptyModelResponseError / the classified API error.

`iteration` is purely for the warning/telemetry text (which loop turn this call belongs to); it has no control-flow role.



# File 'lib/rubino/agent/model_call_runner.rb', line 70

def call!(request, iteration: nil, &)
  # Error-path budget — distinct from the empty/degenerate budgets, which
  # the recovery ladder owns (see #recovery). Kept here so a transient API
  # error can't bleed into the empty-retry count.
  error_attempts = 0

  # The degenerate-response recovery ladder (Slice 5). Fresh per call! so
  # its per-turn counters (prefill ≤2, empty ≤3) reset exactly where the
  # reference zeroes them on a successful content turn.
  recovery = recovery_for(iteration)

  # The live request we (re)issue. Rungs 3/4 mutate it: a nudge appends to
  # request.messages in place; a prefill re-issues with the seed attached.
  current = request
  # Visible text streamed to the user this call — fuels rung 1
  # (partial-stream recovery). The caller's block still sees every chunk.
  streamed = +""
  wrapped  = capture_streamed(streamed, &)

  # :recovered is thrown by the ladder's rung-1/2 ":use" directive — the
  # recovered final content, wrapped as a synthetic text response.
  catch(:recovered) do
    loop do
      @cancel_token&.check!

      begin
        response = active_llm.call(current, &wrapped)
      rescue Rubino::Interrupted
        # User cancellation propagates immediately — never classified, never
        # retried (the reference treats interrupt as terminal at every backoff site).
        raise
      rescue StandardError => e
        error_attempts = handle_error!(e, error_attempts, iteration)
        next
      end

      # User cancellation that arrived MID-STREAM may not surface as a raise:
      # once a chunk has flowed the adapter RETURNS the buffered (possibly
      # empty) partial instead of raising, so a Ctrl+C right as the stream
      # drained lands here as an "empty" response. Re-check the cancel token
      # BEFORE validation so the interrupt is terminal — otherwise the empty
      # partial is classified :empty_response and the recovery ladder prints
      # a spurious "Empty response — retrying (1/2)" before the cancel is
      # acknowledged (D4). The interrupt is the correct terminal outcome.
      @cancel_token&.check!

      ok, reason = @validator.valid?(response)

      # Structurally invalid AND not an empty turn (nil / interrupted
      # truncated-stream partial). SLICE-7 eager fallback:
      # an invalid/malformed response is a common rate-limit symptom, so
      # rotate to the next provider immediately rather than surfacing it as
      # a failed turn. On a switch, reset the per-call counters and retry on
      # the new adapter; otherwise hand it back untouched — the Loop maps it
      # to StreamInterruptedError. Not the recovery ladder's job.
      if !ok && reason != :empty_response
        if activate_fallback!(iteration)
          error_attempts = 0
          recovery = recovery_for(iteration)
          streamed.clear # partial belongs to the failed provider, not the new one
          next
        end
        throw(:recovered, response)
      end

      # Usable iff structurally valid AND not degenerate (thinking-only /
      # blank-after-think). A degenerate response passes #valid? (its content
      # is non-empty <think> text) but carries no real answer — route it, and
      # any 200-OK-but-empty turn, through the ladder.
      throw(:recovered, response) if ok && !@validator.degenerate?(response)

      current, switched = apply_recovery!(recovery, response, current, streamed, iteration)
      # SLICE-7 rung 6: the ladder rotated to a fallback. Reset
      # the per-call counters (fresh recovery, zeroed error budget) and retry
      # on the new adapter — the reference zeroes _empty_content_retries here.
      next unless switched

      error_attempts = 0
      recovery = recovery_for(iteration)
      streamed.clear
    end
  end
end
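Two mechanisms in the listing above may be easier to follow in isolation: the wrapper that records streamed text while still forwarding chunks to the caller, and the `catch(:recovered)` / `throw` pair that exits the loop with a value. The sketch below is an illustration only. `capture_streamed_sketch` and `fake_call` are hypothetical, and chunks are assumed to be plain Strings, which may not match the real adapter.

```ruby
# Hypothetical helper: returns a proc that appends each chunk to `buffer`
# and then forwards it to the caller's block, if one was given.
def capture_streamed_sketch(buffer, &block)
  proc do |chunk|
    buffer << chunk.to_s
    block&.call(chunk)
  end
end

# Fake boundary that streams two chunks and returns the joined text.
def fake_call(&on_chunk)
  %w[Hel lo].each { |c| on_chunk.call(c) }
  "Hello"
end

streamed = +"" # unfrozen String so it can be appended to and cleared
seen = []
wrapped = capture_streamed_sketch(streamed) { |c| seen << c }

# throw unwinds out of the loop; its second argument becomes catch's value.
result = catch(:recovered) do
  loop do
    response = fake_call(&wrapped)
    throw(:recovered, response) unless response.empty?
  end
end
```

Because `throw` carries the response out as the value of the `catch` block, the method needs no separate result variable or `break` bookkeeping.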