Class: ApprovalEngine::ProcessOutboxJob
Inherits: ApplicationJob
- Object
- ActiveJob::Base
- ApplicationJob
- ApprovalEngine::ProcessOutboxJob
Defined in: app/jobs/approval_engine/process_outbox_job.rb
Overview
Relays one outbox event to the outside world. Core ledger state is already settled by the time this runs (transitions advance synchronously), so the job only fires side-effects: optional host callbacks and an ActiveSupport::Notifications instrumentation hook.
Delivery is at-least-once (host callbacks may run more than once on a redelivery, so they MUST be idempotent) and unordered (sibling events for one record aren’t sequenced — don’t assume after_step_approved precedes after_approved). The row is locked for the whole unit of work so two workers can’t both deliver it. Retries are bounded; an exhausted event is dead-lettered (failed_at set) rather than retried forever.
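Because a callback may fire more than once for the same event, a host application would typically guard its side-effect with a deduplication check. The following is a minimal plain-Ruby sketch of that idea, not part of ApprovalEngine: the class `IdempotentApprovalHandler`, the `event_id` argument, and the in-memory `Set` (a stand-in for something durable, such as a table with a unique index) are all assumptions made for illustration.

```ruby
require "set"

# Hypothetical host-side handler; the class name and the event_id argument
# are illustrative and not part of ApprovalEngine.
class IdempotentApprovalHandler
  attr_reader :emails_sent

  def initialize
    @seen_event_ids = Set.new # stand-in for a durable, unique-indexed store
    @emails_sent = 0
  end

  # Returns true if the side-effect ran, false if this was a redelivery.
  def after_approved(event_id)
    # Set#add? returns nil when the id is already present.
    return false unless @seen_event_ids.add?(event_id)

    @emails_sent += 1 # stand-in for the real side-effect
    true
  end
end

handler = IdempotentApprovalHandler.new
handler.after_approved(42) # first delivery: the side-effect runs
handler.after_approved(42) # redelivery: skipped
puts handler.emails_sent
```

In a real host the "seen" check and the side-effect would need to be atomic with respect to each other, for example by relying on a unique constraint rather than a read-then-write.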
Class Method Summary
- .format_error(error) ⇒ Object
  Class method so the retry-exhausted block (which runs without a job instance receiver) can reuse it.
Instance Method Summary
- #perform(outbox_event_id) ⇒ Object
Class Method Details
.format_error(error) ⇒ Object
Class method so the retry-exhausted block (which runs without a job instance receiver) can reuse it.
    # File 'app/jobs/approval_engine/process_outbox_job.rb', line 96
    def self.format_error(error)
      "#{error.class}: #{error.message}\n#{Array(error.backtrace).first(5).join("\n")}"
    end
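To see what the stored `delivery_error` string might look like, the formatter can be tried outside the job. This is a standalone sketch, assuming the second interpolation calls `error.message`; the top-level `format_error` below is a copy made for experimentation, and the frame names are invented.

```ruby
# Standalone copy of the formatter for experimentation. It assumes the
# second interpolation is `error.message`.
def format_error(error)
  "#{error.class}: #{error.message}\n#{Array(error.backtrace).first(5).join("\n")}"
end

# An exception that was never raised has a nil backtrace. Array(nil) is [],
# so the formatter still returns a string instead of failing.
unraised = RuntimeError.new("boom")
puts format_error(unraised)

# A long backtrace is cut down to its first five frames.
deep = RuntimeError.new("deep")
deep.set_backtrace((1..8).map { |i| "frame_#{i}.rb:#{i}" })
puts format_error(deep)
```

Capping the backtrace at five frames keeps the stored value short while still pointing at the failure site.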
Instance Method Details
#perform(outbox_event_id) ⇒ Object
    # File 'app/jobs/approval_engine/process_outbox_job.rb', line 26
    def perform(outbox_event_id)
      # The lock is held for the whole transaction, so a concurrent worker (or a
      # drain! pass) blocks here and then sees `processed` — never double-delivers.
      OutboxEvent.transaction do
        event = OutboxEvent.unprocessed.lock.find_by(id: outbox_event_id)
        next unless event # already processed, or its record was purged
        deliver(event)
        event.mark_processed!
      end
    rescue => e
      # Record the delivery failure (in its own column, so a retry never clobbers
      # the semantic error_payload the host callback reads) and re-raise to retry.
      OutboxEvent.where(id: outbox_event_id)
                 .update_all(delivery_error: self.class.format_error(e), updated_at: Time.current)
      raise
    end
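The lock-then-check shape of `#perform` can be illustrated without a database. The sketch below is an in-memory analogy only: `FakeOutboxEvent` is hypothetical, a `Mutex` stands in for the row lock, and a boolean stands in for the processed state. It does not model transactions, retries, or dead-lettering.

```ruby
# In-memory stand-in for one outbox row. The Mutex plays the role of the
# row lock; FakeOutboxEvent and its methods are hypothetical.
class FakeOutboxEvent
  attr_reader :deliveries

  def initialize
    @lock = Mutex.new
    @processed = false
    @deliveries = 0
  end

  # Mirrors the shape of #perform: take the lock, skip if already
  # processed, otherwise deliver and mark processed before releasing.
  def process
    @lock.synchronize do
      next if @processed

      @deliveries += 1  # stand-in for deliver(event)
      @processed = true # stand-in for event.mark_processed!
    end
  end
end

event = FakeOutboxEvent.new
workers = 4.times.map { Thread.new { event.process } }
workers.each(&:join)
puts event.deliveries
```

Every worker that loses the race waits on the lock, then observes the processed flag and returns without delivering, which is the behavior the comment in `#perform` describes for a concurrent worker.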