Class: Honker::Queue
- Inherits:
-
Object
- Object
- Honker::Queue
- Defined in:
- lib/honker.rb
Instance Attribute Summary collapse
-
#max_attempts ⇒ Object
readonly
Returns the value of attribute max_attempts.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Instance Method Summary collapse
-
#_ack(job_id, worker_id) ⇒ Object
Internal: invoked by Job#ack.
-
#_fail(job_id, worker_id, err_msg) ⇒ Object
Internal: invoked by Job#fail.
-
#_heartbeat(job_id, worker_id, extend_s) ⇒ Object
Internal: invoked by Job#heartbeat.
-
#_retry(job_id, worker_id, delay_s, err_msg) ⇒ Object
Internal: invoked by Job#retry.
-
#ack_batch(ids, worker_id) ⇒ Object
Ack multiple jobs in one transaction.
-
#claim_batch(worker_id, n) ⇒ Object
Claim up to n jobs atomically.
-
#claim_one(worker_id) ⇒ Object
Claim a single job or nil if the queue is empty.
-
#enqueue(payload, delay: nil, run_at: nil, priority: 0, expires: nil) ⇒ Object
Enqueue a job.
-
#enqueue_tx(tx, payload, delay: nil, run_at: nil, priority: 0, expires: nil) ⇒ Object
Enqueue inside an open transaction.
-
#initialize(db, name, visibility_timeout_s:, max_attempts:) ⇒ Queue
constructor
A new instance of Queue.
-
#sweep_expired ⇒ Object
Sweep this queue’s expired claims back to pending.
Constructor Details
#initialize(db, name, visibility_timeout_s:, max_attempts:) ⇒ Queue
Returns a new instance of Queue.
188 189 190 191 192 193 |
# File 'lib/honker.rb', line 188 def initialize(db, name, visibility_timeout_s:, max_attempts:) @db = db @name = name @visibility_timeout_s = visibility_timeout_s @max_attempts = max_attempts end |
Instance Attribute Details
#max_attempts ⇒ Object (readonly)
Returns the value of attribute max_attempts.
186 187 188 |
# File 'lib/honker.rb', line 186 def max_attempts @max_attempts end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
186 187 188 |
# File 'lib/honker.rb', line 186 def name @name end |
Instance Method Details
#_ack(job_id, worker_id) ⇒ Object
Internal: invoked by Job#ack.
248 249 250 |
# File 'lib/honker.rb', line 248 def _ack(job_id, worker_id) @db.db.get_first_row("SELECT honker_ack(?, ?)", [job_id, worker_id])[0] == 1 end |
#_fail(job_id, worker_id, err_msg) ⇒ Object
Internal: invoked by Job#fail.
261 262 263 264 265 266 |
# File 'lib/honker.rb', line 261 def _fail(job_id, worker_id, err_msg) @db.db.get_first_row( "SELECT honker_fail(?, ?, ?)", [job_id, worker_id, err_msg], )[0] == 1 end |
#_heartbeat(job_id, worker_id, extend_s) ⇒ Object
Internal: invoked by Job#heartbeat.
269 270 271 272 273 274 |
# File 'lib/honker.rb', line 269 def _heartbeat(job_id, worker_id, extend_s) @db.db.get_first_row( "SELECT honker_heartbeat(?, ?, ?)", [job_id, worker_id, extend_s], )[0] == 1 end |
#_retry(job_id, worker_id, delay_s, err_msg) ⇒ Object
Internal: invoked by Job#retry.
253 254 255 256 257 258 |
# File 'lib/honker.rb', line 253 def _retry(job_id, worker_id, delay_s, err_msg) @db.db.get_first_row( "SELECT honker_retry(?, ?, ?, ?)", [job_id, worker_id, delay_s, err_msg], )[0] == 1 end |
#ack_batch(ids, worker_id) ⇒ Object
Ack multiple jobs in one transaction. Returns the number acked.
231 232 233 234 235 236 |
# File 'lib/honker.rb', line 231 def ack_batch(ids, worker_id) @db.db.get_first_row( "SELECT honker_ack_batch(?, ?)", [JSON.dump(ids), worker_id], )[0] end |
#claim_batch(worker_id, n) ⇒ Object
Claim up to n jobs atomically. Returns an array of Job.
217 218 219 220 221 222 223 |
# File 'lib/honker.rb', line 217 def claim_batch(worker_id, n) rows_json = @db.db.get_first_row( "SELECT honker_claim_batch(?, ?, ?, ?)", [@name, worker_id, n, @visibility_timeout_s], )[0] JSON.parse(rows_json).map { |r| Job.new(self, r) } end |
#claim_one(worker_id) ⇒ Object
Claim a single job or nil if the queue is empty.
226 227 228 |
# File 'lib/honker.rb', line 226 def claim_one(worker_id) claim_batch(worker_id, 1).first end |
#enqueue(payload, delay: nil, run_at: nil, priority: 0, expires: nil) ⇒ Object
Enqueue a job. Returns the inserted row id.
q.enqueue({to: "alice"}, delay: 60, priority: 10, expires: 3600)
198 199 200 201 202 203 204 |
# File 'lib/honker.rb', line 198 def enqueue(payload, delay: nil, run_at: nil, priority: 0, expires: nil) row = @db.db.get_first_row( "SELECT honker_enqueue(?, ?, ?, ?, ?, ?, ?)", [@name, JSON.dump(payload), run_at, delay, priority, @max_attempts, expires], ) row[0] end |
#enqueue_tx(tx, payload, delay: nil, run_at: nil, priority: 0, expires: nil) ⇒ Object
Enqueue inside an open transaction. Atomic with whatever else ran on the same tx.
208 209 210 211 212 213 214 |
# File 'lib/honker.rb', line 208 def enqueue_tx(tx, payload, delay: nil, run_at: nil, priority: 0, expires: nil) row = tx.query_row( "SELECT honker_enqueue(?, ?, ?, ?, ?, ?, ?)", [@name, JSON.dump(payload), run_at, delay, priority, @max_attempts, expires], ) row[0] end |
#sweep_expired ⇒ Object
Sweep this queue’s expired claims back to pending. Returns the number of rows reclaimed.
240 241 242 243 244 245 |
# File 'lib/honker.rb', line 240 def sweep_expired @db.db.get_first_row( "SELECT honker_sweep_expired(?)", [@name], )[0] end |