Class: Rubino::Run::Repository
- Inherits: Object
- Defined in: lib/rubino/run/repository.rb
Overview
Repository for Run CRUD. A Run is one user-input -> assistant-response cycle within a Session, exposed as a first-class resource over the HTTP API and the only persistence point for cooperative cancellation.
Status transitions are driven by the executor:
    queued -> running   (#mark_running!)
           -> completed (#mark_completed!)
           -> failed    (#mark_failed!)
           -> stopped   (#mark_stopped!)
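As an illustration of how an executor might drive these transitions, here is a minimal sketch. It is not the project's Executor: `RecordingRepo` and `run_with_lifecycle` are hypothetical names, and the stub records status changes in a Hash where the real repository would update the runs table.

```ruby
# Hypothetical stand-in for Rubino::Run::Repository: it records status
# changes in a Hash instead of writing to the runs table.
class RecordingRepo
  attr_reader :rows

  def initialize
    @rows = Hash.new { |h, k| h[k] = { status: "queued" } }
  end

  def mark_running!(id)
    @rows[id][:status] = "running"
  end

  def mark_completed!(id, tokens_input: 0, tokens_output: 0)
    @rows[id].merge!(status: "completed", tokens_input: tokens_input, tokens_output: tokens_output)
  end

  def mark_failed!(id, error:)
    @rows[id].merge!(status: "failed", error: error)
  end
end

# Hypothetical executor helper: queued -> running, then completed or failed.
# The block is assumed to return a Hash with :tokens_input and :tokens_output.
def run_with_lifecycle(repo, id)
  repo.mark_running!(id)
  usage = yield
  repo.mark_completed!(id, tokens_input: usage[:tokens_input], tokens_output: usage[:tokens_output])
rescue StandardError => e
  repo.mark_failed!(id, error: e.message)
end

repo = RecordingRepo.new
run_with_lifecycle(repo, "run-ok") { { tokens_input: 12, tokens_output: 34 } }
run_with_lifecycle(repo, "run-bad") { raise "provider timeout" }
```

In this sketch any StandardError raised by the block ends the run as failed with the exception message stored in the error column; the stopped branch is handled separately by the cooperative stop pattern.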
Cooperative stop pattern:
- +POST /v1/runs/:id/stop+ calls #request_stop!, which flips the +stop_requested+ boolean on the row.
- The run loop is expected to poll #stop_requested? between turns and bail out, then call #mark_stopped!. The flag is a hint, not a hard kill: the worker thread keeps the CPU until it observes it. In the current Executor the in-loop poll is not yet wired, so the flag is recorded and surfaced to clients but does not actually halt an in-flight run; downstream agents should add the check inside Agent::Runner.
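The polling loop described above could look roughly like the following. This is a sketch under stated assumptions, not the code in Agent::Runner: `StubRepo` is a hypothetical in-memory stand-in for the repository, `run_turns` is a hypothetical loop, and each turn is modeled as a lambda.

```ruby
# Hypothetical in-memory stand-in exposing only the stop-related methods
# (plus a simplified mark_completed! without token counts).
class StubRepo
  attr_reader :status

  def initialize
    @stop_requested = false
    @status = "running"
  end

  def request_stop!(_id)
    @stop_requested = true
  end

  def stop_requested?(_id)
    @stop_requested == true
  end

  def mark_stopped!(_id)
    @status = "stopped"
  end

  def mark_completed!(_id)
    @status = "completed"
  end
end

# Hypothetical run loop: checks the flag before each turn and bails out.
# Returns the number of turns that actually ran.
def run_turns(repo, id, turns)
  completed = 0
  turns.each do |turn|
    if repo.stop_requested?(id)
      repo.mark_stopped!(id)
      return completed
    end
    turn.call
    completed += 1
  end
  repo.mark_completed!(id)
  completed
end

repo = StubRepo.new
turns = [
  -> { :first_turn },
  -> { repo.request_stop!("run-1") }, # simulates the stop request arriving mid-run
  -> { :never_reached }
]
turns_run = run_turns(repo, "run-1", turns)
```

Because the check happens only between turns, the turn during which the stop request arrives still runs to completion; the loop stops before the next one starts.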
last_for_session uses a (created_at DESC, rowid DESC) tuple to disambiguate rows created in the same second.
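The tie-break can be illustrated without a database. The sketch below assumes an SQLite-style rowid that normally grows with each insert; the row Hashes and ids are made up for illustration, and `max_by` over a two-element Array stands in for ORDER BY created_at DESC, rowid DESC with a limit of one row.

```ruby
require "time"

# Hypothetical rows: created_at has one-second resolution (Time#iso8601),
# so two runs created within the same second get identical timestamps.
same_second = Time.utc(2024, 1, 1, 12, 0, 0).iso8601
rows = [
  { rowid: 1, id: "run-a", created_at: Time.utc(2024, 1, 1, 11, 59, 59).iso8601 },
  { rowid: 2, id: "run-b", created_at: same_second },
  { rowid: 3, id: "run-c", created_at: same_second }
]

# Arrays compare element by element, so rowid is consulted only when
# created_at is equal.
latest = rows.max_by { |r| [r[:created_at], r[:rowid]] }

# Ordering by created_at alone cannot separate run-b from run-c.
tied = rows.select { |r| r[:created_at] == latest[:created_at] }.map { |r| r[:id] }
```

Here `latest` is the run-c row, while `tied` contains both run-b and run-c, which is the ambiguity the second sort key removes.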
Instance Method Summary
- #create(session_id:, input_text:, attachments: [], skills: [], model: nil, provider: nil, cron_job_id: nil) ⇒ Object
- #destroy!(id) ⇒ Object
  Cascades: deletes the run’s persisted events before the run row, in a single transaction (FKs are not declared at the schema level).
- #find(id) ⇒ Object
- #initialize(db: nil) ⇒ Repository (constructor)
  A new instance of Repository.
- #last_for_session(session_id) ⇒ Object
- #list_for_session(session_id) ⇒ Object
- #mark_completed!(id, tokens_input: 0, tokens_output: 0) ⇒ Object
- #mark_failed!(id, error:) ⇒ Object
- #mark_running!(id) ⇒ Object
- #mark_stopped!(id) ⇒ Object
- #request_stop!(id) ⇒ Object
  Signals a cooperative stop.
- #stop_requested?(id) ⇒ Boolean
Constructor Details
#initialize(db: nil) ⇒ Repository
Returns a new instance of Repository.
    # File 'lib/rubino/run/repository.rb', line 32
    def initialize(db: nil)
      @db = db || Rubino.database.db
    end
Instance Method Details
#create(session_id:, input_text:, attachments: [], skills: [], model: nil, provider: nil, cron_job_id: nil) ⇒ Object
    # File 'lib/rubino/run/repository.rb', line 36
    def create(session_id:, input_text:, attachments: [], skills: [], model: nil, provider: nil, cron_job_id: nil)
      now = Time.now.utc.iso8601
      id = SecureRandom.uuid
      @db[:runs].insert(
        id: id,
        session_id: session_id,
        status: "queued",
        input_text: input_text,
        attachments_json: JSON.generate(attachments),
        skills_json: JSON.generate(skills),
        model: model,
        provider: provider,
        cron_job_id: cron_job_id,
        stop_requested: false,
        created_at: now,
        updated_at: now
      )

      find(id)
    end
#destroy!(id) ⇒ Object
Cascades: deletes the run’s persisted events before the run row, in a single transaction (FKs are not declared at the schema level).
    # File 'lib/rubino/run/repository.rb', line 110
    def destroy!(id)
      @db.transaction do
        @db[:events].where(run_id: id).delete
        @db[:runs].where(id: id).delete
      end
    end
#find(id) ⇒ Object
    # File 'lib/rubino/run/repository.rb', line 57
    def find(id)
      @db[:runs].where(id: id).first
    end
#last_for_session(session_id) ⇒ Object
    # File 'lib/rubino/run/repository.rb', line 65
    def last_for_session(session_id)
      @db[:runs]
        .where(session_id: session_id)
        .order(Sequel.desc(:created_at), Sequel.desc(Sequel.lit("rowid")))
        .first
    end
#list_for_session(session_id) ⇒ Object
    # File 'lib/rubino/run/repository.rb', line 61
    def list_for_session(session_id)
      @db[:runs].where(session_id: session_id).order(:created_at).all
    end
#mark_completed!(id, tokens_input: 0, tokens_output: 0) ⇒ Object
    # File 'lib/rubino/run/repository.rb', line 77
    def mark_completed!(id, tokens_input: 0, tokens_output: 0)
      now = Time.now.utc.iso8601
      @db[:runs].where(id: id).update(
        status: "completed",
        finished_at: now,
        tokens_input: tokens_input,
        tokens_output: tokens_output,
        updated_at: now
      )
    end
#mark_failed!(id, error:) ⇒ Object
    # File 'lib/rubino/run/repository.rb', line 88
    def mark_failed!(id, error:)
      now = Time.now.utc.iso8601
      @db[:runs].where(id: id).update(status: "failed", error: error, finished_at: now, updated_at: now)
    end
#mark_running!(id) ⇒ Object
    # File 'lib/rubino/run/repository.rb', line 72
    def mark_running!(id)
      now = Time.now.utc.iso8601
      @db[:runs].where(id: id).update(status: "running", started_at: now, updated_at: now)
    end
#mark_stopped!(id) ⇒ Object
    # File 'lib/rubino/run/repository.rb', line 93
    def mark_stopped!(id)
      now = Time.now.utc.iso8601
      @db[:runs].where(id: id).update(status: "stopped", finished_at: now, updated_at: now)
    end
#request_stop!(id) ⇒ Object
Signals a cooperative stop. The run loop must observe this on its own; nothing in this class interrupts an in-flight thread.
    # File 'lib/rubino/run/repository.rb', line 100
    def request_stop!(id)
      @db[:runs].where(id: id).update(stop_requested: true, updated_at: Time.now.utc.iso8601)
    end
#stop_requested?(id) ⇒ Boolean
    # File 'lib/rubino/run/repository.rb', line 104
    def stop_requested?(id)
      @db[:runs].where(id: id).get(:stop_requested) == true
    end