Module: DispatchPolicy::Forwarder

Defined in:
lib/dispatch_policy/forwarder.rb

Overview

Re-enqueues admitted jobs onto the real ActiveJob adapter under a `Bypass.with` block, so the around_enqueue callback that staged them in the first place lets the call through.
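The bypass mechanism described above can be pictured with a minimal sketch. It assumes the flag is kept in Thread.current and that the callback simply checks it; BypassSketch and around_enqueue_sketch are hypothetical names for illustration, not the gem's actual Bypass implementation.

```ruby
# Hypothetical sketch: a flag stored in Thread.current that an enqueue
# callback can consult. Not the real DispatchPolicy::Bypass.
module BypassSketch
  KEY = :bypass_sketch_active

  def self.active?
    Thread.current[KEY] == true
  end

  # Sets the flag for the duration of the block and always restores it.
  def self.with
    previous = Thread.current[KEY]
    Thread.current[KEY] = true
    yield
  ensure
    Thread.current[KEY] = previous
  end
end

# Stand-in for an around_enqueue callback: stage the job unless bypassed.
def around_enqueue_sketch(staged, job)
  if BypassSketch.active?
    yield          # let the real adapter enqueue run
  else
    staged << job  # divert the job to staging instead
  end
end

staged   = []
enqueued = []

# First enqueue: no bypass, so the job is staged.
around_enqueue_sketch(staged, "job-1") { enqueued << "job-1" }

# Re-enqueue under the bypass: the callback lets the call through.
BypassSketch.with do
  around_enqueue_sketch(staged, "job-1") { enqueued << "job-1" }
end
```

The ensure clause matters here: the flag is restored even when the block raises, so a failed dispatch does not leave later enqueues bypassing the callback.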

Called from inside Tick’s admission transaction. With a PG-backed adapter (good_job / solid_queue) the adapter’s INSERT shares that transaction, so any exception raised here aborts the whole admission atomically: staged_jobs rows return, inflight rows disappear, partition counters revert, and adapter rows revert. There is intentionally no rescue here; failures must propagate so that the surrounding transaction rolls back.
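Why the absence of a rescue matters can be shown with an in-memory stand-in for a transaction. This is only a sketch: fake_transaction is a hypothetical helper that snapshots a Hash and restores it on error, which approximates a database rollback but is not how Tick or the adapters are implemented.

```ruby
# Hypothetical in-memory stand-in for a database transaction:
# snapshot the state, restore it if the block raises, then re-raise.
def fake_transaction(state)
  snapshot = Marshal.load(Marshal.dump(state))
  yield state
rescue StandardError
  state.replace(snapshot)
  raise
end

state = { staged: ["a", "b"], inflight: [] }
error = nil

begin
  fake_transaction(state) do |s|
    s[:inflight] << s[:staged].shift  # admit one job
    raise "adapter enqueue failed"    # nothing inside the block rescues this
  end
rescue RuntimeError => e
  error = e.message
end
```

Had the block rescued the error itself, fake_transaction would never have seen it and the half-applied admission would have stayed in place.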

Bulk path: rows without scheduled_at go through ActiveJob.perform_all_later, which collapses to a single multi-row INSERT on adapters that implement enqueue_all natively (good_job, solid_queue). Rows with scheduled_at keep the per-row path because perform_all_later doesn’t accept a wait_until per job.
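The split between the bulk path and the per-row path is a plain Array#partition on the scheduled_at key. A small worked example follows; the row contents are made up for illustration, and the real shape of job_data is whatever Serializer produces.

```ruby
# Illustrative rows only; the job_data payloads here are placeholders.
rows = [
  { "job_data" => { "job_id" => "a" }, "scheduled_at" => nil },
  { "job_data" => { "job_id" => "b" }, "scheduled_at" => "2030-01-01T00:00:00Z" },
  { "job_data" => { "job_id" => "c" } }
]

# Truthy scheduled_at goes to the first array, everything else to the second.
scheduled, immediate = rows.partition { |row| row["scheduled_at"] }

scheduled_ids = scheduled.map { |row| row["job_data"]["job_id"] }
immediate_ids = immediate.map { |row| row["job_data"]["job_id"] }
```

A row with a nil scheduled_at and a row without the key are treated the same way: both land in the bulk path.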

Class Method Details

.dispatch(rows) ⇒ Object

Parameters:

  • rows (Array<Hash>)

    admitted staged_job rows (already deleted from staging)

Raises:

  • StandardError: propagates any error raised by Serializer.deserialize or by the adapter enqueue.

  • EnqueueFailed: raised if the adapter’s enqueue_all returned without raising but flagged any job as not successfully enqueued, or if a scheduled job’s per-row enqueue was flagged the same way. The atomic contract requires a caller-visible failure so that the surrounding transaction rolls back.
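The soft-failure check behind EnqueueFailed can be sketched in isolation. FakeJob, EnqueueFailedSketch, and check_enqueued! are hypothetical stand-ins used so the example runs without ActiveJob; the rule they illustrate is that a job which does not expose successfully_enqueued? is treated as enqueued.

```ruby
# Hypothetical stand-ins so the check can run without ActiveJob.
class EnqueueFailedSketch < StandardError; end

FakeJob = Struct.new(:job_id, :enqueued) do
  def successfully_enqueued?
    enqueued
  end
end

# Raises when any job reports that it was not enqueued.
def check_enqueued!(jobs)
  not_enqueued = jobs.reject do |job|
    job.respond_to?(:successfully_enqueued?) ? job.successfully_enqueued? : true
  end
  return if not_enqueued.empty?

  ids = not_enqueued.map(&:job_id).join(", ")
  raise EnqueueFailedSketch, "soft-failed #{not_enqueued.size}/#{jobs.size} jobs (#{ids})"
end

ok_result = check_enqueued!([FakeJob.new("j1", true)])

message = nil
begin
  check_enqueued!([FakeJob.new("j1", true), FakeJob.new("j2", false)])
rescue EnqueueFailedSketch => e
  message = e.message
end
```

Turning the flag into an exception is what makes the failure visible to the caller's transaction.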



# File 'lib/dispatch_policy/forwarder.rb', line 29

def dispatch(rows)
  return if rows.empty?

  # Rows carrying scheduled_at need a per-job wait_until; the rest can be bulk-enqueued.
  scheduled, immediate = rows.partition { |row| row["scheduled_at"] }

  if immediate.any?
    jobs = immediate.map { |row| Serializer.deserialize(row["job_data"]) }
    Bypass.with { ::ActiveJob.perform_all_later(jobs) }
    # The bulk enqueue can return without raising while flagging individual jobs,
    # so check each one. Jobs that don't expose the flag are treated as enqueued.
    not_enqueued = jobs.reject { |j| j.respond_to?(:successfully_enqueued?) ? j.successfully_enqueued? : true }
    if not_enqueued.any?
      ids = not_enqueued.map(&:job_id).join(", ")
      raise EnqueueFailed,
            "perform_all_later soft-failed #{not_enqueued.size}/#{jobs.size} jobs (#{ids})"
    end
  end

  # Per-row path: each scheduled job is enqueued on its own with its wait_until.
  scheduled.each do |row|
    job        = Serializer.deserialize(row["job_data"])
    wait_until = enqueue_wait_until(row)
    Bypass.with { job.set(wait_until: wait_until).enqueue }
    if job.respond_to?(:successfully_enqueued?) && !job.successfully_enqueued?
      raise EnqueueFailed, "scheduled enqueue soft-failed for #{job.job_id}"
    end
  end
end

.enqueue_wait_until(row) ⇒ Object



# File 'lib/dispatch_policy/forwarder.rb', line 55

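def enqueue_wait_until(row)
  ts = row["scheduled_at"]
  return nil unless ts
  # Pass Time values through; parse anything else from its string form.
  ts.is_a?(Time) ? ts : Time.parse(ts.to_s)
rescue ArgumentError
  # Time.parse could not interpret the value; fall back to no wait_until.
  nil
end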
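
The behaviour of enqueue_wait_until on different inputs can be checked with a standalone copy of the method. The sample rows are illustrative, and the require of "time" is assumed to be needed for Time.parse outside a Rails process.

```ruby
require "time" # Time.parse lives in the stdlib "time" extension

# Standalone copy of the method above, for experimentation only.
def enqueue_wait_until(row)
  ts = row["scheduled_at"]
  return nil unless ts
  ts.is_a?(Time) ? ts : Time.parse(ts.to_s)
rescue ArgumentError
  nil
end

time_value = Time.utc(2030, 1, 1)

from_time    = enqueue_wait_until({ "scheduled_at" => time_value })             # same Time object
from_string  = enqueue_wait_until({ "scheduled_at" => "2030-01-01T00:00:00Z" }) # parsed Time
from_missing = enqueue_wait_until({})                                           # nil
from_garbage = enqueue_wait_until({ "scheduled_at" => "not a timestamp" })      # nil
```

An unparseable scheduled_at therefore does not raise; the job is enqueued with a nil wait_until instead.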