Class: Honker::Outbox
- Inherits: Object
- Inheritance chain: Object → Honker::Outbox
- Defined in:
- lib/honker.rb
Instance Attribute Summary collapse
-
#base_backoff_s ⇒ Object
readonly
Returns the value of attribute base_backoff_s.
-
#max_attempts ⇒ Object
readonly
Returns the value of attribute max_attempts.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary collapse
- #enqueue(payload, tx: nil, delay: nil, run_at: nil, priority: 0, expires: nil) ⇒ Object
-
#initialize(db, name, delivery, visibility_timeout_s:, max_attempts:, base_backoff_s:) ⇒ Outbox
constructor
A new instance of Outbox.
- #run_once(worker_id) ⇒ Object
- #run_worker(worker_id, stop: nil, idle_sleep_s: 0.1) ⇒ Object
Constructor Details
#initialize(db, name, delivery, visibility_timeout_s:, max_attempts:, base_backoff_s:) ⇒ Outbox
Returns a new instance of Outbox.
356 357 358 359 360 361 362 363 364 365 366 367 368 |
# File 'lib/honker.rb', line 356

# Sets up an outbox backed by a queue obtained from +db+.
#
# @param db [#queue] object whose +queue+ method is invoked with the derived
#   queue name plus the visibility-timeout and max-attempts settings
# @param name [Object] outbox name; interpolated into the queue name
#   "_outbox:<name>"
# @param delivery [#call] callable kept on the instance for later delivery
# @param visibility_timeout_s [Object] forwarded unchanged to +db.queue+
# @param max_attempts [Object] stored on the instance and forwarded to
#   +db.queue+
# @param base_backoff_s [Object] stored on the instance
# @raise [ArgumentError] if +delivery+ does not respond to +call+
def initialize(db, name, delivery, visibility_timeout_s:, max_attempts:, base_backoff_s:)
  unless delivery.respond_to?(:call)
    raise ArgumentError, "delivery must respond to #call"
  end

  @base_backoff_s = base_backoff_s
  @max_attempts = max_attempts
  @delivery = delivery
  @name = name

  queue_name = "_outbox:#{name}"
  @queue = db.queue(queue_name, visibility_timeout_s: visibility_timeout_s, max_attempts: max_attempts)
end
Instance Attribute Details
#base_backoff_s ⇒ Object (readonly)
Returns the value of attribute base_backoff_s.
354 355 356 |
# File 'lib/honker.rb', line 354

# Read-only accessor.
#
# @return [Object] the +base_backoff_s+ value the constructor stored
def base_backoff_s
  @base_backoff_s
end
#max_attempts ⇒ Object (readonly)
Returns the value of attribute max_attempts.
354 355 356 |
# File 'lib/honker.rb', line 354

# Read-only accessor.
#
# @return [Object] the +max_attempts+ value the constructor stored (the same
#   value it passes to +db.queue+)
def max_attempts
  @max_attempts
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
354 355 356 |
# File 'lib/honker.rb', line 354

# Read-only accessor.
#
# @return [Object] the outbox name given to the constructor
def name
  @name
end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
354 355 356 |
# File 'lib/honker.rb', line 354

# Read-only accessor.
#
# @return [Object] the queue object the constructor obtained from +db.queue+
def queue
  @queue
end
Instance Method Details
#enqueue(payload, tx: nil, delay: nil, run_at: nil, priority: 0, expires: nil) ⇒ Object
370 371 372 373 374 375 376 |
# File 'lib/honker.rb', line 370

# Adds +payload+ to the underlying queue.
#
# @param payload [Object] job payload, handed to the queue as-is
# @param tx [Object, nil] when truthy, the job is enqueued through
#   +@queue.enqueue_tx(tx, ...)+; otherwise through +@queue.enqueue+
#   (presumably a transaction handle -- confirm against the queue API)
# @param delay [Object, nil] forwarded unchanged to the queue
# @param run_at [Object, nil] forwarded unchanged to the queue
# @param priority [Object] forwarded unchanged to the queue (default 0)
# @param expires [Object, nil] forwarded unchanged to the queue
# @return [Object] whatever the chosen queue method returns
def enqueue(payload, tx: nil, delay: nil, run_at: nil, priority: 0, expires: nil)
  scheduling = { delay: delay, run_at: run_at, priority: priority, expires: expires }
  return @queue.enqueue_tx(tx, payload, **scheduling) if tx

  @queue.enqueue(payload, **scheduling)
end
#run_once(worker_id) ⇒ Object
378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 |
# File 'lib/honker.rb', line 378

# Claims at most one job from the queue and attempts to deliver it.
#
# The delivery callable receives +job.payload+ alone when its arity is
# exactly 1; otherwise it receives +job.payload+ and the job. The constructor
# only requires that the delivery object respond to +call+, so callables
# without an +arity+ method (plain objects defining +#call+) have their arity
# read from +method(:call)+ instead of raising NoMethodError.
#
# Any StandardError raised inside the delivery/ack section is converted into
# a +job.retry+ call carrying the delay from +retry_delay(job.attempts)+ and
# the error message followed by its backtrace.
#
# @param worker_id [Object] passed to +@queue.claim_one+
# @return [Boolean] false when no job was claimed; true when a job was
#   handled (either acked or handed to +job.retry+)
# @raise [RuntimeError] if +job.retry+ returns a falsy value
def run_once(worker_id)
  job = @queue.claim_one(worker_id)
  return false unless job

  begin
    # Resolved inside the begin block so that a failure to determine the
    # arity is retried like any other delivery error.
    arity = @delivery.respond_to?(:arity) ? @delivery.arity : @delivery.method(:call).arity
    if arity == 1
      @delivery.call(job.payload)
    else
      @delivery.call(job.payload, job)
    end
    # NOTE(review): this raise sits inside the begin block, so a falsy ack
    # after a successful delivery is also sent through the retry path below.
    # Confirm that is intended.
    raise "outbox ack failed for job #{job.id}" unless job.ack
  rescue StandardError => e
    delay_s = retry_delay(job.attempts)
    error_text = "#{e}\n#{e.backtrace&.join("\n")}"
    raise "outbox retry failed for job #{job.id}" unless job.retry(delay_s: delay_s, error: error_text)
  end
  true
end
#run_worker(worker_id, stop: nil, idle_sleep_s: 0.1) ⇒ Object
396 397 398 399 400 401 |
# File 'lib/honker.rb', line 396

# Repeatedly calls +run_once+ until +stop+ reports true.
#
# +stop+ is consulted before every iteration. After an iteration in which
# +run_once+ returned a falsy value, the worker sleeps for +idle_sleep_s+
# before checking +stop+ again.
#
# @param worker_id [Object] passed through to +run_once+
# @param stop [#call, nil] callable polled before each iteration; when nil
#   the loop has no exit condition of its own
# @param idle_sleep_s [Numeric] argument given to +sleep+ after an idle
#   iteration (default 0.1)
# @return [nil]
def run_worker(worker_id, stop: nil, idle_sleep_s: 0.1)
  until stop&.call
    next if run_once(worker_id)

    sleep(idle_sleep_s)
  end
end