Class: Honker::Queue
- Inherits:
-
Object
- Object
- Honker::Queue
- Defined in:
- lib/honker.rb
Instance Attribute Summary
-
#max_attempts ⇒ Object
readonly
Returns the value of attribute max_attempts.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Instance Method Summary
-
#_ack(job_id, worker_id) ⇒ Object
Internal: invoked by Job#ack.
-
#_fail(job_id, worker_id, err_msg) ⇒ Object
Internal: invoked by Job#fail.
-
#_heartbeat(job_id, worker_id, extend_s) ⇒ Object
Internal: invoked by Job#heartbeat.
-
#_retry(job_id, worker_id, delay_s, err_msg) ⇒ Object
Internal: invoked by Job#retry.
-
#ack_batch(ids, worker_id) ⇒ Object
Ack multiple jobs in one transaction.
-
#claim_batch(worker_id, n) ⇒ Object
Claim up to n jobs atomically.
-
#claim_one(worker_id) ⇒ Object
Claim a single job; returns nil if the queue is empty.
-
#enqueue(payload, delay: nil, run_at: nil, priority: 0, expires: nil) ⇒ Object
Enqueue a job.
-
#enqueue_tx(tx, payload, delay: nil, run_at: nil, priority: 0, expires: nil) ⇒ Object
Enqueue inside an open transaction.
-
#initialize(db, name, visibility_timeout_s:, max_attempts:) ⇒ Queue
constructor
A new instance of Queue.
-
#sweep_expired ⇒ Object
Sweep this queue’s expired claims back to pending.
Constructor Details
#initialize(db, name, visibility_timeout_s:, max_attempts:) ⇒ Queue
Returns a new instance of Queue.
264 265 266 267 268 269 |
# File 'lib/honker.rb', line 264 def initialize(db, name, visibility_timeout_s:, max_attempts:) @db = db @name = name @visibility_timeout_s = visibility_timeout_s @max_attempts = max_attempts end |
Instance Attribute Details
#max_attempts ⇒ Object (readonly)
Returns the value of attribute max_attempts.
262 263 264 |
# File 'lib/honker.rb', line 262 def max_attempts @max_attempts end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
262 263 264 |
# File 'lib/honker.rb', line 262 def name @name end |
Instance Method Details
#_ack(job_id, worker_id) ⇒ Object
Internal: invoked by Job#ack.
324 325 326 |
# File 'lib/honker.rb', line 324

# Internal: invoked by Job#ack.
#
# Runs `SELECT honker_ack(?, ?)` with job_id and worker_id bound, and
# reports whether the first column of the returned row equals 1.
def _ack(job_id, worker_id)
  row = @db.db.get_first_row("SELECT honker_ack(?, ?)", [job_id, worker_id])
  row[0] == 1
end
#_fail(job_id, worker_id, err_msg) ⇒ Object
Internal: invoked by Job#fail.
337 338 339 340 341 342 |
# File 'lib/honker.rb', line 337

# Internal: invoked by Job#fail.
#
# Runs `SELECT honker_fail(?, ?, ?)` with job_id, worker_id and err_msg
# bound, and reports whether the first column of the returned row equals 1.
def _fail(job_id, worker_id, err_msg)
  params = [job_id, worker_id, err_msg]
  row = @db.db.get_first_row("SELECT honker_fail(?, ?, ?)", params)
  row[0] == 1
end
#_heartbeat(job_id, worker_id, extend_s) ⇒ Object
Internal: invoked by Job#heartbeat.
345 346 347 348 349 350 |
# File 'lib/honker.rb', line 345

# Internal: invoked by Job#heartbeat.
#
# Runs `SELECT honker_heartbeat(?, ?, ?)` with job_id, worker_id and
# extend_s bound, and reports whether the first column of the returned row
# equals 1.
def _heartbeat(job_id, worker_id, extend_s)
  params = [job_id, worker_id, extend_s]
  row = @db.db.get_first_row("SELECT honker_heartbeat(?, ?, ?)", params)
  row[0] == 1
end
#_retry(job_id, worker_id, delay_s, err_msg) ⇒ Object
Internal: invoked by Job#retry.
329 330 331 332 333 334 |
# File 'lib/honker.rb', line 329

# Internal: invoked by Job#retry.
#
# Runs `SELECT honker_retry(?, ?, ?, ?)` with job_id, worker_id, delay_s and
# err_msg bound, and reports whether the first column of the returned row
# equals 1.
def _retry(job_id, worker_id, delay_s, err_msg)
  params = [job_id, worker_id, delay_s, err_msg]
  row = @db.db.get_first_row("SELECT honker_retry(?, ?, ?, ?)", params)
  row[0] == 1
