Class: Rubino::Jobs::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/rubino/jobs/queue.rb

Overview

Manages the job queue backed by SQLite. Supports enqueue, dequeue, locking, and status queries.

Instance Method Summary collapse

Constructor Details

#initialize(db: nil, config: nil) ⇒ Queue

Returns a new instance of Queue.



11
12
13
14
# File 'lib/rubino/jobs/queue.rb', line 11

def initialize(db: nil, config: nil)
  @db = db || Rubino.database.db
  @config = config || Rubino.configuration
end

Instance Method Details

#cleanup!(older_than_days: 7) ⇒ Object

Cleans up old completed jobs



175
176
177
178
179
180
181
# File 'lib/rubino/jobs/queue.rb', line 175

def cleanup!(older_than_days: 7)
  cutoff = (Time.now - (older_than_days * 86_400)).utc.iso8601
  @db[:jobs]
    .where(status: "completed")
    .where { created_at < cutoff }
    .delete
end

#complete!(job_id) ⇒ Object

Marks a job as completed



78
79
80
81
82
83
84
85
# File 'lib/rubino/jobs/queue.rb', line 78

def complete!(job_id)
  @db[:jobs].where(id: job_id).update(
    status: "completed",
    locked_at: nil,
    locked_by: nil,
    updated_at: Time.now.utc.iso8601
  )
end

#countsObject

Status counts for the whole queue (status => count), one grouped query — the in-chat /jobs header line (#187). {} when the queue is empty.



139
140
141
# File 'lib/rubino/jobs/queue.rb', line 139

def counts
  @db[:jobs].group_and_count(:status).to_h { |row| [row[:status], row[:count]] }
end

#dequeue(worker_id:) ⇒ Object

Dequeues the next available job (locks it)



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/rubino/jobs/queue.rb', line 52

def dequeue(worker_id:)
  now = Time.now.utc.iso8601

  job = @db[:jobs]
        .where(status: "queued")
        .where { run_at <= now }
        .order(:priority, :run_at)
        .first

  return nil unless job

  # Lock the job
  updated = @db[:jobs]
            .where(id: job[:id], status: "queued")
            .update(
              status: "running",
              locked_at: now,
              locked_by: worker_id,
              updated_at: now
            )

  # Return nil if another worker grabbed it first
  updated > 0 ? @db[:jobs].where(id: job[:id]).first : nil
end

#enqueue(type, payload, priority: 100, run_at: nil) ⇒ Object

Enqueues a new job



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/rubino/jobs/queue.rb', line 17

def enqueue(type, payload, priority: 100, run_at: nil)
  now = Time.now.utc.iso8601
  id = SecureRandom.uuid

  @db[:jobs].insert(
    id: id,
    type: type,
    status: "queued",
    priority: priority,
    payload_json: JSON.generate(payload),
    attempts: 0,
    max_attempts: @config.jobs_max_attempts,
    run_at: run_at || now,
    created_at: now,
    updated_at: now
  )

  # If inline mode, execute immediately — but first drain any stale rows
  # a previous inline run left orphaned (#84/#224). Inline mode has no
  # background drainer, so a `queued` row whose enqueuing process was
  # interrupted mid-run (e.g. the user quit the session while the
  # post-turn extraction was still finishing) — run_job is called
  # directly, never locked, and Interrupt is not a StandardError, so the
  # row never reaches complete!/fail! — sits "queued" forever and is the
  # behaviour #84 closed. Every inline enqueue means a live process is
  # here and willing to drain, so reap those orphans on this boot.
  if @config.jobs_mode == "inline"
    reap_inline_orphans(before: id)
    Runner.new.run_job(id)
  end

  id
end

#fail!(job_id, error:) ⇒ Object

Marks a job as failed, increments attempts



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/rubino/jobs/queue.rb', line 88

def fail!(job_id, error:)
  job = @db[:jobs].where(id: job_id).first
  return unless job

  new_attempts = job[:attempts] + 1
  # Inline mode has no background drainer, so re-queueing a failed job
  # would leave it "queued" forever (#84) — mark it terminal ("failed")
  # instead so `jobs list` is honest. Worker/manual modes keep the
  # retry-with-backoff behavior until attempts are exhausted.
  new_status =
    if new_attempts >= job[:max_attempts]
      "dead"
    elsif @config.jobs_mode == "inline"
      "failed"
    else
      "queued"
    end

  # Calculate retry time with backoff
  backoff = @config.dig("jobs", "retry_backoff_seconds") || 30
  retry_at = (Time.now + (backoff * new_attempts)).utc.iso8601

  @db[:jobs].where(id: job_id).update(
    status: new_status,
    attempts: new_attempts,
    last_error: error,
    locked_at: nil,
    locked_by: nil,
    run_at: new_status == "queued" ? retry_at : job[:run_at],
    updated_at: Time.now.utc.iso8601
  )
end

#failed_countObject

Returns count of failed jobs — both the inline-mode terminal “failed” and the attempts-exhausted “dead” (the two states a human must act on; surfaced by the in-chat /status jobs line, #186).



151
152
153
# File 'lib/rubino/jobs/queue.rb', line 151

def failed_count
  @db[:jobs].where(status: %w[failed dead]).count
end

#find(id) ⇒ Object

Finds one job by full id or short-id prefix (the 8-char ids the list renders — same prefix resolution the memory store gives /memory show). nil when nothing matches; the first match wins on an ambiguous prefix.



131
132
133
134
135
# File 'lib/rubino/jobs/queue.rb', line 131

def find(id)
  return nil if id.to_s.empty?

  @db[:jobs].where(Sequel.like(:id, "#{id}%")).first
end

#list(status: nil, limit: 20) ⇒ Object

Lists jobs with optional filters



122
123
124
125
126
# File 'lib/rubino/jobs/queue.rb', line 122

def list(status: nil, limit: 20)
  dataset = @db[:jobs].order(Sequel.desc(:created_at)).limit(limit)
  dataset = dataset.where(status: status) if status
  dataset.all
end

#pending_countObject

Returns count of pending jobs



144
145
146
# File 'lib/rubino/jobs/queue.rb', line 144

def pending_count
  @db[:jobs].where(status: "queued").count
end

#reap_inline_orphans(before: nil) ⇒ Object

Drains ‘queued` rows left orphaned by an interrupted prior inline run (#84/#224). Runs every still-queued, due, unlocked row that was enqueued before before (the row this enqueue is about to run itself), so a turn whose extraction was interrupted is recovered on the next inline boot instead of sitting “queued” forever. Each is taken through run_job, which marks it completed / failed (inline) / dead terminally.



161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/rubino/jobs/queue.rb', line 161

def reap_inline_orphans(before: nil)
  now = Time.now.utc.iso8601
  runner = Runner.new(db: @db)

  dataset = @db[:jobs]
            .where(status: "queued", locked_by: nil)
            .where { run_at <= now }
            .order(:priority, :run_at)
  dataset = dataset.exclude(id: before) if before

  dataset.select_map(:id).each { |orphan_id| runner.run_job(orphan_id) }
end