Class: Async::Background::Queue::Store
- Inherits:
-
Object
- Object
- Async::Background::Queue::Store
- Includes:
- Clock
- Defined in:
- lib/async/background/queue/store.rb,
lib/async/background/queue/schema.rb
Defined Under Namespace
Classes: SchemaError
Constant Summary collapse
- SCHEMA_VERSION =
Schema::VERSION
- MIGRATION_BUSY_TIMEOUT_MS =
Schema::MIGRATION_BUSY_TIMEOUT_MS
- REQUIRED_INDEXES =
Schema::REQUIRED_INDEXES
- SCHEMA =
SQL::CREATE_SCHEMA
- CLEANUP_INTERVAL =
300
- CLEANUP_AGE =
3600
- FAILED_RETENTION_AGE =
7 * 24 * 3600
- ERROR_MESSAGE_MAX_LEN =
2_000
- EMPTY_ARGS_JSON =
'[]'.freeze
Instance Attribute Summary collapse
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#path ⇒ Object
readonly
Returns the value of attribute path.
Class Method Summary collapse
- .default_path ⇒ Object
- .migrate!(path: default_path, options: {}) ⇒ Object
- .prepare_dashboard!(path: default_path, options: {}) ⇒ Object
Instance Method Summary collapse
- #close ⇒ Object
- #complete(job_id, claim_token:, finished_at: realtime_now, duration_ms: nil) ⇒ Object
- #data_version ⇒ Object
- #enqueue(class_name, args = EMPTY_ARGS, run_at = nil, options: EMPTY_OPTIONS) ⇒ Object
- #fail(job_id, claim_token:, error_class: nil, error_message: nil, finished_at: realtime_now, duration_ms: nil) ⇒ Object
- #fetch(worker_id) ⇒ Object
-
#initialize(path: self.class.default_path, options: {}) ⇒ Store
constructor
A new instance of Store.
- #mark_started!(job_id, claim_token:, started_at: realtime_now) ⇒ Object
- #migrate! ⇒ Object (also: #ensure_database!)
- #next_pending_run_at ⇒ Object
- #prepare_dashboard! ⇒ Object
- #recover(worker_id) ⇒ Object
- #retry_or_fail(job_id, claim_token:, error_class: nil, error_message: nil, fallback_options: nil, finished_at: realtime_now, duration_ms: nil) ⇒ Object
- #schema_version ⇒ Object
Constructor Details
#initialize(path: self.class.default_path, options: {}) ⇒ Store
Returns a new instance of Store.
48 49 50 51 52 53 54 55 |
# File 'lib/async/background/queue/store.rb', line 48

# Records the database path and the built store options, and initialises the
# connection state as "not connected" (@db is nil, schema not yet checked,
# no cleanup recorded).
#
# @param path database location; defaults to the class-level default_path
# @param options [Hash] settings handed to StoreOptions.build
# @return [Store] a new instance of Store
def initialize(path: self.class.default_path, options: {})
  @path = path
  # NOTE(review): the rendered listing showed `StoreOptions.build()` with no
  # argument, which would silently ignore the caller's `options`. The argument
  # is restored here on the assumption that text extraction dropped it —
  # confirm against lib/async/background/queue/store.rb.
  @options = StoreOptions.build(options)
  @pragma_sql = @options.pragma_sql.freeze
  @db = nil
  @schema_checked = false
  @last_cleanup_at = nil
end
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
28 29 30 |
# File 'lib/async/background/queue/store.rb', line 28

# Read-only accessor for the options attribute.
#
# The rendered listing had lost the method name (`def @options end`); it is
# restored from the attribute heading `#options`.
#
# @return the value stored in @options
def options
  @options
end
#path ⇒ Object (readonly)
Returns the value of attribute path.
28 29 30 |
# File 'lib/async/background/queue/store.rb', line 28

# Read-only accessor for the path attribute.
#
# @return the value stored in @path
def path = @path
Class Method Details
.default_path ⇒ Object
46 |
# File 'lib/async/background/queue/store.rb', line 46

# Database path used when a caller does not supply one.
#
# @return [String] the literal default file name
def self.default_path
  'async_background_queue.db'
end
.migrate!(path: default_path, options: {}) ⇒ Object
30 31 32 33 34 35 36 |
# File 'lib/async/background/queue/store.rb', line 30

# Class-level convenience: builds a store for +path+, runs the instance-level
# migrate! on it, and always closes the store afterwards (even when the
# migration raises).
#
# The rendered listing showed `options: )` with the value missing; the value
# is written out explicitly so the caller's options are forwarded regardless
# of whether the Ruby in use supports keyword-value omission.
#
# @param path database location; defaults to default_path
# @param options [Hash] forwarded unchanged to the constructor
# @return the SCHEMA_VERSION constant
def self.migrate!(path: default_path, options: {})
  store = new(path: path, options: options)
  store.migrate!
  SCHEMA_VERSION
