Class: Honker::Outbox

Inherits:
Object
  • Object
show all
Defined in:
lib/honker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(db, name, delivery, visibility_timeout_s:, max_attempts:, base_backoff_s:) ⇒ Outbox

Returns a new instance of Outbox.

Raises:

  • (ArgumentError) — if delivery does not respond to #call


356
357
358
359
360
361
362
363
364
365
366
367
368
# File 'lib/honker.rb', line 356

# Builds an outbox backed by the queue that `db` returns for "_outbox:<name>".
#
# @param db [#queue] object whose `queue(name, visibility_timeout_s:, max_attempts:)`
#   result becomes the backing queue
# @param name [Object] outbox name; interpolated into the backing queue name
# @param delivery [#call] callable stored for later use when jobs are delivered
# @param visibility_timeout_s [Object] forwarded unchanged to `db.queue`
#   (presumably a duration in seconds -- confirm against the queue API)
# @param max_attempts [Object] stored on the outbox and forwarded to `db.queue`
# @param base_backoff_s [Object] stored on the outbox; not used in this method
# @raise [ArgumentError] if `delivery` does not respond to `call`
def initialize(db, name, delivery, visibility_timeout_s:, max_attempts:, base_backoff_s:)
  # Reject a non-callable delivery before touching the database.
  unless delivery.respond_to?(:call)
    raise ArgumentError, "delivery must respond to #call"
  end

  backing_queue_name = "_outbox:#{name}"
  queue_options = { visibility_timeout_s: visibility_timeout_s, max_attempts: max_attempts }

  @name = name
  @delivery = delivery
  @max_attempts = max_attempts
  @base_backoff_s = base_backoff_s
  @queue = db.queue(backing_queue_name, **queue_options)
end

Instance Attribute Details

#base_backoff_s ⇒ Object (readonly)

Returns the value of attribute base_backoff_s.



354
355
356
# File 'lib/honker.rb', line 354

# Reader for the `base_backoff_s:` value stored by the constructor.
#
# @return [Object] the stored value, or nil if it was never assigned
def base_backoff_s = @base_backoff_s

#max_attempts ⇒ Object (readonly)

Returns the value of attribute max_attempts.



354
355
356
# File 'lib/honker.rb', line 354

# Reader for the `max_attempts:` value stored by the constructor.
#
# @return [Object] the stored value, or nil if it was never assigned
def max_attempts = @max_attempts

#name ⇒ Object (readonly)

Returns the value of attribute name.



354
355
356
# File 'lib/honker.rb', line 354

# Reader for the outbox name stored by the constructor.
#
# @return [Object] the stored name, or nil if it was never assigned
def name = @name

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



354
355
356
# File 'lib/honker.rb', line 354

# Reader for the backing queue the constructor obtained from `db.queue`.
#
# @return [Object] the stored queue, or nil if it was never assigned
def queue = @queue

Instance Method Details

#enqueue(payload, tx: nil, delay: nil, run_at: nil, priority: 0, expires: nil) ⇒ Object



370
371
372
373
374
375
376
# File 'lib/honker.rb', line 370

# Adds a payload to the backing queue.
#
# When `tx` is truthy the call goes to `@queue.enqueue_tx` with `tx` as the
# first argument; otherwise it goes to `@queue.enqueue`. The scheduling
# options are forwarded unchanged as keyword arguments in both cases.
#
# @param payload [Object] value handed to the queue
# @param tx [Object, nil] transaction handle; selects the `enqueue_tx` path
# @param delay [Object, nil] forwarded as `delay:`
# @param run_at [Object, nil] forwarded as `run_at:`
# @param priority [Object] forwarded as `priority:` (defaults to 0)
# @param expires [Object, nil] forwarded as `expires:`
# @return [Object] whatever the queue method returns
def enqueue(payload, tx: nil, delay: nil, run_at: nil, priority: 0, expires: nil)
  scheduling = { delay: delay, run_at: run_at, priority: priority, expires: expires }
  return @queue.enqueue_tx(tx, payload, **scheduling) if tx

  @queue.enqueue(payload, **scheduling)
end

#run_once(worker_id) ⇒ Object



378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
# File 'lib/honker.rb', line 378

# Claims at most one job from the backing queue and tries to deliver it.
#
# The delivery callable receives only the payload when its arity is exactly
# 1; for any other arity it receives the payload and the claimed job.
# Any StandardError raised while delivering or acking is turned into a
# `job.retry` call whose delay comes from `retry_delay(job.attempts)` and
# whose error text is the exception message followed by its backtrace.
#
# @param worker_id [Object] identifier forwarded to `@queue.claim_one`
# @return [Boolean] false when no job was claimed; true once the claimed job
#   was either acked or handed to `job.retry`
# @raise [RuntimeError] when `job.retry` returns a falsy value
def run_once(worker_id)
  job = @queue.claim_one(worker_id)
  return false unless job

  begin
    # The constructor only requires #call, so a plain callable object may not
    # define #arity itself; in that case inspect its #call method instead of
    # failing with NoMethodError before the delivery is even attempted.
    arity = @delivery.respond_to?(:arity) ? @delivery.arity : @delivery.method(:call).arity
    if arity == 1
      @delivery.call(job.payload)
    else
      @delivery.call(job.payload, job)
    end
    # This raise sits inside the rescued region, so a falsy ack is handled by
    # the retry path below.
    raise "outbox ack failed for job #{job.id}" unless job.ack
  rescue StandardError => e
    delay_s = retry_delay(job.attempts)
    error = "#{e}\n#{e.backtrace&.join("\n")}"
    raise "outbox retry failed for job #{job.id}" unless job.retry(delay_s: delay_s, error: error)
  end
  true
end

#run_worker(worker_id, stop: nil, idle_sleep_s: 0.1) ⇒ Object



396
397
398
399
400
401
# File 'lib/honker.rb', line 396

# Repeatedly calls `run_once` until the optional `stop` callable returns a
# truthy value. `stop` is consulted before every iteration; when it is nil
# the loop never ends on its own. After an iteration in which `run_once`
# returned a falsy value, the worker sleeps for `idle_sleep_s`.
#
# @param worker_id [Object] identifier forwarded to `run_once`
# @param stop [#call, nil] callable polled before each iteration
# @param idle_sleep_s [Numeric] argument passed to `sleep` after an idle pass
# @return [nil]
def run_worker(worker_id, stop: nil, idle_sleep_s: 0.1)
  until stop&.call
    # A processed job means there may be more work: poll again right away.
    next if run_once(worker_id)

    sleep(idle_sleep_s)
  end
end