Class: Rubino::Run::Repository

Inherits:
Object
  • Object
show all
Defined in:
lib/rubino/run/repository.rb

Overview

Repository for Run CRUD. A Run is one user-input -> assistant-response cycle within a Session, exposed as a first-class resource over the HTTP API and the only persistence point for cooperative cancellation.

Status transitions are driven by the executor:

queued -> running (#mark_running!)
       -> completed (#mark_completed!)
       -> failed    (#mark_failed!)
       -> stopped   (#mark_stopped!)

Cooperative stop pattern:

- +POST /v1/runs/:id/stop+ calls #request_stop! which flips the
  +stop_requested+ boolean on the row.
- The run loop is expected to poll #stop_requested? between turns
  and bail out, then call #mark_stopped!. The flag is a hint, not
  a hard kill — the worker thread keeps the CPU until it observes
  it. In the current Executor the in-loop poll is not yet wired,
  so the flag is recorded and surfaced to clients but does not
  actually halt an in-flight run; downstream agents should add the
  check inside Agent::Runner.

Instance Method Summary collapse

Constructor Details

#initialize(db: nil) ⇒ Repository

Returns a new instance of Repository.



29
30
31
# File 'lib/rubino/run/repository.rb', line 29

def initialize(db: nil)
  @db = db || Rubino.database.db
end

Instance Method Details

#create(session_id:, input_text:, attachments: [], skills: [], model: nil, provider: nil, cron_job_id: nil) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/rubino/run/repository.rb', line 33

def create(session_id:, input_text:, attachments: [], skills: [], model: nil, provider: nil, cron_job_id: nil)
  now = Time.now.utc.iso8601
  id = SecureRandom.uuid

  @db[:runs].insert(
    id: id,
    session_id: session_id,
    status: "queued",
    input_text: input_text,
    attachments_json: JSON.generate(attachments),
    skills_json: JSON.generate(skills),
    model: model,
    provider: provider,
    cron_job_id: cron_job_id,
    stop_requested: false,
    created_at: now,
    updated_at: now
  )
  find(id)
end

#destroy!(id) ⇒ Object

Cascades: deletes the run’s persisted events before the run row, in a single transaction (FKs are not declared at the schema level).



96
97
98
99
100
101
# File 'lib/rubino/run/repository.rb', line 96

def destroy!(id)
  @db.transaction do
    @db[:events].where(run_id: id).delete
    @db[:runs].where(id: id).delete
  end
end

#find(id) ⇒ Object



54
55
56
# File 'lib/rubino/run/repository.rb', line 54

def find(id)
  @db[:runs].where(id: id).first
end

#mark_completed!(id, tokens_input: 0, tokens_output: 0) ⇒ Object



63
64
65
66
67
68
69
70
71
72
# File 'lib/rubino/run/repository.rb', line 63

def mark_completed!(id, tokens_input: 0, tokens_output: 0)
  now = Time.now.utc.iso8601
  @db[:runs].where(id: id).update(
    status: "completed",
    finished_at: now,
    tokens_input: tokens_input,
    tokens_output: tokens_output,
    updated_at: now
  )
end

#mark_failed!(id, error:) ⇒ Object



74
75
76
77
# File 'lib/rubino/run/repository.rb', line 74

def mark_failed!(id, error:)
  now = Time.now.utc.iso8601
  @db[:runs].where(id: id).update(status: "failed", error: error, finished_at: now, updated_at: now)
end

#mark_running!(id) ⇒ Object



58
59
60
61
# File 'lib/rubino/run/repository.rb', line 58

def mark_running!(id)
  now = Time.now.utc.iso8601
  @db[:runs].where(id: id).update(status: "running", started_at: now, updated_at: now)
end

#mark_stopped!(id) ⇒ Object



79
80
81
82
# File 'lib/rubino/run/repository.rb', line 79

def mark_stopped!(id)
  now = Time.now.utc.iso8601
  @db[:runs].where(id: id).update(status: "stopped", finished_at: now, updated_at: now)
end

#request_stop!(id) ⇒ Object

Signals a cooperative stop. The run loop must observe this on its own; nothing in this class interrupts an in-flight thread.



86
87
88
# File 'lib/rubino/run/repository.rb', line 86

def request_stop!(id)
  @db[:runs].where(id: id).update(stop_requested: true, updated_at: Time.now.utc.iso8601)
end

#stop_requested?(id) ⇒ Boolean

Returns:

  • (Boolean)


90
91
92
# File 'lib/rubino/run/repository.rb', line 90

def stop_requested?(id)
  @db[:runs].where(id: id).get(:stop_requested) == true
end