Class: Rubino::Jobs::Queue

Inherits:
Object
Defined in:
lib/rubino/jobs/queue.rb

Overview

Manages the job queue backed by SQLite. Supports enqueue, dequeue, locking, and status queries.
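Taken together, the methods below move each row through a small set of statuses. The sketch that follows is a hypothetical summary reconstructed from the method bodies on this page. TRANSITIONS, TERMINAL and allowed_transition? are illustrative names, not Rubino APIs. It also assumes, per the comment in #enqueue, that an inline-mode enqueue runs its own row without claiming it, which is why `queued` can jump straight to a finished state.

```ruby
# Hypothetical summary of the status transitions performed by
# Rubino::Jobs::Queue, read off the method bodies on this page.
TRANSITIONS = {
  nil => %w[queued],                             # #enqueue inserts a new row
  # #claim! (via #dequeue / #reap_inline_orphans); an inline enqueue runs its
  # row unclaimed, so #complete! / #fail! can also apply to a queued row.
  "queued" => %w[running completed failed dead],
  "running" => %w[completed queued failed dead]  # #complete!, #fail!, #reclaim_stale!
}.freeze

# Statuses with no outgoing transition in the table above
# (#cleanup! only deletes old "completed" rows).
TERMINAL = (TRANSITIONS.values.flatten.uniq - TRANSITIONS.keys).freeze

def allowed_transition?(from, to)
  TRANSITIONS.fetch(from, []).include?(to)
end

TERMINAL                                   # => ["completed", "failed", "dead"]
allowed_transition?("queued", "running")   # => true
allowed_transition?("completed", "queued") # => false
```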


Constructor Details

#initialize(db: nil, config: nil) ⇒ Queue

Returns a new instance of Queue. Falls back to Rubino.database.db when db is nil and to Rubino.configuration when config is nil.



# File 'lib/rubino/jobs/queue.rb', line 11

def initialize(db: nil, config: nil)
  @db = db || Rubino.database.db
  @config = config || Rubino.configuration
end

Instance Method Details

#claim!(job_id, worker_id:) ⇒ Object

Atomically claims a single row by id using the SAME compare-and-swap lock #dequeue uses, transitioning queued → running only while the row is still `queued`. Exactly ONE caller gets true; every concurrent claim (another process sharing this RUBINO_HOME, or a re-entrant reap) sees the row already `running` and gets false. The reaper (#346) claims through here before running each orphan, so two processes can never double-run (and double-bill) the same ExtractMemoryJob.



# File 'lib/rubino/jobs/queue.rb', line 84

def claim!(job_id, worker_id:) # rubocop:disable Naming/PredicateMethod -- a mutating CAS (bang), not a query; the boolean reports whether THIS caller won the lock
  now = Time.now.utc.iso8601
  updated = @db[:jobs]
            .where(id: job_id, status: "queued")
            .update(
              status: "running",
              locked_at: now,
              locked_by: worker_id,
              updated_at: now
            )
  updated.positive?
end
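The guarantee rests on the conditional UPDATE: the single statement matching `id = ? AND status = 'queued'` is applied atomically, so only one writer finds a matching row and every later writer updates zero rows. A minimal in-memory sketch of the same compare-and-swap follows, assuming a Ruby Mutex is an acceptable stand-in for SQLite's write lock. FakeJobs is hypothetical.

```ruby
# Hypothetical in-memory stand-in for the jobs table. The Mutex plays the
# role SQLite's write lock plays for UPDATE ... WHERE status = 'queued'.
class FakeJobs
  def initialize
    @rows = {}
    @lock = Mutex.new
  end

  def insert(id)
    @lock.synchronize { @rows[id] = { status: "queued", locked_by: nil } }
  end

  # Compare-and-swap: flip queued -> running only if the row is still queued.
  # Returns true for exactly one caller, like `updated.positive?` in #claim!.
  def claim!(id, worker_id:)
    @lock.synchronize do
      row = @rows[id]
      next false unless row && row[:status] == "queued"

      row[:status] = "running"
      row[:locked_by] = worker_id
      true
    end
  end

  def [](id)
    @lock.synchronize { @rows[id]&.dup }
  end
end

jobs = FakeJobs.new
jobs.insert("job-1")
# Eight workers race for the same row; Thread#value joins and returns each result.
results = Array.new(8) { |i| Thread.new { jobs.claim!("job-1", worker_id: "w#{i}") } }.map(&:value)
results.count(true) # => 1
```

Only one of the eight racing threads observes true. That single-winner property is what #dequeue and #reap_inline_orphans rely on.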

#cleanup!(older_than_days: 7) ⇒ Object

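Deletes completed jobs whose created_at is more than older_than_days days (default 7) in the past. Returns the number of rows deleted.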



# File 'lib/rubino/jobs/queue.rb', line 295

def cleanup!(older_than_days: 7)
  cutoff = (Time.now - (older_than_days * 86_400)).utc.iso8601
  @db[:jobs]
    .where(status: "completed")
    .where { created_at < cutoff }
    .delete
end
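#cleanup! compares timestamps as plain strings, as do #dequeue, #next_due_queued and #reclaim_stale!, because every timestamp column is written with `Time#iso8601` in UTC. This is sound only while every value has exactly the same shape (UTC, `Z` suffix, whole seconds); under that condition lexical order equals chronological order. The sketch below shows the cutoff arithmetic and the string comparison. cutoff_for is a hypothetical helper.

```ruby
require "time"

# Hypothetical helper: the same cutoff arithmetic as #cleanup!
# (days * 86_400 seconds, rendered as a UTC ISO-8601 string).
def cutoff_for(older_than_days, now: Time.now)
  (now - (older_than_days * 86_400)).utc.iso8601
end

now = Time.utc(2024, 3, 10, 12, 0, 0)
cutoff = cutoff_for(7, now: now) # => "2024-03-03T12:00:00Z"

created = ["2024-03-01T09:00:00Z", "2024-03-03T12:00:00Z", "2024-03-09T23:59:59Z"]
# Plain string comparison, as in `created_at < cutoff`: only the first row is old enough.
created.select { |ts| ts < cutoff } # => ["2024-03-01T09:00:00Z"]
```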

#complete!(job_id) ⇒ Object

Marks a job as completed and releases its lock (clears locked_at and locked_by).



# File 'lib/rubino/jobs/queue.rb', line 98

def complete!(job_id)
  @db[:jobs].where(id: job_id).update(
    status: "completed",
    locked_at: nil,
    locked_by: nil,
    updated_at: Time.now.utc.iso8601
  )
end

#counts ⇒ Object

Status counts for the whole queue (status => count), computed in one grouped query; used by the in-chat /jobs header line (#187). Returns {} when the queue is empty. Reclaims lease-expired `running` rows first (WHATIF-headless YELLOW-1) so the header count matches the reclaimed list rendered right after it.



# File 'lib/rubino/jobs/queue.rb', line 168

def counts
  reclaim_stale!
  @db[:jobs].group_and_count(:status).to_h { |row| [row[:status], row[:count]] }
end

#dequeue(worker_id:) ⇒ Object

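Dequeues the next due job and locks it: selects the `queued` row with the lowest priority value (earliest run_at on ties) whose run_at has passed, then claims it through #claim!. Returns the freshly re-read row, or nil when nothing is due or another worker won the claim (it does not retry).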



# File 'lib/rubino/jobs/queue.rb', line 61

def dequeue(worker_id:)
  now = Time.now.utc.iso8601

  job = @db[:jobs]
        .where(status: "queued")
        .where { run_at <= now }
        .order(:priority, :run_at)
        .first

  return nil unless job

  return nil unless claim!(job[:id], worker_id: worker_id)

  @db[:jobs].where(id: job[:id]).first
end
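The method follows a select-then-claim pattern: it reads the best candidate and then relies on #claim! to win it. Because the query orders by priority ascending, a smaller priority number runs first, and run_at breaks ties. A caller that loses the claim gets nil rather than a retry. The pure-Ruby sketch below models that selection. Job, claim! and dequeue here are hypothetical stand-ins for the table and the Sequel calls.

```ruby
# Hypothetical row type standing in for a jobs-table row.
Job = Struct.new(:id, :status, :priority, :run_at, :locked_by, keyword_init: true)

# Stand-in for #claim!: succeed only if the row is still queued.
def claim!(job, worker_id:)
  return false unless job.status == "queued"

  job.status = "running"
  job.locked_by = worker_id
  true
end

# Mirrors #dequeue: due + queued, lowest priority value first, then earliest run_at.
def dequeue(jobs, worker_id:, now:)
  job = jobs.select { |j| j.status == "queued" && j.run_at <= now }
            .min_by { |j| [j.priority, j.run_at] }
  return nil unless job
  return nil unless claim!(job, worker_id: worker_id) # lost the race: no retry

  job
end

now = "2024-03-10T12:00:00Z"
jobs = [
  Job.new(id: "low", status: "queued", priority: 100, run_at: "2024-03-10T11:00:00Z"),
  Job.new(id: "urgent", status: "queued", priority: 10, run_at: "2024-03-10T11:30:00Z"),
  Job.new(id: "later", status: "queued", priority: 1, run_at: "2024-03-10T13:00:00Z")
]
dequeue(jobs, worker_id: "w1", now: now).id # => "urgent"
dequeue(jobs, worker_id: "w1", now: now).id # => "low"
dequeue(jobs, worker_id: "w1", now: now)    # => nil ("later" is not due yet)
```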

#enqueue(type, payload, priority: 100, run_at: nil, drain_inline: true) ⇒ Object

Enqueues a new job and returns its id (a UUID).

drain_inline controls inline-mode auto-execution. When true (the default), an inline-mode enqueue runs the job synchronously right here, as before. The post-turn polishing path (#319) passes drain_inline: false so the row is only PERSISTED; the detached Interaction::Polishing thread then drains it off the live turn’s critical path, so a slow or rate-limited (429) aux call can never block the next prompt. Outside inline mode the flag changes nothing: the row is persisted and left for a worker or a manual drain.



# File 'lib/rubino/jobs/queue.rb', line 24

def enqueue(type, payload, priority: 100, run_at: nil, drain_inline: true)
  now = Time.now.utc.iso8601
  id = SecureRandom.uuid

  @db[:jobs].insert(
    id: id,
    type: type,
    status: "queued",
    priority: priority,
    payload_json: JSON.generate(payload),
    attempts: 0,
    max_attempts: @config.dig("jobs", "max_attempts"),
    run_at: run_at || now,
    created_at: now,
    updated_at: now
  )

  return id unless drain_inline

  # In inline mode, execute immediately, but first drain any stale rows a
  # previous inline run left orphaned (#84/#224). Inline mode has no
  # background drainer, and run_job is called directly on the new row,
  # never locked. If that enqueuing process was interrupted mid-run (e.g.
  # the user quit the session while post-turn extraction was still
  # finishing), the Interrupt is not a StandardError, so the row never
  # reaches complete!/fail! and sits "queued" forever: the behaviour #84
  # closed. Every inline enqueue means a live process is here and willing
  # to drain, so reap those orphans on this boot.
  if @config.dig("jobs", "mode") == "inline"
    reap_inline_orphans(before: id)
    Runner.new.run_job(id)
  end

  id
end
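The tail of #enqueue comes down to a small decision. The sketch below restates it. enqueue_steps and its step symbols are hypothetical labels for what the real method does, not Rubino APIs.

```ruby
# Hypothetical decision helper mirroring the tail of #enqueue; the symbols
# are illustrative labels, not Rubino APIs.
def enqueue_steps(mode:, drain_inline: true)
  steps = [:insert_queued_row]          # the row is always persisted first
  return steps unless drain_inline      # #319 polishing path: persist only
  return steps unless mode == "inline"  # worker/manual modes leave it for a drainer

  steps + %i[reap_inline_orphans run_job] # inline: recover orphans, then run this job now
end

enqueue_steps(mode: "inline")                      # => [:insert_queued_row, :reap_inline_orphans, :run_job]
enqueue_steps(mode: "inline", drain_inline: false) # => [:insert_queued_row]
enqueue_steps(mode: "worker")                      # => [:insert_queued_row]
```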

#fail!(job_id, error:) ⇒ Object

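Records a failed attempt: increments attempts, stores the error in last_error, and releases the lock. Once attempts reach max_attempts the job is marked “dead”; otherwise it becomes terminal “failed” in inline mode, or is re-queued in other (worker/manual) modes with a linear backoff of retry_backoff_seconds × attempts (default 30 s).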



# File 'lib/rubino/jobs/queue.rb', line 108

def fail!(job_id, error:)
  job = @db[:jobs].where(id: job_id).first
  return unless job

  new_attempts = job[:attempts] + 1
  # Inline mode has no background drainer, so re-queueing a failed job
  # would leave it "queued" forever (#84) — mark it terminal ("failed")
  # instead so `jobs list` is honest. Worker/manual modes keep the
  # retry-with-backoff behavior until attempts are exhausted.
  new_status =
    if new_attempts >= job[:max_attempts]
      "dead"
    elsif @config.dig("jobs", "mode") == "inline"
      "failed"
    else
      "queued"
    end

  # Calculate retry time with backoff
  backoff = @config.dig("jobs", "retry_backoff_seconds") || 30
  retry_at = (Time.now + (backoff * new_attempts)).utc.iso8601

  @db[:jobs].where(id: job_id).update(
    status: new_status,
    attempts: new_attempts,
    last_error: error,
    locked_at: nil,
    locked_by: nil,
    run_at: new_status == "queued" ? retry_at : job[:run_at],
    updated_at: Time.now.utc.iso8601
  )
end
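The status and run_at that #fail! chooses can be reproduced as a pure function. Doing so makes two properties easy to check: the precedence (an exhausted job is “dead” even in inline mode) and the linear backoff. The sketch assumes backoff_seconds mirrors jobs.retry_backoff_seconds. failure_outcome is hypothetical.

```ruby
require "time"

# Hypothetical pure function reproducing #fail!'s status/run_at decision.
# Backoff is linear: backoff_seconds * new_attempts (30 s, 60 s, 90 s, ... by default).
def failure_outcome(attempts:, max_attempts:, mode:, run_at:, backoff_seconds: 30, now: Time.now)
  new_attempts = attempts + 1
  status =
    if new_attempts >= max_attempts
      "dead"   # exhausted: checked first, so it wins even in inline mode
    elsif mode == "inline"
      "failed" # no background drainer would ever retry it
    else
      "queued" # worker/manual modes retry later
    end
  retry_at = (now + (backoff_seconds * new_attempts)).utc.iso8601
  { status: status, attempts: new_attempts, run_at: status == "queued" ? retry_at : run_at }
end

now = Time.utc(2024, 3, 10, 12, 0, 0)
original_run_at = "2024-03-10T11:00:00Z"
requeued = failure_outcome(attempts: 1, max_attempts: 3, mode: "worker", run_at: original_run_at, now: now)
requeued[:status] # => "queued"
requeued[:run_at] # => "2024-03-10T12:01:00Z" (second attempt: 30 * 2 = 60 s)
failure_outcome(attempts: 1, max_attempts: 3, mode: "inline", run_at: original_run_at, now: now)[:status] # => "failed"
failure_outcome(attempts: 2, max_attempts: 3, mode: "inline", run_at: original_run_at, now: now)[:status] # => "dead"
```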

#failed_count ⇒ Object

Returns the number of failed jobs, counting both the inline-mode terminal “failed” and the attempts-exhausted “dead” states (the two a human must act on; surfaced by the in-chat /status jobs line, #186).



# File 'lib/rubino/jobs/queue.rb', line 181

def failed_count
  @db[:jobs].where(status: %w[failed dead]).count
end

#find(id) ⇒ Object

Finds one job by full id or short-id prefix (the 8-char ids the list renders — same prefix resolution the memory store gives /memory show). nil when nothing matches; the first match wins on an ambiguous prefix.



# File 'lib/rubino/jobs/queue.rb', line 158

def find(id)
  return nil if id.to_s.empty?

  @db[:jobs].where(Sequel.like(:id, "#{id}%")).first
end
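Prefix resolution amounts to returning the first id that starts with the input. The pure-Ruby sketch below shows this. find_by_prefix is hypothetical. It differs from the SQL version in three ways:

- `Sequel.like` treats `%` and `_` in the input as LIKE wildcards.
- SQLite's LIKE is case-insensitive for ASCII by default.
- The query has no ORDER BY, so on an ambiguous prefix, which row counts as “first” is left to SQLite rather than guaranteed.

```ruby
# Hypothetical pure-Ruby equivalent of #find over an in-memory list of ids.
def find_by_prefix(ids, prefix)
  return nil if prefix.to_s.empty?

  # Literal, case-sensitive prefix match (unlike SQL LIKE).
  ids.find { |id| id.start_with?(prefix.to_s) }
end

ids = %w[3f2a9c1e-aaaa 3f2a0000-bbbb 7c11d2e4-cccc]
find_by_prefix(ids, "7c11d2e4") # => "7c11d2e4-cccc"
find_by_prefix(ids, "3f2a")     # => "3f2a9c1e-aaaa" (ambiguous: first match in array order wins)
find_by_prefix(ids, "")         # => nil
```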

#list(status: nil, limit: 20) ⇒ Object

Lists jobs, newest first, optionally filtered by status and capped at limit (default 20). Reclaims lease-expired `running` rows FIRST (WHATIF-headless YELLOW-1) so the list is honest. Otherwise reclaim_stale! ran only on the dequeue/process/drain write paths, so a row abandoned by a dead worker sat “running” long past the 900 s lease (e.g. 43 min) on every `jobs list` / `/jobs` read. Reclaiming on the read path re-queues (or kills) it so the status shown matches reality. The call is idempotent and cheap: whichever of #list and #counts (the /jobs header) runs second finds nothing to reclaim.



# File 'lib/rubino/jobs/queue.rb', line 148

def list(status: nil, limit: 20)
  reclaim_stale!
  dataset = @db[:jobs].order(Sequel.desc(:created_at)).limit(limit)
  dataset = dataset.where(status: status) if status
  dataset.all
end

#next_due_queued ⇒ Object

The next still-queued, due, UNLOCKED job row (no lock taken). Used by the detached post-turn polishing drain (#319), which runs each row through Jobs::Runner#run_job sequentially on its own thread — so it needs the next candidate, not a worker-locked claim. Returns nil when the queue has nothing due. Scanned fresh each call so rows a follow-up turn enqueues mid-drain are picked up by the same worker (coalescing).



# File 'lib/rubino/jobs/queue.rb', line 250

def next_due_queued
  reclaim_stale!
  now = Time.now.utc.iso8601
  @db[:jobs]
    .where(status: "queued", locked_by: nil)
    .where { run_at <= now }
    .order(:priority, :run_at)
    .first
end
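Because #next_due_queued scans afresh on every call, a drain loop built on it coalesces: work enqueued while the loop is running is picked up before the loop exits. The minimal sketch below shows that loop. DrainRow, drain and the simulated follow-up enqueue are all hypothetical, and run_job is reduced to marking the row completed.

```ruby
# Hypothetical row type for the sketch.
DrainRow = Struct.new(:id, :status)

def drain(rows)
  ran = []
  # Fresh scan on every pass, like calling #next_due_queued each iteration.
  while (row = rows.find { |r| r.status == "queued" })
    ran << row.id
    row.status = "completed" # stands in for Runner#run_job finishing the row
    # Simulate a follow-up turn enqueuing more work while the drain is running.
    rows << DrainRow.new("follow-up", "queued") if row.id == "first"
  end
  ran
end

drain([DrainRow.new("first", "queued")]) # => ["first", "follow-up"]
```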

#pending_count ⇒ Object

Returns the number of jobs in status `queued`, including ones whose run_at is still in the future.



# File 'lib/rubino/jobs/queue.rb', line 174

def pending_count
  @db[:jobs].where(status: "queued").count
end

#reap_inline_orphans(before: nil, session_id: nil) ⇒ Object

Drains `queued` rows left orphaned by an interrupted prior inline run (#84/#224). It runs every still-queued, due, unlocked row other than `before` (the row the current enqueue is about to run itself), so a turn whose extraction was interrupted is recovered on the next inline boot instead of sitting “queued” forever. Each row is first CAS-claimed through #claim! and then taken through run_job, which marks it completed, failed (inline), or dead terminally.

session_id SCOPES the sweep to the current run’s OWN post-turn jobs (WHATIF-headless RED-1). A headless one-shot drains before it exits, and the unscoped sweep ran EVERY due, queued, unlocked row in the table: a whole foreign backlog, each row a full LLM call. A trivial `rubino -q` on a home with a backlog therefore blocked for 9-15+ min past its answer and, under --output-format json, withheld stdout behind the backlog. The post-turn jobs (ExtractMemory/DistillSkill/Summarize) all carry the enqueuing session’s id in their payload, so passing session_id restricts the drain to rows this session owns; foreign rows stay `queued` for the next run or the worker. Reaping with no session_id keeps the original whole-queue sweep (the in-process inline-enqueue boot recovery).



# File 'lib/rubino/jobs/queue.rb', line 203

def reap_inline_orphans(before: nil, session_id: nil)
  # First re-queue any row a prior run abandoned mid-flight in `running`
  # (#76) so the scan below sweeps it too — same recovery the detached
  # drain gets via #next_due_queued.
  reclaim_stale!

  now = Time.now.utc.iso8601
  runner = Runner.new(db: @db)
  worker_id = "reap-#{Process.pid}"

  dataset = @db[:jobs]
            .where(status: "queued", locked_by: nil)
            .where { run_at <= now }
            .order(:priority, :run_at)
  dataset = dataset.exclude(id: before) if before
  # Match the session's own rows by the serialized payload tag
  # (JSON.generate emits "session_id":"<id>" with no spaces, #enqueue).
  dataset = dataset.where(Sequel.like(:payload_json, "%\"session_id\":\"#{session_id}\"%")) if session_id

  # Isolate each orphan: run_job already failure-isolates a bad row
  # terminally, but a defence-in-depth guard here means even an
  # unexpected raise (e.g. a DB error draining one row) can NEVER abort
  # the live turn that is enqueuing — mirrors the poison-row defence
  # Scheduler#schedule has for unparseable cron rows (#J1).
  #
  # CAS-claim each orphan through the SAME lock #dequeue uses BEFORE
  # running it (#346). Two processes sharing one RUBINO_HOME used to both
  # see the same queued orphans and run_job them directly — no lock, no
  # terminal re-check — double-running (and double-billing) the aux work.
  # The atomic claim transitions queued → running for exactly one caller;
  # a row another process already grabbed returns false and is skipped.
  dataset.select_map(:id).each do |orphan_id|
    next unless claim!(orphan_id, worker_id: worker_id)

    runner.run_job(orphan_id)
  rescue StandardError => e
    Rubino.logger.warn(event: "jobs.reap_orphan_failed", job_id: orphan_id, error: e.class.name,
                       message: e.message)
  end
end
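The session filter depends on how #enqueue serializes payloads. `JSON.generate` writes compact JSON with no space after the colon, so the tag appears verbatim and a `%...%` LIKE pattern finds it. The sketch below checks that assumption with Ruby's json library and approximates the LIKE pattern with a substring test. needle is a hypothetical name. The closing quote in the pattern also keeps `sess-4` from matching `sess-42`.

```ruby
require "json"

session_id = "sess-42"
# The LIKE pattern #reap_inline_orphans builds for this session.
pattern = "%\"session_id\":\"#{session_id}\"%"
# Hypothetical approximation: a %...% LIKE pattern (no other wildcards) is a substring test.
needle = pattern.delete_prefix("%").delete_suffix("%")

payload_json = JSON.generate({ "session_id" => session_id, "turn" => 3 })
payload_json                  # => "{\"session_id\":\"sess-42\",\"turn\":3}"
payload_json.include?(needle) # => true
# Pretty-printed JSON puts a space after the colon, so the same tag would be missed.
JSON.pretty_generate({ "session_id" => session_id }).include?(needle) # => false
# The closing quote in the pattern stops a shorter id from matching a longer one.
payload_json.include?("\"session_id\":\"sess-4\"") # => false
```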

#reclaim_stale! ⇒ Object

Reclaims rows stranded in `running` by a worker that claimed them and then died, was quit, or hung (#76). The drain scan only re-picks `queued` rows, and nothing else recovers a `running` row. As a result, a claimed-but-never-finished job sat forever (status=“running”, locked_by set, attempts=0) and the queue grew across sessions. A row whose lock is older than the lease (jobs.lock_lease_seconds, default 900) is presumed abandoned. Its attempts are bumped, and it is either re-queued (so the next scan runs it) or marked terminal (“dead”) once attempts are exhausted, so a genuinely stuck or poison job can’t be reclaimed and re-run forever. Returns the number of stale rows found, which equals the number reclaimed unless a concurrent caller reclaimed some of them first.



# File 'lib/rubino/jobs/queue.rb', line 269

def reclaim_stale!
  lease = @config.dig("jobs", "lock_lease_seconds") || 900
  cutoff = (Time.now - lease).utc.iso8601

  stale = @db[:jobs]
          .where(status: "running")
          .exclude(locked_at: nil)
          .where { locked_at < cutoff }
          .select_map(%i[id attempts max_attempts])

  stale.each do |id, attempts, max_attempts|
    new_attempts = attempts + 1
    dead = new_attempts >= max_attempts
    @db[:jobs].where(id: id, status: "running").update(
      status: dead ? "dead" : "queued",
      attempts: new_attempts,
      locked_at: nil,
      locked_by: nil,
      last_error: "reclaimed: worker abandoned the job (lock lease expired)",
      updated_at: Time.now.utc.iso8601
    )
  end
  stale.size
end
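The reclaim rule can be modeled on plain structs. The sketch below assumes the default 900 s lease. LockedRow and reclaim_stale are hypothetical, and the real method additionally clears locked_by and records last_error.

```ruby
require "time"

# Hypothetical row type; the real table also has locked_by, last_error, etc.
LockedRow = Struct.new(:id, :status, :locked_at, :attempts, :max_attempts, keyword_init: true)

# Mirrors #reclaim_stale!: rows still "running" whose lock predates now - lease
# burn one attempt and are re-queued, or marked "dead" once attempts run out.
def reclaim_stale(rows, lease_seconds: 900, now: Time.now)
  cutoff = (now - lease_seconds).utc.iso8601
  stale = rows.select { |r| r.status == "running" && r.locked_at && r.locked_at < cutoff }
  stale.each do |r|
    r.attempts += 1
    r.status = r.attempts >= r.max_attempts ? "dead" : "queued"
    r.locked_at = nil
  end
  stale.size
end

now = Time.utc(2024, 3, 10, 12, 0, 0) # cutoff = 11:45:00Z
rows = [
  LockedRow.new(id: "fresh", status: "running", locked_at: "2024-03-10T11:55:00Z", attempts: 0, max_attempts: 3),
  LockedRow.new(id: "stuck", status: "running", locked_at: "2024-03-10T11:17:00Z", attempts: 0, max_attempts: 3),
  LockedRow.new(id: "poison", status: "running", locked_at: "2024-03-10T10:00:00Z", attempts: 2, max_attempts: 3)
]
reclaim_stale(rows, now: now) # => 2
rows.map(&:status)            # => ["running", "queued", "dead"]
```

With now at 12:00:00Z the cutoff is 11:45:00Z. The row locked at 11:55 keeps its lease, the one locked 43 minutes earlier is re-queued, and the row that had already used two of its three attempts is marked dead.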