Class: Rubino::Run::Executor

Inherits:
Object
Defined in:
lib/rubino/run/executor.rb

Overview

Runs an Agent::Runner in a background thread, persisting per-run events via Recorder. #start returns immediately so the HTTP handler can respond with 201.

Per-run wiring done inside the spawned thread:

- Recorder.attach! to mirror the run's own EventBus into the EventStore and the live queue.
- A fresh ApprovalGate per run, published in GateRegistry under run_id
  so HTTP decision endpoints can resolve it.
- UI::API instantiated with the gate and recorder so the runner can
  ask for approvals / clarifications.
- +ensure+ block always detaches the recorder, unregisters the gate,
  and fires the +on_complete+ callback (used by Jobs::Scheduler to
  trigger webhook delivery).
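The wiring above depends on each run getting its own event bus, so a recorder's attach!/detach! only ever touches its own listeners. The following is a minimal sketch of that isolation. MiniBus and MiniRecorder are hypothetical stand-ins, not Rubino's Interaction::EventBus or Recorder, whose APIs may differ.

```ruby
# Hypothetical stand-ins for Interaction::EventBus and Recorder, used only
# to illustrate why each run gets a FRESH bus.
class MiniBus
  def initialize
    @listeners = []
  end

  # Registers a listener and returns it so the caller can remove it later.
  def on(&block)
    @listeners << block
    block
  end

  def off(listener)
    @listeners.delete(listener)
  end

  def emit(event, payload = {})
    @listeners.each { |l| l.call(event, payload) }
  end

  def listener_count
    @listeners.size
  end
end

class MiniRecorder
  attr_reader :events

  def initialize(run_id:, event_bus:)
    @run_id = run_id
    @bus = event_bus
    @events = []
  end

  # Mirror every event on THIS run's bus into the recorder's store.
  def attach!
    @listener = @bus.on { |event, payload| @events << [event, payload] }
  end

  # Remove only this recorder's listener from its own bus.
  def detach!
    @bus.off(@listener) if @listener
    @listener = nil
  end
end

bus_a = MiniBus.new
bus_b = MiniBus.new
rec_a = MiniRecorder.new(run_id: 1, event_bus: bus_a)
rec_b = MiniRecorder.new(run_id: 2, event_bus: bus_b)
rec_a.attach!
rec_b.attach!

bus_a.emit("message.delta", text: "hello from run 1")
rec_a.detach! # run 1 finishes; run 2 keeps recording
bus_b.emit("message.delta", text: "hello from run 2")

puts rec_a.events.size # prints 1
puts rec_b.events.size # prints 1
```

With a shared process-global bus, run 2's emit would also land in run 1's recorder, and run 1's detach! would have to pick its listener out of everyone else's.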

Metrics: runs_total is incremented once per #start (tagged with source, defaulting to “api”); runs_completed_total is incremented in the ensure block, tagged with the final status (“completed”, “failed”, or “stopped”).
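To make the tagging concrete, here is a small sketch in which each counter is identified by its name plus its tags. MiniMetrics is a hypothetical in-memory registry, not Rubino::Metrics, and the "schedule" source value is illustrative only.

```ruby
# Hypothetical in-memory counter registry keyed by [name, tags].
class MiniMetrics
  Counter = Struct.new(:value) do
    def increment(by = 1)
      self.value += by
    end
  end

  def initialize
    @counters = Hash.new { |h, k| h[k] = Counter.new(0) }
  end

  # Same name with different tags yields a different counter.
  def counter(name, **tags)
    @counters[[name, tags.sort.to_h]]
  end

  def value(name, **tags)
    @counters.fetch([name, tags.sort.to_h], Counter.new(0)).value
  end
end

metrics = MiniMetrics.new
runs = [
  { source: nil,        status: "completed" },
  { source: "schedule", status: "failed" },
  { source: nil,        status: "stopped" }
]
runs.each do |run|
  # Once per #start, defaulting the source tag to "api".
  metrics.counter(:runs_total, source: run[:source] || "api").increment
  # Once per run, from the ensure block, tagged with the final status.
  metrics.counter(:runs_completed_total, status: run[:status]).increment
end

puts metrics.value(:runs_total, source: "api")               # prints 2
puts metrics.value(:runs_completed_total, status: "stopped") # prints 1
```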

Stop is cooperative via Run::Repository#stop_requested?. The worker spawns a short-tick watcher (#spawn_stop_watcher, polling every STOP_POLL_INTERVAL seconds) that checks that flag and, on observing it, flips the runner’s CancelToken via runner.cancel!. The token is the single halt mechanism: the agent loop and the LLM stream poll it (CancelToken#check!) and raise Interrupted, which unwinds the turn the same way Ctrl+C does in chat. There is no second kill path; the worker then records the run as “stopped” rather than “failed”.
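The cooperative stop path can be sketched with plain Ruby threads: a watcher polls a stop flag and flips a cancel token, and the worker checks the token between units of work and unwinds by raising. CancelToken, Interrupted, FakeRepository, and this spawn_stop_watcher signature are hypothetical stand-ins for Rubino's classes, and the demo polls every 0.01 s instead of STOP_POLL_INTERVAL so it finishes quickly.

```ruby
# Hypothetical stand-ins for Rubino's CancelToken, Interrupted, and
# Run::Repository; only the shape of the interaction is illustrated.
class Interrupted < StandardError; end

class CancelToken
  def initialize
    @mutex = Mutex.new
    @cancelled = false
  end

  def cancel!
    @mutex.synchronize { @cancelled = true }
  end

  def cancelled?
    @mutex.synchronize { @cancelled }
  end

  # Called by the worker at safe points; raises to unwind the turn.
  def check!
    raise Interrupted, "run cancelled" if cancelled?
  end
end

class FakeRepository
  def initialize
    @mutex = Mutex.new
    @stop = false
  end

  # What an HTTP stop endpoint would do: set a flag, nothing more.
  def request_stop!
    @mutex.synchronize { @stop = true }
  end

  def stop_requested?(_run_id)
    @mutex.synchronize { @stop }
  end
end

# Polls the flag on a short tick; on observing it, runs the callback
# (so the worker can tell "stopped" from "failed") and flips the token.
def spawn_stop_watcher(repo, run_id, token, interval: 0.01, &on_stop)
  Thread.new do
    sleep interval until repo.stop_requested?(run_id)
    on_stop&.call
    token.cancel!
  end
end

repo = FakeRepository.new
token = CancelToken.new
stopped = false
watcher = spawn_stop_watcher(repo, 42, token) { stopped = true }
Thread.new { sleep 0.05; repo.request_stop! } # simulated HTTP stop request

status =
  begin
    500.times do
      token.check! # cooperative cancellation point
      sleep 0.01   # one unit of "agent work"
    end
    "completed"
  rescue Interrupted
    stopped ? "stopped" : "failed"
  ensure
    watcher.kill
  end

puts status # prints stopped
```

Running the callback before cancel! means the flag is already set by the time the worker observes the token and reaches its rescue.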

Constant Summary

STOP_POLL_INTERVAL =

How often the stop watcher polls the DB stop flag (seconds).

0.25
VISION_ANALYSIS_PROMPT =

Prompt sent to the auxiliary vision model when pre-describing an image for a text-only primary. Verbatim from the reference — broad enough that the description is useful regardless of the user’s question.

"Describe everything visible in this image in thorough detail. " \
"Include any text, code, data, objects, people, layout, colors, " \
"and any other notable visual information."

Instance Method Summary

Constructor Details

#initialize(repository: nil, recorder_factory: nil, vision_describer: nil) ⇒ Executor

Returns a new instance of Executor.



# File 'lib/rubino/run/executor.rb', line 45

def initialize(repository: nil, recorder_factory: nil, vision_describer: nil)
  @repository = repository || Repository.new
  @recorder_factory = recorder_factory ||
                      lambda { |run_id:, session_id:, event_bus:|
                        Recorder.new(run_id: run_id, session_id: session_id, event_bus: event_bus)
                      }
  # Callable(path) -> description String (or an "Error…" String on
  # failure). Injectable so unit tests don't hit the aux model.
  @vision_describer = vision_describer || method(:default_vision_describe)
end
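The `||` defaults in the constructor are a plain dependency-injection pattern: production gets the real Recorder factory and vision describer, while tests pass lambdas. The following sketch shows the same pattern on a hypothetical MiniExecutor class; describe_all, recorder_for, and the stub strings are illustrative and not part of Executor.

```ruby
class MiniExecutor
  def initialize(recorder_factory: nil, vision_describer: nil)
    # Default factory: a lambda with the same keyword signature callers use.
    @recorder_factory = recorder_factory ||
                        ->(run_id:, session_id:) { { run_id: run_id, session_id: session_id } }
    # Default describer: a Method object, which responds to #call like a lambda.
    @vision_describer = vision_describer || method(:default_vision_describe)
  end

  def recorder_for(run_id, session_id)
    @recorder_factory.call(run_id: run_id, session_id: session_id)
  end

  def describe_all(paths)
    paths.to_h { |p| [p, @vision_describer.call(p)] }
  end

  private

  # Stand-in for a call to an auxiliary vision model.
  def default_vision_describe(path)
    "Error: no auxiliary vision model configured for #{path}"
  end
end

# In a unit test, inject a stub so no model is ever called.
stub = ->(path) { "a diagram (#{File.basename(path)})" }
exec = MiniExecutor.new(vision_describer: stub)
p exec.describe_all(["/tmp/a.png"])
```

Because both defaults and injected values respond to #call, the rest of the class never needs to know which one it received.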

Instance Method Details

#attachment_block(path) ⇒ Object

Classifies a non-image attachment by content (Attachments::Classify: magic bytes win over the extension, and the safety pipeline fails closed) and renders the typed preamble (Attachments::Preamble). Returns nil to SKIP the attachment, logging a warning, when the safety pipeline rejects it or its kind is disallowed by policy; an unsafe file is never inlined or executed. Closes Gap B: archives, documents, and binaries get typed guidance instead of a bare `- file:` line.



# File 'lib/rubino/run/executor.rb', line 150

def attachment_block(path)
  cls = Attachments::Classify.call(path)
  unless cls.safe
    Rubino.logger.warn(event: "run.attachment_skipped", path: path.to_s, reason: cls.reason)
    return nil
  end
  unless Attachments::Policy.allow_kind?(cls.kind)
    Rubino.logger.warn(event: "run.attachment_skipped", path: path.to_s,
                       reason: "kind #{cls.kind} not in allow_kinds")
    return nil
  end
  Attachments::Preamble.for(cls)
end
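The nil-means-skip contract is what lets the caller drop rejected files with `filter_map`. The sketch below rests on three assumptions: a hypothetical Classification struct (safe/kind/reason, loosely mirroring the fields #attachment_block reads), a hypothetical ALLOW_KINDS list in place of Attachments::Policy, and a toy preamble string in place of Attachments::Preamble.

```ruby
require "logger"
require "stringio"

# Hypothetical classification result; the real one comes from Attachments::Classify.
Classification = Struct.new(:path, :safe, :kind, :reason, keyword_init: true)

ALLOW_KINDS = %i[document archive image text].freeze # hypothetical policy

LOG_IO = StringIO.new
LOGGER = Logger.new(LOG_IO)

# Returns a preamble string, or nil (after a warning) to skip the file.
def attachment_block(cls)
  unless cls.safe
    LOGGER.warn("run.attachment_skipped path=#{cls.path} reason=#{cls.reason}")
    return nil
  end
  unless ALLOW_KINDS.include?(cls.kind)
    LOGGER.warn("run.attachment_skipped path=#{cls.path} reason=kind #{cls.kind} not in allow_kinds")
    return nil
  end
  "- #{cls.path} (#{cls.kind})" # toy stand-in for Attachments::Preamble.for
end

classified = [
  Classification.new(path: "report.pdf", safe: true, kind: :document),
  Classification.new(path: "link.txt", safe: false, kind: :text, reason: "symlink"),
  Classification.new(path: "tool.exe", safe: true, kind: :executable)
]

# filter_map drops the nil results, so skipped files never reach the prompt.
blocks = classified.filter_map { |cls| attachment_block(cls) }
puts blocks # prints - report.pdf (document)
```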

#augment_input_with_attachments(input_text, paths, native_image_paths: [], descriptions: {}) ⇒ Object

Prepends a “you have these local files” header to the user input so the model knows the attachments are already on disk and does not try to webfetch them (binaries crash webfetch, v0.2.5). This is a pure function (no network): any vision pre-description is computed upstream and passed in via descriptions. Putting the header FIRST anchors small models (MiniMax in particular); a trailing block was ignored in prod session 33. If the user text is blank and an image-named file is attached, the question defaults to “What do you see in this image?”.

Per file, mirroring the reference image-routing logic:

- image sent natively (primary sees pixels): an [Image attached at: …]
  handle so the model can reference it in follow-up tool calls.
- image on a text-only primary WITH a pre-description: inline the
  description so the model has the content without having to choose
  to call a tool (the prod failure mode: M2.7 said "no image" or ran
  `shell ls`). +descriptions+ maps such paths to their aux output.
- image on a text-only primary WITHOUT a description but with aux
  vision configured (the pre-description errored): an explicit
  imperative to call `vision`, not shell.
- image with neither native nor aux vision (Gap A): the `vision` tool
  is hidden from the model, so a no-multimodal warning is rendered
  instead of an instruction to call it.
- non-image file, including an image-named file whose content does not
  sniff as an image: a typed preamble via #attachment_block (PDF →
  markitdown), or skipped with a warning if unsafe or disallowed.

# File 'lib/rubino/run/executor.rb', line 86

def augment_input_with_attachments(input_text, paths, native_image_paths: [], descriptions: {})
  return input_text.to_s if paths.nil? || paths.empty?

  native = Array(native_image_paths)
  user_text = input_text.to_s
  if user_text.strip.empty? && paths.any? { |p| LLM::ContentBuilder.image_file?(p) }
    user_text = "What do you see in this image?"
  end

  aux_vision = !Rubino.configuration.auxiliary_vision_config["model"].to_s.empty?
  blocks = paths.filter_map do |p|
    if LLM::ContentBuilder.image_file?(p) && image_by_magic?(p)
      if native.include?(p)
        "[Image attached at: #{p}]"
      elsif descriptions[p]
        "[The user attached an image. Here's what it contains:\n#{descriptions[p]}]\n" \
          "[If you need a closer look, call the `vision` tool with file_path: #{p}.]"
      elsif aux_vision
        # Aux configured but pre-description failed: keep the on-demand
        # `vision` imperative (the tool stays exposed in this case).
        "[The user attached an image at #{p}. Call the `vision` tool with " \
          "file_path: #{p} to see it — do not use shell/ls.]"
      else
        # Gap A: no native vision, no aux vision => the `vision` tool is
        # HIDDEN from the model (Registry#aux_dependency_satisfied?). Do
        # NOT instruct calling a hidden tool; warn instead.
        cls = Attachments::Classify.call(p)
        Attachments::Preamble.no_multimodal_warning(p, cls.mime || "image")
      end
    else
      # Gap B + universal handling + MIME-spoof egress guard: classify by
      # content (magic wins) and render a typed preamble. A file with an
      # image extension whose magic is NOT an image (e.g. a .png-named
      # zip) lands here too via #image_by_magic? above, so it is demoted
      # to its real kind instead of being shipped to native/aux vision.
      # Unsafe/oversize/disallowed => skip+warn.
      attachment_block(p)
    end
  end

  "[Uploaded files — already in your workspace. Do not re-fetch the URLs.]\n" \
    "#{blocks.join("\n\n")}\n\n" \
    "#{user_text}"
end
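The per-file routing reduces to a small decision table. In the following sketch, the predicates arrive as plain data on a hypothetical FileFacts struct instead of coming from LLM::ContentBuilder and Attachments::Classify. The bracketed labels are toy strings, not Rubino's wording, and the nil/skip path of #attachment_block is omitted.

```ruby
# Hypothetical per-file facts. In Rubino these come from
# LLM::ContentBuilder.image_file? (extension) and #image_by_magic? (content).
FileFacts = Struct.new(:path, :image_ext, :magic_image, keyword_init: true)

# One toy label per branch of the routing table.
def route(file, native:, descriptions:, aux_vision:)
  # Non-images and extension-spoofed files take the generic branch.
  return "[File at #{file.path}]" unless file.image_ext && file.magic_image

  if native.include?(file.path)
    "[Image attached at: #{file.path}]"                     # primary sees pixels
  elsif descriptions[file.path]
    "[Image description: #{descriptions[file.path]}]"       # inlined aux output
  elsif aux_vision
    "[Call the `vision` tool with file_path: #{file.path}]" # pre-describe failed
  else
    "[No multimodal support for #{file.path}]"              # vision tool hidden (Gap A)
  end
end

def augment(input_text, files, native: [], descriptions: {}, aux_vision: false)
  return input_text.to_s if files.nil? || files.empty?

  text = input_text.to_s
  text = "What do you see in this image?" if text.strip.empty? && files.any?(&:image_ext)
  blocks = files.map do |f|
    route(f, native: native, descriptions: descriptions, aux_vision: aux_vision)
  end
  # Header FIRST, user text LAST.
  "[Uploaded files]\n#{blocks.join("\n\n")}\n\n#{text}"
end

files = [
  FileFacts.new(path: "/ws/shot.png", image_ext: true, magic_image: true),
  FileFacts.new(path: "/ws/fake.png", image_ext: true, magic_image: false) # spoofed
]
puts augment("", files, descriptions: { "/ws/shot.png" => "a bar chart" }, aux_vision: true)
```

Here shot.png gets its inlined description, while fake.png, whose bytes are not an image, falls through to the generic branch instead of being routed to a vision model.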

#image_by_magic?(path) ⇒ Boolean

True only when a file that LOOKS like an image by extension ALSO sniffs as a real image by content (Attachments::Classify, magic wins). Gates the native/aux-vision egress branch: a .png-named zip/text/binary fails here and is demoted to attachment_block (its real typed preamble), so a spoofed extension can never ship raw bytes to the native vision model or the EXTERNAL auxiliary vision model. The safety pipeline (lstat / realpath-confine / size cap) runs inside Classify, so image-extension files now get the same fail-closed checks as every other attachment.

Returns:

  • (Boolean)


# File 'lib/rubino/run/executor.rb', line 139

def image_by_magic?(path)
  cls = Attachments::Classify.call(path)
  cls.safe && cls.kind == :image
end
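Content sniffing is what makes the egress guard work: the extension only nominates a file, and the leading bytes decide. The sketch below combines the caller's extension check with a magic-byte check, using well-known signatures for PNG, JPEG, GIF, WebP, ZIP, and PDF. SIGNATURES, sniff_kind, and image_for_vision? are hypothetical and far smaller than Attachments::Classify. The sketch also leaves out the lstat, realpath, and size-cap checks.

```ruby
require "tempfile"

# Leading-byte signatures for a few common formats.
SIGNATURES = {
  "\x89PNG\r\n\x1A\n".b => :image,
  "\xFF\xD8\xFF".b      => :image,    # JPEG
  "GIF87a".b            => :image,
  "GIF89a".b            => :image,
  "PK\x03\x04".b        => :archive,  # ZIP (also docx/xlsx/jar)
  "%PDF-".b             => :document
}.freeze

IMAGE_EXTS = %w[.png .jpg .jpeg .gif .webp].freeze

def sniff_kind(path)
  head = File.binread(path, 16) || "".b
  SIGNATURES.each { |magic, kind| return kind if head.start_with?(magic) }
  # WebP: "RIFF" + 4-byte size + "WEBP"
  return :image if head.start_with?("RIFF".b) && head[8, 4] == "WEBP".b
  :unknown
end

# Image only if the extension says image AND the bytes agree.
def image_for_vision?(path)
  IMAGE_EXTS.include?(File.extname(path).downcase) && sniff_kind(path) == :image
end

def temp_with(ext, bytes)
  f = Tempfile.new(["att", ext])
  f.binmode
  f.write(bytes)
  f.close
  f # keep the Tempfile object alive so the file is not unlinked
end

real  = temp_with(".png", "\x89PNG\r\n\x1A\n".b + "\x00" * 8)
spoof = temp_with(".png", "PK\x03\x04".b + "\x00" * 8)
puts image_for_vision?(real.path)  # prints true
puts image_for_vision?(spoof.path) # prints false
```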

#parse_attachment_urls(attachments_json) ⇒ Object

Parses the Run row’s persisted attachments_json column (a JSON array of URL strings, as sent in the CreateRun body). Returns [] on nil, empty, or malformed input, or when the JSON is not an array, so a broken attachment list never blocks the run.



# File 'lib/rubino/run/executor.rb', line 59

def parse_attachment_urls(attachments_json)
  return [] if attachments_json.nil? || attachments_json.to_s.empty?

  parsed = JSON.parse(attachments_json)
  parsed.is_a?(Array) ? parsed : []
rescue JSON::ParserError
  []
end
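Because the method rescues JSON::ParserError and type-checks the parsed value, every malformed shape collapses to []. Below, the method is copied into a top-level function for a standalone demo, together with the edge cases it covers:

```ruby
require "json"

def parse_attachment_urls(attachments_json)
  return [] if attachments_json.nil? || attachments_json.to_s.empty?

  parsed = JSON.parse(attachments_json)
  parsed.is_a?(Array) ? parsed : []
rescue JSON::ParserError
  []
end

parse_attachment_urls(nil)                             # => []
parse_attachment_urls("")                              # => []
parse_attachment_urls("not json")                      # => [] (ParserError rescued)
parse_attachment_urls('{"url": "https://x"}')          # => [] (object, not array)
parse_attachment_urls('["https://example.com/a.png"]') # => ["https://example.com/a.png"]
```

The elements themselves are not checked, so `'[1, null]'` passes through as `[1, nil]`; any per-URL validation has to happen downstream.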

#start(run, on_complete: nil) ⇒ Thread

Spawns the worker thread and returns it immediately.

Parameters:

  • run (Hash)

    row from Run::Repository; :id, :session_id, :input_text, :model, :provider, :attachments_json, and :source are read (:source defaults to “api” for metrics).

  • on_complete (#call, nil) (defaults to: nil)

    invoked from the ensure block with run_id:, session_id:, and status: (“completed”, “failed”, or “stopped”); runs even when the run failed or was stopped.

Returns:

  • (Thread)

    the worker thread (caller typically discards it).


# File 'lib/rubino/run/executor.rb', line 170

def start(run, on_complete: nil)
  Thread.new do
    run_id = run[:id]
    session_id = run[:session_id]
    # A FRESH bus per run is the isolation boundary: the Recorder and the
    # Runner share THIS instance only, so a run's emit reaches only its own
    # recorder and its detach!/off only removes its own listeners. Without
    # it every run bound the process-global bus and cross-contaminated
    # peers' events/output (architecture audit A1).
    bus = Interaction::EventBus.new
    recorder = @recorder_factory.call(run_id: run_id, session_id: session_id, event_bus: bus)
    gate = ApprovalGate.new
    GateRegistry.register(run_id, gate)
    recorder.attach!
    final_status = "completed"
    stopped      = false
    stop_watcher = nil
    ::Rubino::Metrics.counter(:runs_total, source: run[:source] || "api").increment
    begin
      @repository.mark_running!(run_id)
      # Bind this run's gated UI as the thread-scoped Rubino.ui for the
      # whole worker thread, so tools that look up the global adapter
      # (QuestionTool#ask → clarify.required, TaskTool) hit THIS run's
      # gate/recorder instead of the gate-less process global — without it
      # the `question` tool's prompt is silently dropped and the web run
      # hangs on an unanswerable question.
      ui = UI::API.new(gate: gate, recorder: recorder, session_id: session_id)
      runner = Agent::Runner.new(
        session_id: session_id,
        model_override: run[:model],
        provider_override: run[:provider],
        ui: ui,
        event_bus: bus
      )
      # Bridge the cooperative HTTP stop flag to the runner's cancel
      # token: poll #stop_requested? on a short tick and flip the token
      # so the in-flight loop/stream unwinds via Interrupted. The flag in
      # the closure lets the ensure record the run as "stopped" rather
      # than "completed"/"failed".
      stop_watcher = spawn_stop_watcher(run_id, runner) { stopped = true }
      # Agent::Runner swallows Interrupted and StandardError internally
      # and emits INTERACTION_FAILED on the bus, which Recorder maps to
      # "run.failed". The lifecycle emits INTERACTION_FINISHED on the
      # happy path → "run.completed". Don't re-emit either terminal
      # event here or every run would broadcast two terminal frames
      # (and the web UI would enqueue two title-generation jobs).
      downloaded_paths = AttachmentDownloader.new.fetch_all(
        run_id: run_id,
        urls: parse_attachment_urls(run[:attachments_json])
      )
      # Emit a recorded event so SSE consumers (and post-hoc forensics)
      # can confirm the augment fired and which paths the model saw.
      # Only when something was actually downloaded — a plain chat with
      # no upload has nothing to report, and emitting an empty event just
      # rendered as noise in the timeline. Direct recorder.emit bypasses
      # EventBus, same pattern as approval.required.
      recorder.emit("run.attachments_downloaded", paths: downloaded_paths) if downloaded_paths.any?
      # When the primary model supports vision, image files are passed
      # natively (via ruby_llm `with:`) so the model can ingest the bytes
      # directly. When the primary is text-only, image_paths stays empty
      # and we pre-describe each image with the vision aux NOW, inlining
      # the description into the prompt — so the model has the content
      # without depending on choosing to call the `vision` tool (the prod
      # failure mode in sessions 36/37). The tool stays exposed for
      # on-demand re-inspection either way. Mirrors the reference text-mode
      # _enrich_message_with_vision.
      image_paths_for_native = native_image_paths(downloaded_paths)
      descriptions = preprocess_images_with_vision(
        downloaded_paths, image_paths_for_native, recorder
      )
      Rubino.with_ui(ui) do
        runner.run!(
          augment_input_with_attachments(
            run[:input_text], downloaded_paths,
            native_image_paths: image_paths_for_native,
            descriptions: descriptions
          ),
          image_paths: image_paths_for_native
        )
      end
      @repository.mark_completed!(run_id)
    rescue Rubino::Interrupted
      # Cooperative stop won the race: the watcher flipped the token and
      # the loop unwound via Interrupted. Record "stopped", not "failed",
      # since this was a user-requested halt, not an error. Only if the
      # token flipped for some reason other than a stop request (which
      # shouldn't happen in the API path) is the run recorded as failed.
      if stopped || @repository.stop_requested?(run_id)
        final_status = "stopped"
        @repository.mark_stopped!(run_id)
        recorder.emit("run.stopped", {})
      else
        final_status = "failed"
        safe_mark_failed(run_id, "interrupted")
        safe_emit_failed(recorder, "interrupted")
      end
    rescue SystemExit, Interrupt, SignalException
      # Process is shutting down — re-raise so systemd / Puma can drain.
      # Mark the run as failed first so it isn't left stuck in "running".
      final_status = "failed"
      safe_mark_failed(run_id, "agent process terminated")
      safe_emit_failed(recorder, "agent process terminated")
      raise
    rescue Exception => e # rubocop:disable Lint/RescueException
      # Catch Exception (not just StandardError) — user-tool LoadError /
      # SyntaxError / NoMemoryError can propagate from threads inside the
      # runner via Thread#join, and without this the worker silently dies
      # and the run is left as "running" forever (the recorder never sees
      # INTERACTION_FAILED so the SSE stream also never gets a terminal
      # frame). Emit run.failed directly via the recorder as a safety net
      # in case the lifecycle didn't get a chance to.
      final_status = "failed"
      Rubino.logger.error(event: "run.exception", run_id: run_id, error: e.class.name, message: e.message)
      safe_mark_failed(run_id, "#{e.class}: #{e.message}")
      safe_emit_failed(recorder, "#{e.class}: #{e.message}")
    ensure
      stop_watcher&.kill
      recorder.detach!
      GateRegistry.unregister(run_id)
      ::Rubino::Metrics.counter(:runs_completed_total, status: final_status).increment
      on_complete&.call(run_id: run_id, session_id: session_id, status: final_status)
    end
  end
end
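Stripped of Rubino specifics, #start is a thread whose begin/rescue/ensure maps each outcome to exactly one terminal status and always runs cleanup and on_complete. The sketch below reproduces that shape. start_run, the work lambda, and the REGISTRY hash are hypothetical stand-ins for the runner, GateRegistry, and repository calls.

```ruby
class Interrupted < StandardError; end

REGISTRY = {} # hypothetical stand-in for GateRegistry

def start_run(run_id, work:, stop_requested: -> { false }, on_complete: nil)
  Thread.new do
    REGISTRY[run_id] = :gate
    final_status = "completed"
    begin
      work.call
    rescue Interrupted
      # A user stop is not an error; any other cancellation is.
      final_status = stop_requested.call ? "stopped" : "failed"
    rescue SystemExit, SignalException
      final_status = "failed"
      raise # let the process shut down
    rescue Exception => e # rubocop:disable Lint/RescueException
      # Catch non-StandardError too (e.g. ScriptError), so the run is
      # never left "running" with no terminal status.
      final_status = "failed"
      warn "run #{run_id} failed: #{e.class}: #{e.message}"
    ensure
      REGISTRY.delete(run_id)
      on_complete&.call(run_id: run_id, status: final_status)
    end
  end
end

results = []
record = ->(run_id:, status:) { results << [run_id, status] }

start_run(1, work: -> { :ok }, on_complete: record).join
start_run(2, work: -> { raise Interrupted }, stop_requested: -> { true }, on_complete: record).join
start_run(3, work: -> { raise Interrupted }, on_complete: record).join
start_run(4, work: -> { raise NotImplementedError, "boom" }, on_complete: record).join
p results
```

Run 4 raises NotImplementedError, a ScriptError rather than a StandardError. A bare `rescue => e` would miss it, and the callback would still fire from ensure, but only with the default "completed" status.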