Class: Rubino::Jobs::Queue
- Inherits: Object
- Defined in: lib/rubino/jobs/queue.rb
Overview
Manages the job queue backed by SQLite. Supports enqueue, dequeue, locking, and status queries.
Instance Method Summary
- #claim!(job_id, worker_id:) ⇒ Object
  Atomically claims a single row by id, using the SAME compare-and-swap lock #dequeue uses: transitions queued → running only while the row is still `queued`.
- #cleanup!(older_than_days: 7) ⇒ Object
  Deletes completed jobs created more than older_than_days days ago.
- #complete!(job_id) ⇒ Object
  Marks a job as completed and releases its lock.
- #counts ⇒ Object
  Status counts for the whole queue (status => count) from one grouped query; used by the in-chat /jobs header line (#187).
- #dequeue(worker_id:) ⇒ Object
  Dequeues and locks the next due job.
- #enqueue(type, payload, priority: 100, run_at: nil, drain_inline: true) ⇒ Object
  Enqueues a new job.
- #fail!(job_id, error:) ⇒ Object
  Records a failed attempt: increments attempts and re-queues the job, or marks it failed (inline mode) or dead (attempts exhausted).
- #failed_count ⇒ Object
  Returns the count of failed jobs: both the inline-mode terminal “failed” and the attempts-exhausted “dead” (the two states a human must act on; surfaced by the in-chat /status jobs line, #186).
- #find(id) ⇒ Object
  Finds one job by full id or short-id prefix (the 8-char ids the list renders; the same prefix resolution the memory store gives /memory show).
- #initialize(db: nil, config: nil) ⇒ Queue (constructor)
  A new instance of Queue.
- #list(status: nil, limit: 20) ⇒ Object
  Lists the most recent jobs, optionally filtered by status.
- #next_due_queued ⇒ Object
  The next still-queued, due, UNLOCKED job row (no lock taken).
- #pending_count ⇒ Object
  Returns the number of queued jobs.
- #reap_inline_orphans(before: nil) ⇒ Object
  Drains `queued` rows left orphaned by an interrupted prior inline run (#84/#224).
Constructor Details
#initialize(db: nil, config: nil) ⇒ Queue
Returns a new instance of Queue. db defaults to Rubino.database.db and config to Rubino.configuration.
```ruby
# File 'lib/rubino/jobs/queue.rb', line 11
def initialize(db: nil, config: nil)
  @db = db || Rubino.database.db
  @config = config || Rubino.configuration
end
```
Instance Method Details
#claim!(job_id, worker_id:) ⇒ Object
Atomically claims a single row by id, using the SAME compare-and-swap lock #dequeue uses: it transitions queued → running only while the row is still `queued`. Returns true to exactly ONE caller; every concurrent claim (another process sharing this RUBINO_HOME, or a re-entrant reap) sees the row already `running` and gets false. The reaper (#346) claims through here before running each orphan, so two processes can never double-run (and double-bill) the same ExtractMemoryJob.
```ruby
# File 'lib/rubino/jobs/queue.rb', line 84
def claim!(job_id, worker_id:) # rubocop:disable Naming/PredicateMethod -- a mutating CAS (bang), not a query; the boolean reports whether THIS caller won the lock
  now = Time.now.utc.iso8601
  updated = @db[:jobs]
    .where(id: job_id, status: "queued")
    .update(
      status: "running",
      locked_at: now,
      locked_by: worker_id,
      updated_at: now
    )
  updated.positive?
end
```
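The compare-and-swap idea behind #claim! can be illustrated without a database. The sketch below is a hypothetical in-memory stand-in (MiniJobs is not a Rubino class): a Mutex plays the role of SQLite's atomic single-statement UPDATE, and eight threads race to claim the same row.

```ruby
# Hypothetical in-memory stand-in for the jobs table (not a Rubino class).
class MiniJobs
  def initialize
    @rows = {}
    @mutex = Mutex.new # plays the role of the database's atomic UPDATE
  end

  def add(id)
    @rows[id] = { status: "queued", locked_by: nil }
  end

  def status(id)
    @rows.fetch(id)[:status]
  end

  # Compare-and-swap: flips queued -> running and returns true only if the
  # row is still queued; every later caller sees "running" and gets false.
  def claim!(id, worker_id:)
    @mutex.synchronize do
      row = @rows[id]
      next false unless row && row[:status] == "queued"

      row[:status] = "running"
      row[:locked_by] = worker_id
      true
    end
  end
end

jobs = MiniJobs.new
jobs.add("job-1")
threads = Array.new(8) { |i| Thread.new { jobs.claim!("job-1", worker_id: "worker-#{i}") } }
results = threads.map(&:value)

puts results.count(true) # prints 1
```

Only the thread that still observes `queued` flips the row. The real method gets the same guarantee from the `status: "queued"` condition in its UPDATE plus the affected-row count it checks with `positive?`.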
#cleanup!(older_than_days: 7) ⇒ Object
Deletes completed jobs whose created_at is more than older_than_days days in the past, and returns the number of rows deleted.
```ruby
# File 'lib/rubino/jobs/queue.rb', line 230
def cleanup!(older_than_days: 7)
  cutoff = (Time.now - (older_than_days * 86_400)).utc.iso8601
  @db[:jobs]
    .where(status: "completed")
    .where { created_at < cutoff }
    .delete
end
```
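A worked example of the cutoff arithmetic in #cleanup!, as a standalone sketch: cleanup_cutoff is a hypothetical helper and the rows are made up. It relies on what the method itself relies on: timestamps are stored as fixed-width UTC ISO 8601 strings, so comparing them as strings orders them in time.

```ruby
require "time"

# Hypothetical helper: N days back from `now`, as a UTC ISO 8601 string.
def cleanup_cutoff(older_than_days, now: Time.now)
  (now - (older_than_days * 86_400)).utc.iso8601
end

now = Time.utc(2024, 5, 10, 12, 0, 0)
cutoff = cleanup_cutoff(7, now: now)
puts cutoff # prints 2024-05-03T12:00:00Z

# Made-up rows: only the completed row created before the cutoff qualifies.
rows = [
  { id: "a", status: "completed", created_at: "2024-05-01T09:00:00Z" },
  { id: "b", status: "completed", created_at: "2024-05-09T09:00:00Z" },
  { id: "c", status: "dead",      created_at: "2024-04-01T09:00:00Z" }
]
doomed = rows.select { |r| r[:status] == "completed" && r[:created_at] < cutoff }
puts doomed.map { |r| r[:id] }.inspect # prints ["a"]
```

Note that the age test uses created_at rather than the completion time, so a long-running job that completed recently can still be removed if it was enqueued long ago.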
#complete!(job_id) ⇒ Object
Marks a job as completed and releases its lock (clears locked_at and locked_by).
```ruby
# File 'lib/rubino/jobs/queue.rb', line 98
def complete!(job_id)
  @db[:jobs].where(id: job_id).update(
    status: "completed",
    locked_at: nil,
    locked_by: nil,
    updated_at: Time.now.utc.iso8601
  )
end
```
#counts ⇒ Object
Status counts for the whole queue (status => count) from one grouped query; used by the in-chat /jobs header line (#187). Returns {} when the queue is empty.
```ruby
# File 'lib/rubino/jobs/queue.rb', line 159
def counts
  @db[:jobs].group_and_count(:status).to_h { |row| [row[:status], row[:count]] }
end
```
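For reference, the hash #counts returns has the same shape Array#tally produces from a list of statuses. This plain-Ruby sketch (made-up rows, no database) shows the equivalent of the grouped query and the empty-queue case.

```ruby
# Made-up rows; only the status column matters here.
rows = [
  { status: "queued" },
  { status: "completed" },
  { status: "queued" },
  { status: "dead" }
]

# Equivalent of SELECT status, count(*) ... GROUP BY status
counts = rows.map { |row| row[:status] }.tally
# counts: "queued" => 2, "completed" => 1, "dead" => 1

# An empty queue yields an empty hash, as #counts documents.
empty_counts = [].map { |row| row[:status] }.tally
```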
#dequeue(worker_id:) ⇒ Object
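Dequeues the next due job and locks it: selects the queued row with the lowest (priority, run_at) whose run_at has passed, claims it through #claim!, and returns the re-read row. Returns nil when nothing is due or when another worker wins the claim.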
```ruby
# File 'lib/rubino/jobs/queue.rb', line 61
def dequeue(worker_id:)
  now = Time.now.utc.iso8601

  job = @db[:jobs]
    .where(status: "queued")
    .where { run_at <= now }
    .order(:priority, :run_at)
    .first

  return nil unless job
  return nil unless claim!(job[:id], worker_id: worker_id)

  @db[:jobs].where(id: job[:id]).first
end
```
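The candidate selection in #dequeue, restated over plain hashes (next_candidate and the rows are hypothetical): keep queued rows that are due, then take the minimum of [priority, run_at]. A lower priority number therefore runs first, and ties go to the row that became due earliest. The real method then passes the winner through #claim! and returns nil if another worker got there first.

```ruby
require "time"

# Hypothetical helper mirroring WHERE status = 'queued' AND run_at <= now
# ORDER BY priority, run_at LIMIT 1.
def next_candidate(rows, now)
  rows
    .select { |r| r[:status] == "queued" && r[:run_at] <= now }
    .min_by { |r| [r[:priority], r[:run_at]] }
end

now = Time.utc(2024, 5, 10, 12).iso8601
rows = [
  { id: "late",   status: "queued",  priority: 10,  run_at: "2024-05-10T13:00:00Z" }, # not due yet
  { id: "urgent", status: "queued",  priority: 10,  run_at: "2024-05-10T11:00:00Z" },
  { id: "normal", status: "queued",  priority: 100, run_at: "2024-05-10T09:00:00Z" },
  { id: "busy",   status: "running", priority: 1,   run_at: "2024-05-10T08:00:00Z" }  # already claimed
]
puts next_candidate(rows, now)[:id] # prints urgent
```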
#enqueue(type, payload, priority: 100, run_at: nil, drain_inline: true) ⇒ Object
Enqueues a new job and returns its id (a UUID).
drain_inline controls the inline-mode auto-execute: when true (the default), an inline-mode enqueue runs the job synchronously right here, as before. The post-turn polishing path (#319) passes drain_inline: false so the row is only PERSISTED; the detached Interaction::Polishing thread then drains it off the live turn's critical path, so a slow or rate-limited (HTTP 429) aux call can never block the next prompt.
```ruby
# File 'lib/rubino/jobs/queue.rb', line 24
def enqueue(type, payload, priority: 100, run_at: nil, drain_inline: true)
  now = Time.now.utc.iso8601
  id = SecureRandom.uuid

  @db[:jobs].insert(
    id: id,
    type: type,
    status: "queued",
    priority: priority,
    payload_json: JSON.generate(payload),
    attempts: 0,
    max_attempts: @config.jobs_max_attempts,
    run_at: run_at || now,
    created_at: now,
    updated_at: now
  )

  return id unless drain_inline

  # If inline mode, execute immediately, but first drain any stale rows
  # a previous inline run left orphaned (#84/#224). Inline mode has no
  # background drainer. When the enqueuing process is interrupted mid-run
  # (e.g. the user quit the session while the post-turn extraction was
  # still finishing), run_job was called directly and never locked the
  # row, and Interrupt is not a StandardError, so the row never reaches
  # complete!/fail! and would sit "queued" forever: the behaviour #84
  # closed. Every inline enqueue means a live process is here and willing
  # to drain, so reap those orphans on this boot.
  if @config.jobs_mode == "inline"
    reap_inline_orphans(before: id)
    Runner.new.run_job(id)
  end

  id
end
```
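The ordering inside the inline branch of #enqueue can be sketched with a hypothetical in-memory class (InlineEnqueueSketch, whose trivial run step stands in for Jobs::Runner#run_job): the row is always persisted first, drain_inline: false returns right after that, and in inline mode older orphans are drained before the new job runs. Unlike the real #reap_inline_orphans, this sketch treats every other queued row as an orphan and skips the run_at, locked_by, and #claim! checks.

```ruby
require "securerandom"

# Hypothetical sketch of #enqueue's control flow; not Rubino code.
class InlineEnqueueSketch
  attr_reader :rows, :ran

  def initialize(mode:)
    @mode = mode
    @rows = {}
    @ran = []
  end

  def enqueue(type, drain_inline: true)
    id = SecureRandom.uuid
    @rows[id] = { type: type, status: "queued" } # always persisted first
    return id unless drain_inline

    if @mode == "inline"
      reap_orphans(except: id) # stranded rows from earlier runs first
      run(id)                  # then the row just inserted
    end
    id
  end

  private

  # In this sketch, every other still-queued row counts as an orphan.
  def reap_orphans(except:)
    @rows.each do |id, row|
      run(id) if id != except && row[:status] == "queued"
    end
  end

  def run(id)
    @ran << @rows[id][:type]
    @rows[id][:status] = "completed"
  end
end

q = InlineEnqueueSketch.new(mode: "inline")
# Simulate a row an interrupted earlier inline run left behind.
q.rows["orphan"] = { type: "old-extract", status: "queued" }

q.enqueue("extract")
p q.ran # prints ["old-extract", "extract"]

deferred = q.enqueue("polish", drain_inline: false)
# q.rows[deferred][:status] is still "queued": persisted, not run
```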
#fail!(job_id, error:) ⇒ Object
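Records a failed attempt: increments attempts, stores the error in last_error, and releases the lock. The job is re-queued with linear backoff (retry_backoff_seconds × attempts, default 30 s) unless its attempts are exhausted (status "dead") or the queue runs in inline mode (status "failed", terminal). Does nothing if the job id is unknown.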
```ruby
# File 'lib/rubino/jobs/queue.rb', line 108
def fail!(job_id, error:)
  job = @db[:jobs].where(id: job_id).first
  return unless job

  new_attempts = job[:attempts] + 1
  # Inline mode has no background drainer, so re-queueing a failed job
  # would leave it "queued" forever (#84); mark it terminal ("failed")
  # instead so `jobs list` is honest. Worker/manual modes keep the
  # retry-with-backoff behavior until attempts are exhausted.
  new_status = if new_attempts >= job[:max_attempts]
                 "dead"
               elsif @config.jobs_mode == "inline"
                 "failed"
               else
                 "queued"
               end

  # Calculate retry time with backoff
  backoff = @config.dig("jobs", "retry_backoff_seconds") || 30
  retry_at = (Time.now + (backoff * new_attempts)).utc.iso8601

  @db[:jobs].where(id: job_id).update(
    status: new_status,
    attempts: new_attempts,
    last_error: error,
    locked_at: nil,
    locked_by: nil,
    run_at: new_status == "queued" ? retry_at : job[:run_at],
    updated_at: Time.now.utc.iso8601
  )
end
```
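The status transition in #fail! is easiest to see as a pure function. next_failure_state below is a hypothetical helper, not part of Rubino; it mirrors the method's three branches and its linear backoff (backoff × attempts, default 30 seconds). The mode string "worker" is an assumption: the real code only checks for "inline", so any other value takes the retry path. Where the real method keeps the old run_at for terminal states, the sketch returns nil.

```ruby
require "time"

# Hypothetical pure-function restatement of #fail!'s branching.
def next_failure_state(attempts:, max_attempts:, mode:, backoff: 30, now: Time.now)
  new_attempts = attempts + 1
  status =
    if new_attempts >= max_attempts then "dead"   # attempts exhausted
    elsif mode == "inline" then "failed"          # no drainer: terminal
    else "queued"                                 # retry later
    end
  # Linear backoff: the nth failure waits backoff * n seconds.
  retry_at = status == "queued" ? (now + (backoff * new_attempts)).utc.iso8601 : nil
  { status: status, attempts: new_attempts, retry_at: retry_at }
end

now = Time.utc(2024, 5, 10, 12)
first  = next_failure_state(attempts: 0, max_attempts: 3, mode: "worker", now: now)
second = next_failure_state(attempts: 1, max_attempts: 3, mode: "worker", now: now)
last   = next_failure_state(attempts: 2, max_attempts: 3, mode: "worker", now: now)
inline = next_failure_state(attempts: 0, max_attempts: 3, mode: "inline", now: now)
# first:  queued, retry_at 2024-05-10T12:00:30Z
# second: queued, retry_at 2024-05-10T12:01:00Z
# last:   dead
# inline: failed
```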
#failed_count ⇒ Object
Returns the count of failed jobs: both the inline-mode terminal “failed” state and the attempts-exhausted “dead” state (the two states a human must act on; surfaced by the in-chat /status jobs line, #186).
```ruby
# File 'lib/rubino/jobs/queue.rb', line 171
def failed_count
  @db[:jobs].where(status: %w[failed dead]).count
end
```
#find(id) ⇒ Object
Finds one job by full id or short-id prefix (the 8-char ids the list renders; the same prefix resolution the memory store gives /memory show). Returns nil for an empty id or when nothing matches; on an ambiguous prefix the first row the database returns wins (the query has no ORDER BY).
```ruby
# File 'lib/rubino/jobs/queue.rb', line 151
def find(id)
  return nil if id.to_s.empty?

  @db[:jobs].where(Sequel.like(:id, "#{id}%")).first
end
```
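A plain-Ruby sketch of the prefix lookup #find performs with `id LIKE 'prefix%'` (find_by_prefix and the ids are hypothetical). One difference worth knowing: Sequel.like treats its string argument as a LIKE pattern, so a % or _ in the input acts as a wildcard, whereas start_with? matches literally. Generated UUIDs contain neither character, so the difference only matters for unusual input.

```ruby
# Hypothetical helper approximating `id LIKE 'prefix%'` with a literal match.
def find_by_prefix(ids, prefix)
  return nil if prefix.to_s.empty? # same guard as #find

  ids.find { |id| id.start_with?(prefix.to_s) }
end

ids = %w[
  3f2a9c1e-0000-4000-8000-000000000001
  7b0d44aa-0000-4000-8000-000000000002
]
second = find_by_prefix(ids, "7b0d44aa") # the full id of the second job
none   = find_by_prefix(ids, "")         # nil: empty input is rejected
miss   = find_by_prefix(ids, "zzz")      # nil: nothing matches
```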
#list(status: nil, limit: 20) ⇒ Object
Lists the most recent jobs (newest created_at first), at most limit rows, optionally filtered by status.
```ruby
# File 'lib/rubino/jobs/queue.rb', line 142
def list(status: nil, limit: 20)
  dataset = @db[:jobs].order(Sequel.desc(:created_at)).limit(limit)
  dataset = dataset.where(status: status) if status
  dataset.all
end
```
#next_due_queued ⇒ Object
The next still-queued, due, UNLOCKED job row (no lock taken). Used by the detached post-turn polishing drain (#319), which runs each row through Jobs::Runner#run_job sequentially on its own thread — so it needs the next candidate, not a worker-locked claim. Returns nil when the queue has nothing due. Scanned fresh each call so rows a follow-up turn enqueues mid-drain are picked up by the same worker (coalescing).
```ruby
# File 'lib/rubino/jobs/queue.rb', line 220
def next_due_queued
  now = Time.now.utc.iso8601
  @db[:jobs]
    .where(status: "queued", locked_by: nil)
    .where { run_at <= now }
    .order(:priority, :run_at)
    .first
end
```
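The coalescing behaviour of #next_due_queued falls out of rescanning on every iteration. This hypothetical drain loop (in-memory rows, a lambda standing in for #next_due_queued, run_at omitted for brevity) adds a second row while the first is being processed, and the same loop still picks it up.

```ruby
# Made-up rows; a lambda stands in for #next_due_queued.
rows = [{ id: 1, status: "queued", priority: 100 }]
next_due = -> { rows.select { |r| r[:status] == "queued" }.min_by { |r| r[:priority] } }

processed = []
while (row = next_due.call) # rescans the rows on every iteration
  processed << row[:id]
  row[:status] = "completed"
  # Simulate a follow-up turn enqueueing more work mid-drain.
  rows << { id: 2, status: "queued", priority: 100 } if row[:id] == 1
end

p processed # prints [1, 2]
```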
#pending_count ⇒ Object
Returns the number of jobs with status "queued", including jobs whose run_at is still in the future.
```ruby
# File 'lib/rubino/jobs/queue.rb', line 164
def pending_count
  @db[:jobs].where(status: "queued").count
end
```
#reap_inline_orphans(before: nil) ⇒ Object
Drains `queued` rows left orphaned by an interrupted prior inline run (#84/#224). Runs every still-queued, due, unlocked row except the one whose id is passed as before: (the row the current enqueue is about to run itself), so a turn whose extraction was interrupted is recovered on the next inline boot instead of sitting “queued” forever. Each orphan is first CAS-claimed through #claim! (#346), then taken through run_job, which marks it completed, failed (inline), or dead. An error raised while draining one row is logged and that row is skipped, so it never aborts the enqueuing turn.
```ruby
# File 'lib/rubino/jobs/queue.rb', line 181
def reap_inline_orphans(before: nil)
  now = Time.now.utc.iso8601
  runner = Runner.new(db: @db)
  worker_id = "reap-#{Process.pid}"

  dataset = @db[:jobs]
    .where(status: "queued", locked_by: nil)
    .where { run_at <= now }
    .order(:priority, :run_at)
  dataset = dataset.exclude(id: before) if before

  # Isolate each orphan: run_job already failure-isolates a bad row
  # terminally, but a defence-in-depth guard here means even an
  # unexpected raise (e.g. a DB error draining one row) can NEVER abort
  # the live turn that is enqueuing. This mirrors the poison-row defence
  # Scheduler#schedule has for unparseable cron rows (#J1).
  #
  # CAS-claim each orphan through the SAME lock #dequeue uses BEFORE
  # running it (#346). Two processes sharing one RUBINO_HOME used to both
  # see the same queued orphans and run_job them directly (no lock, no
  # terminal re-check), double-running (and double-billing) the aux work.
  # The atomic claim transitions queued → running for exactly one caller;
  # a row another process already grabbed returns false and is skipped.
  dataset.select_map(:id).each do |orphan_id|
    next unless claim!(orphan_id, worker_id: worker_id)

    runner.run_job(orphan_id)
  rescue StandardError => e
    Rubino.logger.warn(event: "jobs.reap_orphan_failed", job_id: orphan_id,
                       error: e.class.name, message: e.)
  end
end
```
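The two safeguards in #reap_inline_orphans (claim before running, rescue per row) can be exercised in isolation. In this hypothetical sketch over in-memory rows, "b" raises while running and "c" stands in for an orphan another process claimed after this process listed the ids. The loop still reaches every row: "a" completes, the error from "b" is recorded and skipped, and "c" is skipped because its claim fails.

```ruby
# Hypothetical rows; not Rubino's schema.
Row = Struct.new(:id, :status)

rows = {
  "a" => Row.new("a", "queued"),
  "b" => Row.new("b", "queued"),  # raises while running
  "c" => Row.new("c", "running")  # already claimed by another process
}
lock = Mutex.new

# Compare-and-swap claim, as in the MiniJobs-style sketch of #claim!.
claim = lambda do |id|
  lock.synchronize do
    next false unless rows[id].status == "queued"

    rows[id].status = "running"
    true
  end
end

completed = []
warnings = []
%w[a b c].each do |id|
  next unless claim.call(id) # skip rows someone else already holds

  raise "boom" if id == "b"  # stands in for an unexpected error in run_job
  rows[id].status = "completed"
  completed << id
rescue StandardError => e    # per-row isolation: log and move on
  warnings << [id, e.message]
end

p completed # prints ["a"]
p warnings  # prints [["b", "boom"]]
```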