Class: ActiveStorage::AsyncVariants::HeartbeatWatchdogJob

Inherits:
ActiveJob::Base
  • Object
Defined in:
lib/active_storage/async_variants/heartbeat_watchdog_job.rb

Overview

External transforms free the worker at `initiate` and report progress via heartbeats. If the external service dies, no terminal callback arrives and the record would sit in “processing” forever. This job flips the record to “failed” once its heartbeats go stale. Until then, it re-enqueues itself after each `heartbeat_stale_after` interval for as long as the record is still “pending” or “processing”.

Instance Method Summary

Instance Method Details

#perform(variant_record) ⇒ Object



# File 'lib/active_storage/async_variants/heartbeat_watchdog_job.rb', line 13

def perform(variant_record)
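  stale_after = ActiveStorage::AsyncVariants.heartbeat_stale_after
  # Only records still in flight are candidates; terminal states are left alone.
  active = ActiveStorage::VariantRecord.where(id: variant_record.id, state: %w[pending processing])
  # If the column is nullable, rows with a NULL last_heartbeat_at never match
  # this comparison and are therefore treated as not stale.
  stale = active.where("last_heartbeat_at < ?", Time.current - stale_after)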

  # Atomic check-and-set: a success/progress callback landing in the same
  # instant moves the row out of `stale` and wins, instead of being clobbered.
  marked_failed_count = stale.update_all(
    state: "failed",
    error: "Transcoding stalled: no heartbeat for over #{stale_after.to_i}s",
  )

  if marked_failed_count.positive?
    touch_consumers(variant_record)
  elsif active.any?
    self.class.set(wait: stale_after).perform_later(variant_record)
  end
end
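
The three outcomes of a single tick (mark failed, re-arm, or stop) can be sketched in plain Ruby without Rails. This is an illustrative model only, not the library's implementation: `Record`, `watchdog_tick`, the returned symbols, and the "processed" terminal state are hypothetical names introduced here. It also does not reproduce the atomicity that `update_all` provides at the database level.

```ruby
# In-memory stand-in for a variant record (hypothetical; not ActiveStorage::VariantRecord).
Record = Struct.new(:state, :last_heartbeat_at, :error)

ACTIVE_STATES = %w[pending processing].freeze

# Models one watchdog tick.
# Returns :failed when the record was marked failed, :rearm when the
# watchdog should run again after `stale_after` seconds, and :done when
# the record is no longer pending or processing.
def watchdog_tick(record, now:, stale_after:)
  return :done unless ACTIVE_STATES.include?(record.state)

  if record.last_heartbeat_at < now - stale_after
    record.state = "failed"
    record.error = "Transcoding stalled: no heartbeat for over #{stale_after}s"
    :failed
  else
    :rearm
  end
end

now = Time.at(1_000)
fresh    = Record.new("processing", now - 10)   # heartbeat 10s ago
stale    = Record.new("processing", now - 120)  # heartbeat 120s ago
finished = Record.new("processed",  now - 120)  # assumed terminal state

puts watchdog_tick(fresh,    now: now, stale_after: 60)
puts watchdog_tick(stale,    now: now, stale_after: 60)
puts watchdog_tick(finished, now: now, stale_after: 60)
```

In the real job, the staleness test and the state change happen in one `UPDATE ... WHERE` statement, so a callback that lands between the check and the write cannot be overwritten. The sketch separates them only for readability.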