Class: ApprovalEngine::ProcessOutboxJob

Inherits:
ApplicationJob
  • Object
show all
Defined in:
app/jobs/approval_engine/process_outbox_job.rb

Overview

Relays one outbox event to the outside world. Core ledger state is already settled by the time this runs (transitions advance synchronously), so the job only fires side-effects: optional host callbacks and an ActiveSupport::Notifications instrumentation hook.

Delivery is at-least-once (host callbacks may run more than once on a redelivery, so they MUST be idempotent) and unordered (sibling events for one record aren’t sequenced — don’t assume after_step_approved precedes after_approved). The row is locked for the whole unit of work so two workers can’t both deliver it. Retries are bounded; an exhausted event is dead-lettered (failed_at set) rather than retried forever.

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.format_error(error) ⇒ Object

Class method so the retry-exhausted block (which runs without a job instance receiver) can reuse it.



96
97
98
# File 'app/jobs/approval_engine/process_outbox_job.rb', line 96

def self.format_error(error)
  "#{error.class}: #{error.message}\n#{Array(error.backtrace).first(5).join("\n")}"
end

Instance Method Details

#perform(outbox_event_id) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'app/jobs/approval_engine/process_outbox_job.rb', line 26

def perform(outbox_event_id)
  # The lock is held for the whole transaction, so a concurrent worker (or a
  # drain! pass) blocks here and then sees `processed` — never double-delivers.
  OutboxEvent.transaction do
    event = OutboxEvent.unprocessed.lock.find_by(id: outbox_event_id)
    next unless event # already processed, or its record was purged

    deliver(event)
    event.mark_processed!
  end
rescue => e
  # Record the delivery failure (in its own column, so a retry never clobbers
  # the semantic error_payload the host callback reads) and re-raise to retry.
  OutboxEvent.where(id: outbox_event_id).update_all(delivery_error: self.class.format_error(e), updated_at: Time.current)
  raise
end