Class: StandardLedger::ProjectionJob
- Inherits: ActiveJob::Base
  - Object
  - ActiveJob::Base
  - StandardLedger::ProjectionJob
- Defined in: lib/standard_ledger/jobs/projection_job.rb
Overview
ActiveJob class that runs a single `:async`-mode projection after the entry's outer transaction has committed. Enqueued by `StandardLedger::Modes::Async` from an `after_create_commit` callback.
The job resolves the projection definition by `target_association` (the only stable handle, since the Definition struct doesn't serialize cleanly through ActiveJob), looks up the target by calling the entry's `belongs_to` association reader, and runs `target.with_lock { projector_class.new.apply(target, entry) }`. If the target is `nil`, the job returns without applying anything. The projector must be class-form (`via: ProjectorClass`) and should recompute the aggregate from the log inside `apply` for retry safety, because async projections can run more than once when the job retries.
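As an illustration of the retry-safety advice above, here is a minimal sketch of a class-form projector that recomputes the aggregate from the log rather than incrementing it. The `Entry` and `Account` structs, the `BalanceProjector` name, and the `amount_cents`/`balance_cents` fields are hypothetical stand-ins (plain Ruby instead of ActiveRecord models); only the `apply(target, entry)` shape comes from the description above.

```ruby
# Hypothetical stand-ins for ActiveRecord models, so the sketch runs on plain Ruby.
Entry   = Struct.new(:account_id, :amount_cents)
Account = Struct.new(:id, :balance_cents, :entries)

# Class-form projector: built with no arguments and called as
# apply(target, entry), mirroring projector_class.new.apply(target, entry).
class BalanceProjector
  def apply(target, _entry)
    # Recompute from the full log instead of adding the entry's amount,
    # so applying the same entry twice leaves the same balance.
    target.balance_cents = target.entries.sum(&:amount_cents)
    target
  end
end

entries   = [Entry.new(1, 500), Entry.new(1, -200)]
account   = Account.new(1, 0, entries)
projector = BalanceProjector.new

projector.apply(account, entries.last)
projector.apply(account, entries.last) # simulated retry of the same job
```

An incrementing projector (`balance_cents += entry.amount_cents`) would double-count the entry on the simulated retry; the recomputing form does not.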
Retries are configurable per projection by subclassing this job and overriding `retry_on`, or globally via `Config#default_async_retries` (default 3). When retries are exhausted, ActiveJob's dead-letter behavior takes over. The job emits `<prefix>.projection.failed` on every failed attempt, so subscribers see the full retry history.
Notification payloads include `attempt:` (drawn from ActiveJob's `executions` accessor: 1 on the first attempt, incrementing with each retry) so subscribers can distinguish first-try success from retry-success.
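A subscriber might interpret these payloads along the following lines. This is only a sketch: the `classify_projection_event` helper and its return symbols are hypothetical, the `"ledger"` prefix is a placeholder for whatever `notification_namespace` is configured, and how a handler is actually registered with the emitter is not shown here. Only the event-name suffixes and the `:attempt` key are taken from the description above.

```ruby
# Hypothetical helper: classifies a projection notification from its event
# name and payload hash. Not part of StandardLedger.
def classify_projection_event(event_name, payload)
  attempt = payload.fetch(:attempt) # 1 on the first attempt, higher on retries

  if event_name.end_with?(".projection.failed")
    :failed_attempt
  elsif event_name.end_with?(".projection.applied")
    attempt == 1 ? :first_try_success : :retry_success
  else
    :unknown
  end
end

# "ledger" is a placeholder namespace for illustration.
first = classify_projection_event("ledger.projection.applied", { attempt: 1 })
retried = classify_projection_event("ledger.projection.applied", { attempt: 3 })
failed = classify_projection_event("ledger.projection.failed", { attempt: 2 })
```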
Instance Method Details
#perform(entry, target_association) ⇒ Object
# File 'lib/standard_ledger/jobs/projection_job.rb', line 56

def perform(entry, target_association)
  definition = entry.class.standard_ledger_projections.find do |d|
    d.mode == :async && d.target_association == target_association.to_sym
  end
  if definition.nil?
    raise StandardLedger::Error,
          "no :async projection #{target_association.inspect} on #{entry.class.name}"
  end

  target = entry.public_send(definition.target_association)
  return if target.nil?

  prefix = StandardLedger.config.notification_namespace
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  begin
    target.with_lock do
      definition.projector_class.new.apply(target, entry)
    end
  rescue StandardError => e
    duration_ms = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started) * 1000.0
    StandardLedger::EventEmitter.emit(
      "#{prefix}.projection.failed",
      entry: entry, target: target, projection: definition.target_association,
      mode: :async, error: e, duration_ms: duration_ms, attempt: executions
    )
    raise
  end

  duration_ms = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started) * 1000.0
  StandardLedger::EventEmitter.emit(
    "#{prefix}.projection.applied",
    entry: entry, target: target, projection: definition.target_association,
    mode: :async, duration_ms: duration_ms, attempt: executions
  )
end