Class: Honker::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/honker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(db, name, visibility_timeout_s:, max_attempts:) ⇒ Queue

Returns a new instance of Queue.



197
198
199
200
201
202
# File 'lib/honker.rb', line 197

def initialize(db, name, visibility_timeout_s:, max_attempts:)
  @db = db
  @name = name
  @visibility_timeout_s = visibility_timeout_s
  @max_attempts = max_attempts
end

Instance Attribute Details

#max_attemptsObject (readonly)

Returns the value of attribute max_attempts.



195
196
197
# File 'lib/honker.rb', line 195

def max_attempts
  @max_attempts
end

#nameObject (readonly)

Returns the value of attribute name.



195
196
197
# File 'lib/honker.rb', line 195

def name
  @name
end

Instance Method Details

#_ack(job_id, worker_id) ⇒ Object

Internal: invoked by Job#ack.



257
258
259
# File 'lib/honker.rb', line 257

def _ack(job_id, worker_id)
  @db.db.get_first_row("SELECT honker_ack(?, ?)", [job_id, worker_id])[0] == 1
end

#_fail(job_id, worker_id, err_msg) ⇒ Object

Internal: invoked by Job#fail.



270
271
272
273
274
275
# File 'lib/honker.rb', line 270

def _fail(job_id, worker_id, err_msg)
  @db.db.get_first_row(
    "SELECT honker_fail(?, ?, ?)",
    [job_id, worker_id, err_msg],
  )[0] == 1
end

#_heartbeat(job_id, worker_id, extend_s) ⇒ Object

Internal: invoked by Job#heartbeat.



278
279
280
281
282
283
# File 'lib/honker.rb', line 278

def _heartbeat(job_id, worker_id, extend_s)
  @db.db.get_first_row(
    "SELECT honker_heartbeat(?, ?, ?)",
    [job_id, worker_id, extend_s],
  )[0] == 1
end

#_retry(job_id, worker_id, delay_s, err_msg) ⇒ Object

Internal: invoked by Job#retry.



262
263
264
265
266
267
# File 'lib/honker.rb', line 262

def _retry(job_id, worker_id, delay_s, err_msg)
  @db.db.get_first_row(
    "SELECT honker_retry(?, ?, ?, ?)",
    [job_id, worker_id, delay_s, err_msg],
  )[0] == 1
end

#ack_batch(ids, worker_id) ⇒ Object

Ack multiple jobs in one transaction. Returns the number acked.



240
241
242
243
244
245
# File 'lib/honker.rb', line 240

def ack_batch(ids, worker_id)
  @db.db.get_first_row(
    "SELECT honker_ack_batch(?, ?)",
    [JSON.dump(ids), worker_id],
  )[0]
end

#claim_batch(worker_id, n) ⇒ Object

Claim up to n jobs atomically. Returns an array of Job.



226
227
228
229
230
231
232
# File 'lib/honker.rb', line 226

def claim_batch(worker_id, n)
  rows_json = @db.db.get_first_row(
    "SELECT honker_claim_batch(?, ?, ?, ?)",
    [@name, worker_id, n, @visibility_timeout_s],
  )[0]
  JSON.parse(rows_json).map { |r| Job.new(self, r) }
end

#claim_one(worker_id) ⇒ Object

Claim a single job or nil if the queue is empty.



235
236
237
# File 'lib/honker.rb', line 235

def claim_one(worker_id)
  claim_batch(worker_id, 1).first
end

#enqueue(payload, delay: nil, run_at: nil, priority: 0, expires: nil) ⇒ Object

Enqueue a job. Returns the inserted row id.

q.enqueue({to: "alice"}, delay: 60, priority: 10, expires: 3600)


207
208
209
210
211
212
213
# File 'lib/honker.rb', line 207

def enqueue(payload, delay: nil, run_at: nil, priority: 0, expires: nil)
  row = @db.db.get_first_row(
    "SELECT honker_enqueue(?, ?, ?, ?, ?, ?, ?)",
    [@name, JSON.dump(payload), run_at, delay, priority, @max_attempts, expires],
  )
  row[0]
end

#enqueue_tx(tx, payload, delay: nil, run_at: nil, priority: 0, expires: nil) ⇒ Object

Enqueue inside an open transaction. Atomic with whatever else ran on the same tx.



217
218
219
220
221
222
223
# File 'lib/honker.rb', line 217

def enqueue_tx(tx, payload, delay: nil, run_at: nil, priority: 0, expires: nil)
  row = tx.query_row(
    "SELECT honker_enqueue(?, ?, ?, ?, ?, ?, ?)",
    [@name, JSON.dump(payload), run_at, delay, priority, @max_attempts, expires],
  )
  row[0]
end

#sweep_expiredObject

Sweep this queue’s expired claims back to pending. Returns the number of rows reclaimed.



249
250
251
252
253
254
# File 'lib/honker.rb', line 249

def sweep_expired
  @db.db.get_first_row(
    "SELECT honker_sweep_expired(?)",
    [@name],
  )[0]
end