ensure
  # store is nil when the constructor itself raised, hence the safe navigation.
  store&.close
end
.prepare_dashboard!(path: default_path, options: {}) ⇒ Object
38 39 40 41 42 43 44 |
# File 'lib/async/background/queue/store.rb', line 38

# Class-level convenience: builds a store for +path+, runs the instance-level
# prepare_dashboard! on it, and always closes the store afterwards (even when
# preparation raises).
#
# The rendered listing showed `options: )` with the value missing; the value
# is written out explicitly so the caller's options are forwarded regardless
# of whether the Ruby in use supports keyword-value omission.
#
# @param path database location; defaults to default_path
# @param options [Hash] forwarded unchanged to the constructor
# @return the SCHEMA_VERSION constant
def self.prepare_dashboard!(path: default_path, options: {})
  store = new(path: path, options: options)
  store.prepare_dashboard!
  SCHEMA_VERSION
ensure
  # store is nil when the constructor itself raised, hence the safe navigation.
  store&.close
end
Instance Method Details
#close ⇒ Object
163 164 165 166 167 168 169 170 171 |
# File 'lib/async/background/queue/store.rb', line 163

# Tears down the current connection, if there is one: finalizes statements,
# runs SQL::OPTIMIZE on a best-effort basis, closes the handle, and resets
# @db and @schema_checked. Does nothing when not connected.
#
# @return [nil, false] nil when not connected, otherwise false
def close
  if connected?
    finalize_statements
    begin
      @db.execute(SQL::OPTIMIZE)
    rescue StandardError
      # Optimisation is best-effort; a failure here must not block closing.
      nil
    end
    @db.close
    @db = nil
    @schema_checked = false
  end
end
#complete(job_id, claim_token:, finished_at: realtime_now, duration_ms: nil) ⇒ Object
107 108 109 110 111 |
# File 'lib/async/background/queue/store.rb', line 107

# Runs the prepared "complete" statement for a job, bound in the order
# (finished_at, duration_ms, job_id, claim_token).
#
# @param job_id identifier of the job
# @param claim_token token bound alongside job_id
# @param finished_at completion timestamp; defaults to realtime_now
# @param duration_ms optional duration, nil by default
# @return [Boolean] true when the connection reports at least one changed row
def complete(job_id, claim_token:, finished_at: realtime_now, duration_ms: nil)
  ensure_connection
  bindings = [finished_at, duration_ms, job_id, claim_token]
  @complete_stmt.execute(*bindings)
  changed_rows = @db.changes
  changed_rows.positive?
end
#data_version ⇒ Object
158 159 160 161 |
# File 'lib/async/background/queue/store.rb', line 158

# Reads the first value returned by SQL::DATA_VERSION on the current
# connection and coerces it with to_i.
#
# @return [Integer]
def data_version
  ensure_connection
  raw_version = @db.get_first_value(SQL::DATA_VERSION)
  raw_version.to_i
end
#enqueue(class_name, args = EMPTY_ARGS, run_at = nil, options: EMPTY_OPTIONS) ⇒ Object
80 81 82 83 84 85 |
# Executes the prepared enqueue statement and returns the connection's
# last_insert_row_id. `now` is bound as one argument and also serves as the
# fallback when run_at is nil.
# NOTE(review): this rendered listing is incomplete — the third bind argument
# appears as an empty `()` and the `options:` parameter is never referenced,
# so a token was presumably dropped during extraction. Consult
# lib/async/background/queue/store.rb for the authoritative code.
# File 'lib/async/background/queue/store.rb', line 80 def enqueue(class_name, args = EMPTY_ARGS, run_at = nil, options: EMPTY_OPTIONS) ensure_connection now = realtime_now @enqueue_stmt.execute(class_name, dump_args(args), (), now, run_at || now) @db.last_insert_row_id end |
#fail(job_id, claim_token:, error_class: nil, error_message: nil, finished_at: realtime_now, duration_ms: nil) ⇒ Object
113 114 115 116 117 118 119 120 121 122 123 124 |
# Executes the prepared fail statement for a job and returns true when the
# connection reports at least one changed row. error_class is bound via
# `&.to_s`, so nil stays nil.
# NOTE(review): this rendered listing is incomplete — the fourth bind argument
# appears as an empty `()` and the `error_message:` parameter is never
# referenced, so a token was presumably dropped during extraction. Consult
# lib/async/background/queue/store.rb for the authoritative code.
# File 'lib/async/background/queue/store.rb', line 113 def fail(job_id, claim_token:, error_class: nil, error_message: nil, finished_at: realtime_now, duration_ms: nil) ensure_connection @fail_stmt.execute( finished_at, duration_ms, error_class&.to_s, (), job_id, claim_token ) @db.changes.positive? end |
#fetch(worker_id) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/async/background/queue/store.rb', line 87