end
#ack_batch(ids, worker_id) ⇒ Object
Ack multiple jobs in one transaction. Returns the number acked.
307 308 309 310 311 312 |
# File 'lib/honker.rb', line 307

# Ack multiple jobs in one transaction. Returns the number acked.
#
# ids may be an Array or anything Kernel#Array can turn into one (a Range,
# a Set, a single id, or nil for "no jobs"). It is coerced to an Array before
# being JSON-encoded so that the SQL function always receives a JSON array;
# without the coercion a non-Array enumerable would be serialised as a
# quoted string and nil as `null`.
#
# The encoded ids and worker_id are bound to `SELECT honker_ack_batch(?, ?)`
# and the first column of the returned row is handed back unchanged.
def ack_batch(ids, worker_id)
  ids_json = JSON.dump(Array(ids))
  row = @db.db.get_first_row("SELECT honker_ack_batch(?, ?)", [ids_json, worker_id])
  row[0]
end
#claim_batch(worker_id, n) ⇒ Object
Claim up to n jobs atomically. Returns an array of Job.
293 294 295 296 297 298 299 |
# File 'lib/honker.rb', line 293

# Claim up to n jobs atomically. Returns an array of Job.
#
# Binds the queue name, worker_id, n and the queue's visibility timeout to
# `SELECT honker_claim_batch(?, ?, ?, ?)`, parses the first column of the
# returned row as JSON, and wraps every parsed element in Job.new(self, ...).
def claim_batch(worker_id, n)
  params = [@name, worker_id, n, @visibility_timeout_s]
  row = @db.db.get_first_row("SELECT honker_claim_batch(?, ?, ?, ?)", params)
  JSON.parse(row[0]).map { |attrs| Job.new(self, attrs) }
end
#claim_one(worker_id) ⇒ Object
Claim a single job; returns nil if the queue is empty.
302 303 304 |
# File 'lib/honker.rb', line 302

# Claim a single job; returns nil if the queue is empty.
#
# Delegates to claim_batch with a batch size of 1 and returns the first
# element of its result.
def claim_one(worker_id)
  claimed = claim_batch(worker_id, 1)
  claimed.first
end
#enqueue(payload, delay: nil, run_at: nil, priority: 0, expires: nil) ⇒ Object
Enqueue a job. Returns the inserted row id.
q.enqueue({to: "alice"}, delay: 60, priority: 10, expires: 3600)
274 275 276 277 278 279 280 |
# File 'lib/honker.rb', line 274

# Enqueue a job. Returns the inserted row id.
#
#   q.enqueue({to: "alice"}, delay: 60, priority: 10, expires: 3600)
#
# The payload is JSON-encoded and bound to
# `SELECT honker_enqueue(?, ?, ?, ?, ?, ?, ?)` together with, in order: the
# queue name, run_at, delay, priority, the queue's max_attempts and expires.
# The first column of the returned row is the result.
def enqueue(payload, delay: nil, run_at: nil, priority: 0, expires: nil)
  conn = @db.db
  encoded = JSON.dump(payload)
  params = [@name, encoded, run_at, delay, priority, @max_attempts, expires]
  inserted = conn.get_first_row("SELECT honker_enqueue(?, ?, ?, ?, ?, ?, ?)", params)
  inserted[0]
end
#enqueue_tx(tx, payload, delay: nil, run_at: nil, priority: 0, expires: nil) ⇒ Object
Enqueue inside an open transaction. Atomic with whatever else ran on the same tx.
284 285 286 287 288 289 290 |
# File 'lib/honker.rb', line 284

# Enqueue inside an open transaction. Atomic with whatever else ran on the
# same tx.
#
# Same statement and parameter order as #enqueue, but executed through
# tx.query_row instead of the queue's own connection. Returns the first
# column of the row that tx.query_row yields.
def enqueue_tx(tx, payload, delay: nil, run_at: nil, priority: 0, expires: nil)
  encoded = JSON.dump(payload)
  params = [@name, encoded, run_at, delay, priority, @max_attempts, expires]
  inserted = tx.query_row("SELECT honker_enqueue(?, ?, ?, ?, ?, ?, ?)", params)
  inserted[0]
end
#sweep_expired ⇒ Object
Sweep this queue’s expired claims back to pending. Returns the number of rows reclaimed.
316 317 318 319 320 321 |
# File 'lib/honker.rb', line 316

# Sweep this queue's expired claims back to pending. Returns the number of
# rows reclaimed.
#
# Binds the queue name to `SELECT honker_sweep_expired(?)` and returns the
# first column of the resulting row.
def sweep_expired
  row = @db.db.get_first_row("SELECT honker_sweep_expired(?)", [@name])
  row[0]
end