Class: Async::Background::Queue::Store

Inherits:
Object
  • Object
show all
Includes:
Clock
Defined in:
lib/async/background/queue/store.rb

Constant Summary collapse

SCHEMA =
<<~SQL
  PRAGMA auto_vacuum = INCREMENTAL;
  CREATE TABLE IF NOT EXISTS jobs (
    id         INTEGER PRIMARY KEY,
    class_name TEXT    NOT NULL,
    args       TEXT    NOT NULL DEFAULT '[]',
    options    TEXT,
    status     TEXT    NOT NULL DEFAULT 'pending',
    created_at REAL    NOT NULL,
    run_at     REAL    NOT NULL,
    locked_by  INTEGER,
    locked_at  REAL
  );
  CREATE INDEX IF NOT EXISTS idx_jobs_pending ON jobs(run_at, id) WHERE status = 'pending';
SQL
CLEANUP_INTERVAL =
300
CLEANUP_AGE =
3600

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path: self.class.default_path, options: {}) ⇒ Store

Returns a new instance of Store.



76
77
78
79
80
81
82
83
# File 'lib/async/background/queue/store.rb', line 76

def initialize(path: self.class.default_path, options: {})
  @path           = path
  @options        = StoreOptions.build(options)
  @pragma_sql     = @options.pragma_sql.freeze
  @db             = nil
  @schema_checked = false
  @last_cleanup_at = nil
end

Instance Attribute Details

#optionsObject (readonly)

Returns the value of attribute options.



74
75
76
# File 'lib/async/background/queue/store.rb', line 74

def options
  @options
end

#pathObject (readonly)

Returns the value of attribute path.



74
75
76
# File 'lib/async/background/queue/store.rb', line 74

def path
  @path
end

Class Method Details

.default_pathObject



156
157
158
# File 'lib/async/background/queue/store.rb', line 156

def self.default_path
  "async_background_queue.db"
end

Instance Method Details

#closeObject



147
148
149
150
151
152
153
154
# File 'lib/async/background/queue/store.rb', line 147

def close
  return unless @db && !@db.closed?

  finalize_statements
  @db.execute("PRAGMA optimize") rescue nil
  @db.close
  @db = nil
end

#complete(job_id) ⇒ Object



113
114
115
116
# File 'lib/async/background/queue/store.rb', line 113

def complete(job_id)
  ensure_connection
  @complete_stmt.execute(job_id)
end

#enqueue(class_name, args = [], run_at = nil, options: {}) ⇒ Object



95
96
97
98
99
100
# File 'lib/async/background/queue/store.rb', line 95

def enqueue(class_name, args = [], run_at = nil, options: {})
  ensure_connection
  now = realtime_now
  @enqueue_stmt.execute(class_name, JSON.generate(args), dump_options(options), now, run_at || now)
  @db.last_insert_row_id
end

#ensure_database!Object



85
86
87
88
89
90
91
92
93
# File 'lib/async/background/queue/store.rb', line 85

def ensure_database!
  require_sqlite3
  db = SQLite3::Database.new(@path)
  configure_database(db)
  db.execute_batch(SCHEMA)
  db.execute("PRAGMA wal_checkpoint(TRUNCATE)")
  db.close
  @schema_checked = true
end

#fail(job_id) ⇒ Object



118
119
120
121
# File 'lib/async/background/queue/store.rb', line 118

def fail(job_id)
  ensure_connection
  @fail_stmt.execute(job_id)
end

#fetch(worker_id) ⇒ Object



102
103
104
105
106
107
108
109
110
111
# File 'lib/async/background/queue/store.rb', line 102

def fetch(worker_id)
  ensure_connection
  now = realtime_now

  row = transaction { with_stmt(@fetch_stmt) { |s| s.execute(worker_id, now, now).first } }
  return unless row

  maybe_cleanup
  { id: row[0], class_name: row[1], args: JSON.parse(row[2]), options: load_options(row[3]) }
end

#recover(worker_id) ⇒ Object



141
142
143
144
145
# File 'lib/async/background/queue/store.rb', line 141

def recover(worker_id)
  ensure_connection
  @requeue_stmt.execute(worker_id)
  @db.changes
end

#retry_or_fail(job_id, fallback_options: nil) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/async/background/queue/store.rb', line 123

def retry_or_fail(job_id, fallback_options: nil)
  ensure_connection

  transaction do
    stored = with_stmt(@retry_state_stmt) { |s| load_options(s.execute(job_id).first&.first) }
    policy = stored.empty? ? normalize_options(fallback_options) : Job::Options.new(**stored)

    if policy&.retry? && policy.next_attempt <= policy.retry
      advanced = policy.with_attempt(policy.next_attempt)
      @retry_stmt.execute(realtime_now + advanced.next_retry_delay(advanced.attempt), dump_options(advanced.to_h.compact), job_id)
      :retried
    else
      @fail_stmt.execute(job_id)
      :failed
    end
  end
end