Class: Rubino::Jobs::Queue

Inherits: Object
Defined in: lib/rubino/jobs/queue.rb

Overview

Manages the job queue backed by SQLite. Supports enqueue, dequeue, locking, and status queries.

Constructor Details

#initialize(db: nil, config: nil) ⇒ Queue

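Returns a new instance of Queue. db defaults to Rubino.database.db and config to Rubino.configuration; pass either explicitly to inject a different database handle or configuration (for example in tests).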

# File 'lib/rubino/jobs/queue.rb', line 11

def initialize(db: nil, config: nil)
  @db = db || Rubino.database.db
  @config = config || Rubino.configuration
end

Instance Method Details

#claim!(job_id, worker_id:) ⇒ Object

Atomically claims a single row by id, using the SAME compare-and-swap lock #dequeue uses: one UPDATE guarded by `WHERE id = job_id AND status = 'queued'` moves the row from `queued` to `running` (setting locked_at, locked_by and updated_at) only if it is still `queued`. Exactly ONE caller gets true; every concurrent claimant (another process sharing this RUBINO_HOME, or a re-entrant reap) finds the row already `running`, updates zero rows, and gets false. The reaper (#346) claims through here before running each orphan, so two processes can never double-run (and double-bill) the same ExtractMemoryJob.

# File 'lib/rubino/jobs/queue.rb', line 84

def claim!(job_id, worker_id:) # rubocop:disable Naming/PredicateMethod -- a mutating CAS (bang), not a query; the boolean reports whether THIS caller won the lock
  now = Time.now.utc.iso8601
  updated = @db[:jobs]
            .where(id: job_id, status: "queued")
            .update(
              status: "running",
              locked_at: now,
              locked_by: worker_id,
              updated_at: now
            )
  updated.positive?
end
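The guard in the WHERE clause is what makes the claim a compare-and-swap: the status check and the write are one UPDATE, and SQLite serializes writers, so at most one caller can see an affected row. The sketch below models that in plain Ruby, assuming a Mutex is an acceptable stand-in for SQLite's write lock; FakeJobs is hypothetical and not part of Rubino.

```ruby
require "securerandom"

# Hypothetical in-memory jobs table. The Mutex stands in for SQLite's write
# lock, which is what makes a single UPDATE ... WHERE atomic.
class FakeJobs
  def initialize
    @rows = {}
    @lock = Mutex.new
  end

  def insert(id)
    @lock.synchronize { @rows[id] = { status: "queued", locked_by: nil } }
  end

  # Mirrors claim!: check and write happen under one lock, so two callers
  # can never both observe "queued" and both win.
  def claim!(id, worker_id:)
    @lock.synchronize do
      row = @rows[id]
      if row && row[:status] == "queued"
        row[:status] = "running"
        row[:locked_by] = worker_id
        true
      else
        false
      end
    end
  end

  def row(id)
    @lock.synchronize { @rows[id].dup }
  end
end

jobs = FakeJobs.new
id = SecureRandom.uuid
jobs.insert(id)

# Eight workers race for the same row; Thread#value joins each one.
results = Array.new(8) { |i| Thread.new { jobs.claim!(id, worker_id: "worker-#{i}") } }.map(&:value)

puts results.count(true)   # prints 1
puts jobs.row(id)[:status] # prints running
```

Splitting this into a SELECT followed by an UPDATE would reopen the race the single statement closes: two callers could both read `queued` before either writes.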

#cleanup!(older_than_days: 7) ⇒ Object

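Deletes completed jobs whose created_at is more than older_than_days days (default 7) in the past and returns the number of rows deleted. Only rows with status "completed" are removed; "failed" and "dead" rows are kept.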

# File 'lib/rubino/jobs/queue.rb', line 230

def cleanup!(older_than_days: 7)
  cutoff = (Time.now - (older_than_days * 86_400)).utc.iso8601
  @db[:jobs]
    .where(status: "completed")
    .where { created_at < cutoff }
    .delete
end
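cleanup! compares created_at against the cutoff as text. That is sound only because enqueue writes every timestamp with Time#iso8601 in UTC, a fixed-width format whose lexicographic order matches chronological order. A plain-Ruby sketch of the same predicate over hypothetical rows:

```ruby
require "time"

# Hypothetical rows; created_at is ISO 8601 UTC text, as enqueue writes it.
now = Time.utc(2024, 5, 10, 12, 0, 0)
rows = [
  { id: "a", status: "completed", created_at: Time.utc(2024, 5, 1).iso8601 },
  { id: "b", status: "completed", created_at: Time.utc(2024, 5, 9).iso8601 },
  { id: "c", status: "dead",      created_at: Time.utc(2024, 4, 1).iso8601 }
]

older_than_days = 7
# Same arithmetic as cleanup!: 86_400 seconds per day.
cutoff = (now - (older_than_days * 86_400)).utc.iso8601
puts cutoff # prints 2024-05-03T12:00:00Z

# Same predicate as cleanup!: completed rows only, compared as strings.
doomed = rows.select { |r| r[:status] == "completed" && r[:created_at] < cutoff }
p doomed.map { |r| r[:id] } # prints ["a"]
```

A timestamp written with a non-UTC offset (for example +02:00) would break this ordering, so anything else writing to the jobs table is assumed to use the same format.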

#complete!(job_id) ⇒ Object

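Marks a job as completed and releases its lock (clears locked_at and locked_by). The update is unconditional: it does not check the row's current status.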

# File 'lib/rubino/jobs/queue.rb', line 98

def complete!(job_id)
  @db[:jobs].where(id: job_id).update(
    status: "completed",
    locked_at: nil,
    locked_by: nil,
    updated_at: Time.now.utc.iso8601
  )
end

#counts ⇒ Object

Returns per-status counts for the whole queue as a Hash (status => count), computed with one grouped query; used by the in-chat /jobs header line (#187). Returns {} when the queue is empty.

# File 'lib/rubino/jobs/queue.rb', line 159

def counts
  @db[:jobs].group_and_count(:status).to_h { |row| [row[:status], row[:count]] }
end
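Over a hypothetical list of statuses, the shape counts returns can be reproduced with Enumerable#tally, and the two scalar helpers documented further down this page read the same numbers: pending_count is the `queued` entry, and failed_count is `failed` plus `dead`.

```ruby
# Hypothetical statuses standing in for the jobs table's status column.
statuses = %w[queued queued running completed failed dead dead]

# Same shape as counts: status => count (group_and_count(:status) + to_h).
counts = statuses.tally

# The scalar helpers read the same numbers.
pending = counts.fetch("queued", 0)                           # as pending_count
failed  = counts.fetch("failed", 0) + counts.fetch("dead", 0) # as failed_count

puts pending # prints 2
puts failed  # prints 3
p [].tally   # prints {} (counts on an empty queue)
```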

#dequeue(worker_id:) ⇒ Object

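Dequeues the next due job and locks it. The candidate is the `queued` row whose run_at has passed, ordered by priority ascending (a lower number runs first; enqueue defaults to 100) and then by earliest run_at. It is claimed through #claim!, and the reloaded row (now `running`) is returned. Returns nil when nothing is due, or when another worker claims the candidate first; in that case it does not fall through to the next candidate.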

# File 'lib/rubino/jobs/queue.rb', line 61

def dequeue(worker_id:)
  now = Time.now.utc.iso8601

  job = @db[:jobs]
        .where(status: "queued")
        .where { run_at <= now }
        .order(:priority, :run_at)
        .first

  return nil unless job

  return nil unless claim!(job[:id], worker_id: worker_id)

  @db[:jobs].where(id: job[:id]).first
end
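The candidate order is easy to misread: priority sorts ascending, so a smaller number runs sooner, and a row with a future run_at is skipped whatever its priority. A plain-Ruby sketch of the selection step over hypothetical rows (the claim step is omitted):

```ruby
require "time"

now = Time.utc(2024, 5, 10, 12).iso8601

# Hypothetical rows; run_at is ISO 8601 UTC text, so string comparison works.
rows = [
  { id: "not-due", status: "queued",  priority: 10,  run_at: Time.utc(2024, 5, 10, 13).iso8601 },
  { id: "default", status: "queued",  priority: 100, run_at: Time.utc(2024, 5, 10, 9).iso8601 },
  { id: "urgent",  status: "queued",  priority: 10,  run_at: Time.utc(2024, 5, 10, 11).iso8601 },
  { id: "taken",   status: "running", priority: 1,   run_at: Time.utc(2024, 5, 10, 8).iso8601 }
]

# Same filter and ORDER BY priority, run_at as dequeue's candidate query.
candidate = rows
            .select { |r| r[:status] == "queued" && r[:run_at] <= now }
            .min_by { |r| [r[:priority], r[:run_at]] }

puts candidate[:id] # prints urgent
```

Because a lost claim returns nil rather than trying the next candidate, a worker loop should probably treat nil as "try again shortly" rather than "queue empty".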

#enqueue(type, payload, priority: 100, run_at: nil, drain_inline: true) ⇒ Object

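Enqueues a new job and returns its id (a UUID). The payload is stored as JSON, max_attempts is taken from jobs_max_attempts in the configuration, and run_at defaults to now.

drain_inline controls inline-mode auto-execution. When it is true (the default) and jobs_mode is "inline", the enqueue first reaps orphaned rows (#reap_inline_orphans) and then runs the new job synchronously before returning. The post-turn polishing path (#319) passes drain_inline: false so the row is only persisted; the detached Interaction::Polishing thread then drains it off the live turn's critical path, so a slow or rate-limited (HTTP 429) auxiliary call can never block the next prompt.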

# File 'lib/rubino/jobs/queue.rb', line 24

def enqueue(type, payload, priority: 100, run_at: nil, drain_inline: true)
  now = Time.now.utc.iso8601
  id = SecureRandom.uuid

  @db[:jobs].insert(
    id: id,
    type: type,
    status: "queued",
    priority: priority,
    payload_json: JSON.generate(payload),
    attempts: 0,
    max_attempts: @config.jobs_max_attempts,
    run_at: run_at || now,
    created_at: now,
    updated_at: now
  )

  return id unless drain_inline

  # Inline mode: execute immediately, but first drain stale rows a previous
  # inline run left orphaned (#84/#224). Inline mode has no background
  # drainer, and run_job is called directly without a lock. If the enqueuing
  # process is interrupted mid-run (e.g. the user quits while the post-turn
  # extraction is still finishing), Interrupt is not a StandardError, so the
  # row never reaches complete!/fail! and would sit "queued" forever, the
  # behaviour #84 closed. An inline enqueue proves a live process is here and
  # willing to drain, so reap those orphans now.
  if @config.jobs_mode == "inline"
    reap_inline_orphans(before: id)
    Runner.new.run_job(id)
  end

  id
end
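The drain_inline branch is easiest to see with the database and Runner stubbed out. In the sketch below, InlineQueueSketch is hypothetical: a Hash stands in for the jobs table, run marks a row completed, and locking, priorities, run_at and the per-orphan claim are all omitted.

```ruby
require "securerandom"

# Hypothetical sketch of enqueue's drain_inline branch. A Hash stands in for
# the jobs table; locking, priority, run_at and claim! are omitted.
class InlineQueueSketch
  attr_reader :rows, :ran

  def initialize(mode:)
    @mode = mode
    @rows = {}
    @ran = []
  end

  def enqueue(type, drain_inline: true)
    id = SecureRandom.uuid
    @rows[id] = { type: type, status: "queued" }
    return id unless drain_inline # persisted only; a drainer runs it later

    if @mode == "inline"
      reap_orphans(before: id) # stale rows from an interrupted run first
      run(id)                  # then this job, synchronously
    end
    id
  end

  private

  def reap_orphans(before:)
    @rows.each do |id, row|
      run(id) if id != before && row[:status] == "queued"
    end
  end

  # Stand-in for Runner#run_job: records the run and marks the row terminal.
  def run(id)
    @ran << @rows[id][:type]
    @rows[id][:status] = "completed"
  end
end

q = InlineQueueSketch.new(mode: "inline")
q.rows["stale"] = { type: "orphan", status: "queued" } # left by an interrupted run
q.enqueue("extract")
p q.ran # prints ["orphan", "extract"]

deferred = q.enqueue("polish", drain_inline: false)
puts q.rows[deferred][:status] # prints queued
```

In a non-inline mode the same call only persists the row, whatever drain_inline says.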

#fail!(job_id, error:) ⇒ Object

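Records a failed attempt: increments attempts, stores error in last_error, and releases the lock. The new status is "dead" once attempts reaches max_attempts; otherwise "failed" (terminal) in inline mode; otherwise "queued" for a retry. A re-queued job's run_at is pushed back by a linear backoff of jobs.retry_backoff_seconds (default 30) multiplied by the new attempt count. Does nothing if no job matches job_id.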

# File 'lib/rubino/jobs/queue.rb', line 108

def fail!(job_id, error:)
  job = @db[:jobs].where(id: job_id).first
  return unless job

  new_attempts = job[:attempts] + 1
  # Inline mode has no background drainer, so re-queueing a failed job
  # would leave it "queued" forever (#84) — mark it terminal ("failed")
  # instead so `jobs list` is honest. Worker/manual modes keep the
  # retry-with-backoff behavior until attempts are exhausted.
  new_status =
    if new_attempts >= job[:max_attempts]
      "dead"
    elsif @config.jobs_mode == "inline"
      "failed"
    else
      "queued"
    end

  # Calculate retry time with backoff
  backoff = @config.dig("jobs", "retry_backoff_seconds") || 30
  retry_at = (Time.now + (backoff * new_attempts)).utc.iso8601

  @db[:jobs].where(id: job_id).update(
    status: new_status,
    attempts: new_attempts,
    last_error: error,
    locked_at: nil,
    locked_by: nil,
    run_at: new_status == "queued" ? retry_at : job[:run_at],
    updated_at: Time.now.utc.iso8601
  )
end
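fail!'s status decision and retry time can be restated as a pure function, which makes the precedence explicit: exhausting max_attempts yields "dead" even in inline mode, and the backoff grows linearly (30 s, 60 s, 90 s with the default) rather than exponentially. next_failure_state is a hypothetical name; it returns nil for retry_at where fail! would keep the existing run_at.

```ruby
require "time"

# Hypothetical pure-function restatement of fail!'s decision. retry_at is nil
# where fail! would keep the row's existing run_at.
def next_failure_state(attempts:, max_attempts:, mode:, backoff: 30, now: Time.now)
  new_attempts = attempts + 1
  status =
    if new_attempts >= max_attempts
      "dead"   # attempts exhausted, in every mode
    elsif mode == "inline"
      "failed" # terminal: inline mode has no background drainer
    else
      "queued" # retry later
    end
  # Linear backoff: the Nth failure waits N * backoff seconds.
  retry_at = status == "queued" ? (now + (backoff * new_attempts)).utc.iso8601 : nil
  { status: status, attempts: new_attempts, retry_at: retry_at }
end

now = Time.utc(2024, 5, 10, 12)
p next_failure_state(attempts: 0, max_attempts: 3, mode: "worker", now: now)[:retry_at]
# prints "2024-05-10T12:00:30Z"
p next_failure_state(attempts: 1, max_attempts: 3, mode: "worker", now: now)[:retry_at]
# prints "2024-05-10T12:01:00Z"
p next_failure_state(attempts: 0, max_attempts: 3, mode: "inline", now: now)[:status]
# prints "failed"
p next_failure_state(attempts: 2, max_attempts: 3, mode: "inline", now: now)[:status]
# prints "dead"
```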

#failed_count ⇒ Object

Returns the number of jobs a human must act on: those in the inline-mode terminal "failed" state plus those in the attempts-exhausted "dead" state. Surfaced by the in-chat /status jobs line (#186).

# File 'lib/rubino/jobs/queue.rb', line 171

def failed_count
  @db[:jobs].where(status: %w[failed dead]).count
end

#find(id) ⇒ Object

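Finds one job by full id or by short-id prefix (the 8-character ids the list renders; this is the same prefix resolution the memory store gives /memory show). Returns nil for an empty or nil id, or when nothing matches. On an ambiguous prefix the first row returned wins; the query has no ORDER BY, so which matching row that is is not guaranteed. The id is interpolated into the LIKE pattern unescaped, so % and _ in the argument act as wildcards.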

# File 'lib/rubino/jobs/queue.rb', line 151

def find(id)
  return nil if id.to_s.empty?

  @db[:jobs].where(Sequel.like(:id, "#{id}%")).first
end
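Because find interpolates its argument straight into the LIKE pattern, find("%") matches every row and returns an arbitrary one. If that matters, the prefix can be escaped first. The helpers below are hypothetical plain-Ruby sketches: escape_like assumes the SQL side declares backslash as the LIKE escape character, and find_by_prefix mirrors find's empty-id and first-match behavior over an in-memory list (unlike SQLite's default LIKE, start_with? is case-sensitive).

```ruby
# Hypothetical helper: escapes LIKE metacharacters so a user-typed prefix is
# matched literally. Assumes the SQL side uses backslash as the escape char.
def escape_like(prefix)
  prefix.gsub(/[\\%_]/) { |ch| "\\#{ch}" }
end

# Hypothetical in-memory analogue of find: nil for an empty or nil id,
# otherwise the first id starting with the prefix.
def find_by_prefix(ids, id)
  return nil if id.to_s.empty?

  ids.find { |candidate| candidate.start_with?(id.to_s) }
end

ids = %w[
  3f2a9c1e-0000-4000-8000-000000000001
  3f2a77aa-0000-4000-8000-000000000002
]
puts find_by_prefix(ids, "3f2a77aa") # prints 3f2a77aa-0000-4000-8000-000000000002
puts find_by_prefix(ids, "3f2a")     # ambiguous: prints the first id in list order
p find_by_prefix(ids, "")            # prints nil
puts escape_like("50%_off")          # prints 50\%\_off
```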

#list(status: nil, limit: 20) ⇒ Object

Lists up to limit jobs (default 20), newest first by created_at, optionally filtered by status. Returns an Array of row Hashes.

# File 'lib/rubino/jobs/queue.rb', line 142

def list(status: nil, limit: 20)
  dataset = @db[:jobs].order(Sequel.desc(:created_at)).limit(limit)
  dataset = dataset.where(status: status) if status
  dataset.all
end

#next_due_queued ⇒ Object

Returns the next still-queued, due, UNLOCKED job row without taking a lock, using the same ordering as #dequeue (priority, then run_at). Used by the detached post-turn polishing drain (#319), which runs each row through Jobs::Runner#run_job sequentially on its own thread and therefore needs the next candidate rather than a worker-locked claim. Returns nil when nothing is due. The table is scanned fresh on each call, so rows a follow-up turn enqueues mid-drain are picked up by the same worker (coalescing).

# File 'lib/rubino/jobs/queue.rb', line 220

def next_due_queued
  now = Time.now.utc.iso8601
  @db[:jobs]
    .where(status: "queued", locked_by: nil)
    .where { run_at <= now }
    .order(:priority, :run_at)
    .first
end
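The drain that next_due_queued serves is a simple pull loop. The sketch below uses a hypothetical in-memory DrainSketch (seq stands in for run_at) to show why re-scanning on every call matters: a row enqueued while the loop is running is still picked up before the loop exits.

```ruby
# Hypothetical in-memory queue for the polishing-style drain loop.
# seq stands in for run_at; run_job marks a row completed and, for "turn-1",
# enqueues a follow-up to imitate a new turn arriving mid-drain.
class DrainSketch
  attr_reader :order

  def initialize
    @rows = []
    @order = []
  end

  def enqueue(id, priority: 100)
    @rows << { id: id, status: "queued", priority: priority, seq: @rows.size }
  end

  # Like next_due_queued: re-scanned on every call, no lock taken.
  def next_due_queued
    @rows.select { |r| r[:status] == "queued" }.min_by { |r| [r[:priority], r[:seq]] }
  end

  def run_job(row)
    row[:status] = "completed"
    @order << row[:id]
    enqueue("follow-up") if row[:id] == "turn-1"
  end
end

q = DrainSketch.new
q.enqueue("turn-1")
q.enqueue("turn-2")

# Keep pulling until nothing is due.
while (row = q.next_due_queued)
  q.run_job(row)
end

p q.order # prints ["turn-1", "turn-2", "follow-up"]
```

In this sketch the loop ends only because run_job moves each row out of the due set. Since no lock is taken, the pattern also assumes a single sequential drainer; concurrent consumers would go through #dequeue and #claim! instead.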

#pending_count ⇒ Object

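Returns the number of jobs with status "queued". This includes rows that are not yet due, such as retries waiting out their backoff.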

# File 'lib/rubino/jobs/queue.rb', line 164

def pending_count
  @db[:jobs].where(status: "queued").count
end

#reap_inline_orphans(before: nil) ⇒ Object

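Drains `queued` rows left orphaned by an interrupted prior inline run (#84/#224). It runs every still-queued, due, unlocked row except the one whose id is before (the row the calling enqueue is about to run itself), so a turn whose extraction was interrupted is recovered on the next inline boot instead of sitting "queued" forever. Each orphan is first claimed through #claim! (#346), so a row another process already took is skipped, and is then run through Runner#run_job, which leaves it completed, failed (inline) or dead. Any StandardError raised while reaping one row is logged as jobs.reap_orphan_failed and the loop moves on, so a bad orphan never aborts the live turn.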

# File 'lib/rubino/jobs/queue.rb', line 181

def reap_inline_orphans(before: nil)
  now = Time.now.utc.iso8601
  runner = Runner.new(db: @db)
  worker_id = "reap-#{Process.pid}"

  dataset = @db[:jobs]
            .where(status: "queued", locked_by: nil)
            .where { run_at <= now }
            .order(:priority, :run_at)
  dataset = dataset.exclude(id: before) if before

  # Isolate each orphan: run_job already failure-isolates a bad row
  # terminally, but a defence-in-depth guard here means even an
  # unexpected raise (e.g. a DB error draining one row) can NEVER abort
  # the live turn that is enqueuing — mirrors the poison-row defence
  # Scheduler#schedule has for unparseable cron rows (#J1).
  #
  # CAS-claim each orphan through the SAME lock #dequeue uses BEFORE
  # running it (#346). Two processes sharing one RUBINO_HOME used to both
  # see the same queued orphans and run_job them directly — no lock, no
  # terminal re-check — double-running (and double-billing) the aux work.
  # The atomic claim transitions queued → running for exactly one caller;
  # a row another process already grabbed returns false and is skipped.
  dataset.select_map(:id).each do |orphan_id|
    next unless claim!(orphan_id, worker_id: worker_id)

    runner.run_job(orphan_id)
  rescue StandardError => e
    Rubino.logger.warn(event: "jobs.reap_orphan_failed", job_id: orphan_id, error: e.class.name,
                       message: e.message)
  end
end
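Two properties of the reaper are worth checking together: each orphan runs exactly once even when two processes reap at the same moment, and one row that raises does not stop the others. The sketch below models both with threads and a Mutex in place of separate processes sharing RUBINO_HOME; SharedStore and reap are hypothetical stand-ins, and the poison row raises directly to simulate an unexpected error escaping run_job.

```ruby
# Hypothetical model of two processes reaping the same orphans. A Mutex stands
# in for SQLite serializing writes from processes sharing one RUBINO_HOME.
class SharedStore
  attr_reader :runs

  def initialize(ids)
    @lock = Mutex.new
    @rows = ids.to_h { |id| [id, { status: "queued", locked_by: nil }] }
    @runs = Hash.new(0)
  end

  def queued_ids
    @lock.synchronize do
      @rows.select { |_, r| r[:status] == "queued" && r[:locked_by].nil? }.keys
    end
  end

  # Compare-and-swap claim, as in claim!.
  def claim!(id, worker_id:)
    @lock.synchronize do
      row = @rows[id]
      next false unless row[:status] == "queued"

      row.merge!(status: "running", locked_by: worker_id)
      true
    end
  end

  # Stand-in for Runner#run_job; "poison" raises to simulate an unexpected error.
  def run_job(id)
    @lock.synchronize { @runs[id] += 1 }
    raise IOError, "poison row" if id == "poison"

    @lock.synchronize { @rows[id][:status] = "completed" }
  end
end

# Mirrors reap_inline_orphans' loop: skip `before`, claim, run, isolate errors.
def reap(store, worker_id:, before: nil, log: [])
  ids = store.queued_ids
  ids -= [before] if before
  ids.each do |id|
    next unless store.claim!(id, worker_id: worker_id)

    store.run_job(id)
  rescue StandardError => e
    log << [id, e.class.name] # logged and skipped, never re-raised
  end
  log
end

store = SharedStore.new(%w[a b poison current])
logs = [[], []]
threads = Array.new(2) do |i|
  Thread.new { reap(store, worker_id: "reap-#{i}", before: "current", log: logs[i]) }
end
threads.each(&:join)

p store.runs.sort.to_h # each orphan appears once, with a count of 1
p logs.flatten(1)      # prints [["poison", "IOError"]]
p store.queued_ids     # prints ["current"]
```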