Class: Rubino::Run::Repository
- Inherits:
-
Object
- Object
- Rubino::Run::Repository
- Defined in:
- lib/rubino/run/repository.rb
Overview
Repository for Run CRUD. A Run is one user-input -> assistant-response cycle within a Session, exposed as a first-class resource over the HTTP API and the only persistence point for cooperative cancellation.
Status transitions are driven by the executor:
queued -> running (#mark_running!)
-> completed (#mark_completed!)
-> failed (#mark_failed!)
-> stopped (#mark_stopped!)
Cooperative stop pattern:
- +POST /v1/runs/:id/stop+ calls #request_stop! which flips the
+stop_requested+ boolean on the row.
- The run loop is expected to poll #stop_requested? between turns
and bail out, then call #mark_stopped!. The flag is a hint, not
a hard kill — the worker thread keeps the CPU until it observes
it. In the current Executor the in-loop poll is not yet wired,
so the flag is recorded and surfaced to clients but does not
actually halt an in-flight run; downstream agents should add the
check inside Agent::Runner.
Instance Method Summary collapse
- #create(session_id:, input_text:, attachments: [], skills: [], model: nil, provider: nil, cron_job_id: nil) ⇒ Object
-
#destroy!(id) ⇒ Object
Cascades: deletes the run’s persisted events before the run row, in a single transaction (FKs are not declared at the schema level).
- #find(id) ⇒ Object
-
#initialize(db: nil) ⇒ Repository
constructor
A new instance of Repository.
- #mark_completed!(id, tokens_input: 0, tokens_output: 0) ⇒ Object
- #mark_failed!(id, error:) ⇒ Object
- #mark_running!(id) ⇒ Object
- #mark_stopped!(id) ⇒ Object
-
#request_stop!(id) ⇒ Object
Signals a cooperative stop.
- #stop_requested?(id) ⇒ Boolean
Constructor Details
#initialize(db: nil) ⇒ Repository
Returns a new instance of Repository.
29 30 31 |
# File 'lib/rubino/run/repository.rb', line 29 def initialize(db: nil) @db = db || Rubino.database.db end |
Instance Method Details
#create(session_id:, input_text:, attachments: [], skills: [], model: nil, provider: nil, cron_job_id: nil) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/rubino/run/repository.rb', line 33 def create(session_id:, input_text:, attachments: [], skills: [], model: nil, provider: nil, cron_job_id: nil) now = Time.now.utc.iso8601 id = SecureRandom.uuid @db[:runs].insert( id: id, session_id: session_id, status: "queued", input_text: input_text, attachments_json: JSON.generate(), skills_json: JSON.generate(skills), model: model, provider: provider, cron_job_id: cron_job_id, stop_requested: false, created_at: now, updated_at: now ) find(id) end |
#destroy!(id) ⇒ Object
Cascades: deletes the run’s persisted events before the run row, in a single transaction (FKs are not declared at the schema level).
96 97 98 99 100 101 |
# File 'lib/rubino/run/repository.rb', line 96 def destroy!(id) @db.transaction do @db[:events].where(run_id: id).delete @db[:runs].where(id: id).delete end end |
#find(id) ⇒ Object
54 55 56 |
# File 'lib/rubino/run/repository.rb', line 54 def find(id) @db[:runs].where(id: id).first end |
#mark_completed!(id, tokens_input: 0, tokens_output: 0) ⇒ Object
63 64 65 66 67 68 69 70 71 72 |
# File 'lib/rubino/run/repository.rb', line 63 def mark_completed!(id, tokens_input: 0, tokens_output: 0) now = Time.now.utc.iso8601 @db[:runs].where(id: id).update( status: "completed", finished_at: now, tokens_input: tokens_input, tokens_output: tokens_output, updated_at: now ) end |
#mark_failed!(id, error:) ⇒ Object
74 75 76 77 |
# File 'lib/rubino/run/repository.rb', line 74 def mark_failed!(id, error:) now = Time.now.utc.iso8601 @db[:runs].where(id: id).update(status: "failed", error: error, finished_at: now, updated_at: now) end |
#mark_running!(id) ⇒ Object
58 59 60 61 |
# File 'lib/rubino/run/repository.rb', line 58 def mark_running!(id) now = Time.now.utc.iso8601 @db[:runs].where(id: id).update(status: "running", started_at: now, updated_at: now) end |
#mark_stopped!(id) ⇒ Object
79 80 81 82 |
# File 'lib/rubino/run/repository.rb', line 79 def mark_stopped!(id) now = Time.now.utc.iso8601 @db[:runs].where(id: id).update(status: "stopped", finished_at: now, updated_at: now) end |
#request_stop!(id) ⇒ Object
Signals a cooperative stop. The run loop must observe this on its own; nothing in this class interrupts an in-flight thread.
86 87 88 |
# File 'lib/rubino/run/repository.rb', line 86 def request_stop!(id) @db[:runs].where(id: id).update(stop_requested: true, updated_at: Time.now.utc.iso8601) end |
#stop_requested?(id) ⇒ Boolean
90 91 92 |
# File 'lib/rubino/run/repository.rb', line 90 def stop_requested?(id) @db[:runs].where(id: id).get(:stop_requested) == true end |