Class: Async::Background::Queue::Store
- Inherits:
-
Object
- Object
- Async::Background::Queue::Store
- Includes:
- Clock
- Defined in:
- lib/async/background/queue/store.rb
Constant Summary collapse
- SCHEMA =
<<~SQL
  PRAGMA auto_vacuum = INCREMENTAL;
  CREATE TABLE IF NOT EXISTS jobs (
    id INTEGER PRIMARY KEY,
    class_name TEXT NOT NULL,
    args TEXT NOT NULL DEFAULT '[]',
    options TEXT,
    status TEXT NOT NULL DEFAULT 'pending',
    created_at REAL NOT NULL,
    run_at REAL NOT NULL,
    locked_by INTEGER,
    locked_at REAL
  );
  CREATE INDEX IF NOT EXISTS idx_jobs_pending ON jobs(run_at, id) WHERE status = 'pending';
SQL
- MMAP_SIZE =
268_435_456
- PRAGMAS =
->(mmap_size) {
  <<~SQL
    PRAGMA journal_mode = WAL;
    PRAGMA synchronous = NORMAL;
    PRAGMA mmap_size = #{mmap_size};
    PRAGMA cache_size = -16000;
    PRAGMA temp_store = MEMORY;
    PRAGMA busy_timeout = 5000;
    PRAGMA journal_size_limit = 67108864;
  SQL
}.freeze
- CLEANUP_INTERVAL =
300
- CLEANUP_AGE =
3600
Instance Attribute Summary collapse
-
#path ⇒ Object
readonly
Returns the value of attribute path.
Class Method Summary collapse
Instance Method Summary collapse
- #close ⇒ Object
- #complete(job_id) ⇒ Object
- #enqueue(class_name, args = [], run_at = nil, options: {}) ⇒ Object
- #ensure_database! ⇒ Object
- #fail(job_id) ⇒ Object
- #fetch(worker_id) ⇒ Object
-
#initialize(path: self.class.default_path, mmap: true) ⇒ Store
constructor
A new instance of Store.
- #recover(worker_id) ⇒ Object
- #retry_or_fail(job_id, fallback_options: nil) ⇒ Object
Constructor Details
#initialize(path: self.class.default_path, mmap: true) ⇒ Store
Returns a new instance of Store.
46 47 48 49 50 51 52 |
# File 'lib/async/background/queue/store.rb', line 46

# Builds a store bound to +path+. No database handle is opened here; @db
# starts out as nil.
#
# @param path [String] database file location; defaults to the class-level default_path
# @param mmap [Boolean] when truthy the PRAGMA block is rendered with MMAP_SIZE, otherwise with 0
def initialize(path: self.class.default_path, mmap: true)
  mmap_bytes = mmap ? MMAP_SIZE : 0

  @path = path
  @pragma_sql = PRAGMAS.call(mmap_bytes).freeze
  @db = nil
  @schema_checked = false
  @last_cleanup_at = nil
end
Instance Attribute Details
#path ⇒ Object (readonly)
Returns the value of attribute path.
44 45 46 |
# Reader for the path attribute: returns the current value of @path, which the
# constructor assigns from its `path:` keyword.
# File 'lib/async/background/queue/store.rb', line 44 def path @path end |
Class Method Details
.default_path ⇒ Object
125 126 127 |
# Class-level default used by the constructor's `path:` keyword. Returns the
# relative file name "async_background_queue.db".
# File 'lib/async/background/queue/store.rb', line 125 def self.default_path "async_background_queue.db" end |
Instance Method Details
#close ⇒ Object
116 117 118 119 120 121 122 123 |
# File 'lib/async/background/queue/store.rb', line 116

# Closes the held database handle, if there is one.
#
# Returns immediately when no handle is held or the handle already reports
# closed. Otherwise it finalizes the prepared statements, issues a
# best-effort `PRAGMA optimize`, closes the handle and clears @db.
#
# @return [nil]
def close
  return unless @db && !@db.closed?

  finalize_statements

  begin
    @db.execute("PRAGMA optimize")
  rescue StandardError
    # Deliberately best effort: a failed optimize pass must never prevent
    # the handle from being closed, so the error is dropped here.
    nil
  end

  @db.close
  @db = nil
end
#complete(job_id) ⇒ Object
82 83 84 85 |
# File 'lib/async/background/queue/store.rb', line 82

# Executes the prepared @complete_stmt for a single job.
#
# @param job_id [Object] value bound to the statement
# @return [Object] whatever @complete_stmt.execute returns
def complete(job_id)
  ensure_connection
  # Read the statement only after ensure_connection has run.
  statement = @complete_stmt
  statement.execute(job_id)