# Attempts to claim one job for +worker_id+. Inside a transaction the prepared
# fetch statement is executed with (worker_id, now, token, now) and its first
# row is taken. When a row comes back, maybe_cleanup is invoked and the row is
# converted with job_from_row.
#
# @param worker_id identifier bound as the first statement argument
# @return the result of job_from_row, or nil when no row was returned
def fetch(worker_id)
  ensure_connection
  claim_token = generate_claim_token
  claimed_at = realtime_now

  claimed_row = transaction do
    with_statement(@fetch_stmt) do |stmt|
      stmt.execute(worker_id, claimed_at, claim_token, claimed_at).first
    end
  end

  if claimed_row
    maybe_cleanup
    job_from_row(claimed_row, claim_token)
  end
end
#mark_started!(job_id, claim_token:, started_at: realtime_now) ⇒ Object
101 102 103 104 105 |
# File 'lib/async/background/queue/store.rb', line 101

# Runs the prepared "mark started" statement, bound in the order
# (started_at, job_id, claim_token).
#
# @param job_id identifier of the job
# @param claim_token token bound alongside job_id
# @param started_at start timestamp; defaults to realtime_now
# @return [Boolean] true when the connection reports at least one changed row
def mark_started!(job_id, claim_token:, started_at: realtime_now)
  ensure_connection
  bindings = [started_at, job_id, claim_token]
  @mark_started_stmt.execute(*bindings)
  changed_rows = @db.changes
  changed_rows.positive?
end
#migrate! ⇒ Object Also known as: ensure_database!
57 58 59 60 61 62 63 |
# File 'lib/async/background/queue/store.rb', line 57

# Runs migrate_database! against a database yielded by with_database, then
# marks the schema as checked. Also known as ensure_database!.
#
# @raise [SchemaError] when the store is currently connected
# @return [self]
def migrate!
  if connected?
    raise SchemaError, 'close the Store before calling migrate!'
  end

  with_database do |database|
    migrate_database!(database)
  end
  @schema_checked = true
  self
end
#next_pending_run_at ⇒ Object
153 154 155 156 |
# File 'lib/async/background/queue/store.rb', line 153

# Executes the prepared "next pending" statement and extracts the first column
# of its first row.
#
# @return the first column of the first row, or nil when there is no row
def next_pending_run_at
  ensure_connection
  with_statement(@next_pending_stmt) do |stmt|
    first_row = stmt.execute.first
    first_row&.first
  end
end
#prepare_dashboard! ⇒ Object
67 68 69 70 71 72 73 |
# File 'lib/async/background/queue/store.rb', line 67

# Hands a database yielded by with_database to Schema.prepare_dashboard!, then
# marks the schema as checked.
#
# @raise [SchemaError] when the store is currently connected
# @return [self]
def prepare_dashboard!
  if connected?
    raise SchemaError, 'close the Store before calling prepare_dashboard!'
  end

  with_database do |database|
    Schema.prepare_dashboard!(database)
  end
  @schema_checked = true
  self
end
#recover(worker_id) ⇒ Object
147 148 149 150 151 |
# File 'lib/async/background/queue/store.rb', line 147

# Executes the prepared requeue statement for +worker_id+.
#
# @param worker_id identifier bound as the statement's only argument
# @return the connection's changes count after the statement ran
def recover(worker_id)
  ensure_connection
  @requeue_stmt.execute(worker_id)
  requeued_count = @db.changes
  requeued_count
end
#retry_or_fail(job_id, claim_token:, error_class: nil, error_message: nil, fallback_options: nil, finished_at: realtime_now, duration_ms: nil) ⇒ Object
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# Inside a transaction: skips the job unless lease_alive?(job_id, claim_token),
# builds a retry policy, then calls retry_job! when policy_retries?(policy) is
# truthy and fail_job! otherwise.
# NOTE(review): this rendered listing is not valid Ruby as shown — an
# assignment has lost its left-hand side and callee (`= (job_id, claim_token)`),
# `retry_policy(, )` has lost both arguments, and the retry_job!/fail_job!
# calls each have an empty argument slot. The missing tokens were presumably
# dropped during extraction. Consult lib/async/background/queue/store.rb for
# the authoritative code.
# File 'lib/async/background/queue/store.rb', line 126 def retry_or_fail( job_id, claim_token:, error_class: nil, error_message: nil, fallback_options: nil, finished_at: realtime_now, duration_ms: nil ) ensure_connection transaction do = (job_id, claim_token) next unless lease_alive?(job_id, claim_token) policy = retry_policy(, ) policy_retries?(policy) ? retry_job!(job_id, claim_token, policy, error_class, ) : fail_job!(job_id, claim_token, error_class, , finished_at, duration_ms) end end |
#schema_version ⇒ Object
75 76 77 78 |
# File 'lib/async/background/queue/store.rb', line 75

# Reads the first value returned by SQL::USER_VERSION on the current
# connection and coerces it with to_i.
#
# @return [Integer]
def schema_version
  ensure_connection
  raw_version = @db.get_first_value(SQL::USER_VERSION)
  raw_version.to_i
end