Class: Async::Background::Queue::Store
- Inherits:
-
Object
- Object
- Async::Background::Queue::Store
- Includes:
- Clock
- Defined in:
- lib/async/background/queue/store.rb
Constant Summary collapse
- SCHEMA =
<<~SQL PRAGMA auto_vacuum = INCREMENTAL; CREATE TABLE IF NOT EXISTS jobs ( id INTEGER PRIMARY KEY, class_name TEXT NOT NULL, args TEXT NOT NULL DEFAULT '[]', options TEXT, status TEXT NOT NULL DEFAULT 'pending', created_at REAL NOT NULL, run_at REAL NOT NULL, locked_by INTEGER, locked_at REAL ); CREATE INDEX IF NOT EXISTS idx_jobs_pending ON jobs(run_at, id) WHERE status = 'pending'; SQL
- MMAP_SIZE =
268_435_456- PRAGMAS =
->(mmap_size) { <<~SQL PRAGMA journal_mode = WAL; PRAGMA synchronous = NORMAL; PRAGMA mmap_size = #{mmap_size}; PRAGMA cache_size = -16000; PRAGMA temp_store = MEMORY; PRAGMA busy_timeout = 5000; PRAGMA journal_size_limit = 67108864; SQL }.freeze
- CLEANUP_INTERVAL =
300- CLEANUP_AGE =
3600
Instance Attribute Summary collapse
-
#path ⇒ Object
readonly
Returns the value of attribute path.
Class Method Summary collapse
Instance Method Summary collapse
- #close ⇒ Object
- #complete(job_id) ⇒ Object
- #enqueue(class_name, args = [], run_at = nil, options: {}) ⇒ Object
- #ensure_database! ⇒ Object
- #fail(job_id) ⇒ Object
- #fetch(worker_id) ⇒ Object
-
#initialize(path: self.class.default_path, mmap: true) ⇒ Store
constructor
A new instance of Store.
- #recover(worker_id) ⇒ Object
Constructor Details
#initialize(path: self.class.default_path, mmap: true) ⇒ Store
Returns a new instance of Store.
46 47 48 49 50 51 52 53 |
# File 'lib/async/background/queue/store.rb', line 46 def initialize(path: self.class.default_path, mmap: true) @path = path @mmap = mmap @pragma_sql = PRAGMAS.call(mmap ? MMAP_SIZE : 0).freeze @db = nil @schema_checked = false @last_cleanup_at = nil end |
Instance Attribute Details
#path ⇒ Object (readonly)
Returns the value of attribute path.
44 45 46 |
# File 'lib/async/background/queue/store.rb', line 44 def path @path end |
Class Method Details
.default_path ⇒ Object
121 122 123 |
# File 'lib/async/background/queue/store.rb', line 121 def self.default_path "async_background_queue.db" end |
Instance Method Details
#close ⇒ Object
112 113 114 115 116 117 118 119 |
# File 'lib/async/background/queue/store.rb', line 112 def close return unless @db && !@db.closed? finalize_statements @db.execute("PRAGMA optimize") rescue nil @db.close @db = nil end |
#complete(job_id) ⇒ Object
96 97 98 99 |
# File 'lib/async/background/queue/store.rb', line 96 def complete(job_id) ensure_connection @complete_stmt.execute(job_id) end |
#enqueue(class_name, args = [], run_at = nil, options: {}) ⇒ Object
66 67 68 69 70 71 72 |
# File 'lib/async/background/queue/store.rb', line 66 def enqueue(class_name, args = [], run_at = nil, options: {}) ensure_connection run_at ||= realtime_now = .empty? ? nil : JSON.generate() @enqueue_stmt.execute(class_name, JSON.generate(args), , realtime_now, run_at) @db.last_insert_row_id end |
#ensure_database! ⇒ Object
55 56 57 58 59 60 61 62 63 64 |
# File 'lib/async/background/queue/store.rb', line 55 def ensure_database! require_sqlite3 db = SQLite3::Database.new(@path) db.execute('PRAGMA busy_timeout = 5000') db.execute_batch(@pragma_sql) db.execute_batch(SCHEMA) db.execute("PRAGMA wal_checkpoint(TRUNCATE)") db.close @schema_checked = true end |
#fail(job_id) ⇒ Object
101 102 103 104 |
# File 'lib/async/background/queue/store.rb', line 101 def fail(job_id) ensure_connection @fail_stmt.execute(job_id) end |
#fetch(worker_id) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/async/background/queue/store.rb', line 74 def fetch(worker_id) ensure_connection now = realtime_now @db.execute("BEGIN IMMEDIATE") begin results = @fetch_stmt.execute(worker_id, now, now) row = results.first ensure @fetch_stmt.reset! rescue nil end @db.execute("COMMIT") return unless row maybe_cleanup = row[3] ? JSON.parse(row[3], symbolize_names: true) : {} { id: row[0], class_name: row[1], args: JSON.parse(row[2]), options: } rescue @db.execute("ROLLBACK") rescue nil raise end |
#recover(worker_id) ⇒ Object
106 107 108 109 110 |
# File 'lib/async/background/queue/store.rb', line 106 def recover(worker_id) ensure_connection @requeue_stmt.execute(worker_id) @db.changes end |