end
#enqueue(class_name, args = [], run_at = nil, options: {}) ⇒ Object
64 65 66 67 68 69 |
# Inserts a job through the prepared @enqueue_stmt and returns
# @db.last_insert_row_id. Bound values, in order: class_name,
# JSON.generate(args), a third value, the realtime_now timestamp, and run_at
# (falling back to that same timestamp when run_at is nil/false).
# NOTE(review): the third bound value renders as `()` in this listing. An
# expression (presumably derived from the otherwise unused `options:` keyword)
# appears to have been dropped during extraction; confirm against
# lib/async/background/queue/store.rb before relying on this listing.
# File 'lib/async/background/queue/store.rb', line 64 def enqueue(class_name, args = [], run_at = nil, options: {}) ensure_connection now = realtime_now @enqueue_stmt.execute(class_name, JSON.generate(args), (), now, run_at || now) @db.last_insert_row_id end |
#ensure_database! ⇒ Object
54 55 56 57 58 59 60 61 62 |
# File 'lib/async/background/queue/store.rb', line 54

# Opens a short-lived handle on @path, configures it, applies SCHEMA, runs
# `PRAGMA wal_checkpoint(TRUNCATE)` and closes the handle again. On success
# @schema_checked is set to true.
#
# The handle is released in an `ensure` so that a failure while configuring,
# applying the schema or checkpointing does not leak an open database; the
# original error still propagates and @schema_checked is left unchanged.
#
# @return [true]
def ensure_database!
  require_sqlite3
  db = SQLite3::Database.new(@path)

  begin
    configure_database(db)
    db.execute_batch(SCHEMA)
    db.execute("PRAGMA wal_checkpoint(TRUNCATE)")
  ensure
    db.close unless db.closed?
  end

  @schema_checked = true
end
#fail(job_id) ⇒ Object
87 88 89 90 |
# File 'lib/async/background/queue/store.rb', line 87

# Executes the prepared @fail_stmt for a single job.
#
# @param job_id [Object] value bound to the statement
# @return [Object] whatever @fail_stmt.execute returns
def fail(job_id)
  ensure_connection
  # Read the statement only after ensure_connection has run.
  statement = @fail_stmt
  statement.execute(job_id)
end
#fetch(worker_id) ⇒ Object
71 72 73 74 75 76 77 78 79 80 |
# Runs @fetch_stmt inside a transaction with (worker_id, now, now), where now
# is realtime_now, and takes the first result row. Returns nil when there is
# no row. Otherwise calls maybe_cleanup and returns a Hash with :id (row[0]),
# :class_name (row[1]), :args (row[2] parsed with JSON.parse) and :options
# (row[3]).
# NOTE(review): `options: (row[3])` looks like a call whose method name was
# lost during extraction, so the raw column may actually be decoded first;
# confirm against lib/async/background/queue/store.rb.
# File 'lib/async/background/queue/store.rb', line 71 def fetch(worker_id) ensure_connection now = realtime_now row = transaction { with_stmt(@fetch_stmt) { |s| s.execute(worker_id, now, now).first } } return unless row maybe_cleanup { id: row[0], class_name: row[1], args: JSON.parse(row[2]), options: (row[3]) } end |
#recover(worker_id) ⇒ Object
110 111 112 113 114 |
# File 'lib/async/background/queue/store.rb', line 110

# Executes the prepared @requeue_stmt for the given worker and reports the
# connection's change count afterwards.
#
# @param worker_id [Object] value bound to the statement
# @return [Object] the value of @db.changes read right after the statement ran
def recover(worker_id)
  ensure_connection
  @requeue_stmt.execute(worker_id)

  changed = @db.changes
  changed
end
#retry_or_fail(job_id, fallback_options: nil) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# Within a single transaction, reads the job's stored retry state through
# @retry_state_stmt and then either reschedules the job (returns :retried) or
# marks it failed through @fail_stmt (returns :failed). A retry happens only
# when policy.retry? is truthy and policy.next_attempt <= policy.retry; the
# job is then rescheduled via @retry_stmt for
# realtime_now + advanced.next_retry_delay(advanced.attempt), where advanced is
# the policy moved on to its next attempt.
# NOTE(review): several expressions render as bare parentheses in this listing:
# `(s.execute(job_id).first&.first)`, `stored.empty? ? () : ...` and
# `(advanced.to_h.compact)`. Helper call names appear to have been lost during
# extraction, and `fallback_options` is unused as shown (presumably it belongs
# in the empty `()` branch). Confirm against lib/async/background/queue/store.rb.
# File 'lib/async/background/queue/store.rb', line 92 def retry_or_fail(job_id, fallback_options: nil) ensure_connection transaction do stored = with_stmt(@retry_state_stmt) { |s| (s.execute(job_id).first&.first) } policy = stored.empty? ? () : Job::Options.new(**stored) if policy&.retry? && policy.next_attempt <= policy.retry advanced = policy.with_attempt(policy.next_attempt) @retry_stmt.execute(realtime_now + advanced.next_retry_delay(advanced.attempt), (advanced.to_h.compact), job_id) :retried else @fail_stmt.execute(job_id) :failed end end end |