Class: Async::Background::Queue::Store
- Inherits: Object
  - Object
  - Async::Background::Queue::Store
- Includes:
- Clock
- Defined in:
- lib/async/background/queue/store.rb
Constant Summary collapse
- SCHEMA =
<<~SQL
  PRAGMA auto_vacuum = INCREMENTAL;
  CREATE TABLE IF NOT EXISTS jobs (
    id INTEGER PRIMARY KEY,
    class_name TEXT NOT NULL,
    args TEXT NOT NULL DEFAULT '[]',
    options TEXT,
    status TEXT NOT NULL DEFAULT 'pending',
    created_at REAL NOT NULL,
    run_at REAL NOT NULL,
    locked_by INTEGER,
    locked_at REAL
  );
  CREATE INDEX IF NOT EXISTS idx_jobs_pending ON jobs(run_at, id) WHERE status = 'pending';
SQL
- CLEANUP_INTERVAL =
300
- CLEANUP_AGE =
3600
Instance Attribute Summary collapse
- #options ⇒ Object (readonly)
  Returns the value of attribute options.
- #path ⇒ Object (readonly)
  Returns the value of attribute path.
Class Method Summary collapse
- .default_path ⇒ Object
Instance Method Summary collapse
- #close ⇒ Object
- #complete(job_id) ⇒ Object
- #enqueue(class_name, args = [], run_at = nil, options: {}) ⇒ Object
- #ensure_database! ⇒ Object
- #fail(job_id) ⇒ Object
- #fetch(worker_id) ⇒ Object
- #initialize(path: self.class.default_path, options: {}) ⇒ Store (constructor)
  A new instance of Store.
- #recover(worker_id) ⇒ Object
- #retry_or_fail(job_id, fallback_options: nil) ⇒ Object
Constructor Details
#initialize(path: self.class.default_path, options: {}) ⇒ Store
Returns a new instance of Store.
76 77 78 79 80 81 82 83 |
# File 'lib/async/background/queue/store.rb', line 76 def initialize(path: self.class.default_path, options: {}) @path = path @options = StoreOptions.build(options) @pragma_sql = @options.pragma_sql.freeze @db = nil @schema_checked = false @last_cleanup_at = nil end |
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
74 75 76 |
# File 'lib/async/background/queue/store.rb', line 74 def options @options end |
#path ⇒ Object (readonly)
Returns the value of attribute path.
74 75 76 |
# File 'lib/async/background/queue/store.rb', line 74 def path @path end |
Class Method Details
.default_path ⇒ Object
156 157 158 |
# File 'lib/async/background/queue/store.rb', line 156 def self.default_path "async_background_queue.db" end |
Instance Method Details
#close ⇒ Object
147 148 149 150 151 152 153 154 |
# File 'lib/async/background/queue/store.rb', line 147 def close return unless @db && !@db.closed? finalize_statements @db.execute("PRAGMA optimize") rescue nil @db.close @db = nil end |
#complete(job_id) ⇒ Object
113 114 115 116 |
# File 'lib/async/background/queue/store.rb', line 113 def complete(job_id) ensure_connection @complete_stmt.execute(job_id) end |
#enqueue(class_name, args = [], run_at = nil, options: {}) ⇒ Object
95 96 97 98 99 100 |
# File 'lib/async/background/queue/store.rb', line 95 def enqueue(class_name, args = [], run_at = nil, options: {}) ensure_connection now = realtime_now @enqueue_stmt.execute(class_name, JSON.generate(args), (), now, run_at || now) @db.last_insert_row_id end |
#ensure_database! ⇒ Object
85 86 87 88 89 90 91 92 93 |
# File 'lib/async/background/queue/store.rb', line 85 def ensure_database! require_sqlite3 db = SQLite3::Database.new(@path) configure_database(db) db.execute_batch(SCHEMA) db.execute("PRAGMA wal_checkpoint(TRUNCATE)") db.close @schema_checked = true end |
#fail(job_id) ⇒ Object
118 119 120 121 |
# File 'lib/async/background/queue/store.rb', line 118 def fail(job_id) ensure_connection @fail_stmt.execute(job_id) end |
#fetch(worker_id) ⇒ Object
102 103 104 105 106 107 108 109 110 111 |
# File 'lib/async/background/queue/store.rb', line 102 def fetch(worker_id) ensure_connection now = realtime_now row = transaction { with_stmt(@fetch_stmt) { |s| s.execute(worker_id, now, now).first } } return unless row maybe_cleanup { id: row[0], class_name: row[1], args: JSON.parse(row[2]), options: (row[3]) } end |
#recover(worker_id) ⇒ Object
141 142 143 144 145 |
# File 'lib/async/background/queue/store.rb', line 141 def recover(worker_id) ensure_connection @requeue_stmt.execute(worker_id) @db.changes end |
#retry_or_fail(job_id, fallback_options: nil) ⇒ Object
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/async/background/queue/store.rb', line 123 def retry_or_fail(job_id, fallback_options: nil) ensure_connection transaction do stored = with_stmt(@retry_state_stmt) { |s| (s.execute(job_id).first&.first) } policy = stored.empty? ? () : Job::Options.new(**stored) if policy&.retry? && policy.next_attempt <= policy.retry advanced = policy.with_attempt(policy.next_attempt) @retry_stmt.execute(realtime_now + advanced.next_retry_delay(advanced.attempt), (advanced.to_h.compact), job_id) :retried else @fail_stmt.execute(job_id) :failed end end end |