Class: ChronoForge::BranchMergeJob

Inherits:
ActiveJob::Base
  • Object
show all
Defined in:
lib/chrono_forge/branch_merge_job.rb

Overview

Lightweight poller that joins one or more branches. NOT a workflow — it holds no lock, does no replay, and carries no context. It exists so the heavy parent workflow is replayed only twice per merge (kick off + completion wake).

Constant Summary collapse

CAP =

cap the pending count; beyond it we just pick max_interval

5_000
FACTOR =

seconds of delay per pending child

0.06
REKICK_AFTER =
5.minutes
REKICK_BATCH =

bound per-run rekicks; later polls handle the rest

200

Instance Method Summary collapse

Instance Method Details

#perform(parent_key, parent_job_class, branch_log_ids, min_interval, max_interval, token = nil) ⇒ Object

Raises:

  • (ArgumentError)


24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/chrono_forge/branch_merge_job.rb', line 24

def perform(parent_key, parent_job_class, branch_log_ids, min_interval, max_interval, token = nil)
  raise ArgumentError, "branch_log_ids must not be empty" if branch_log_ids.empty?

  # Fencing: every merge_branches pass mints a fresh token and writes it onto
  # the branch logs, so a poller from a superseded chain (parent replay /
  # re-enqueue) holds a stale token. It stops quietly — no poll, no wake, no
  # reschedule — leaving only the newest chain to drive the merge. (A nil token
  # is a pre-upgrade job enqueued before fencing existed; it runs unfenced.)
  return if superseded?(branch_log_ids, token)

  # Per-branch probe (kept as maps so we can persist each branch's own state,
  # not just the merge aggregate). Same query count as a plain sum/all?.
  pending_by_branch = branch_log_ids.to_h { |id| [id, BranchProbe.incomplete(id).limit(CAP).count] }
  sealed_by_branch = branch_log_ids.to_h { |id| [id, BranchProbe.sealed?(id)] }
  pending = pending_by_branch.values.sum
  sealed = sealed_by_branch.values.all?

  if sealed && pending.zero?
    record_poll!(pending_by_branch, sealed_by_branch, token, next_poll_at: nil)
    parent_job_class.constantize.perform_later(parent_key)
    return
  end

  rekick_dropped_jobs(branch_log_ids)

  delay = reschedule_delay(pending, min_interval, max_interval)
  record_poll!(pending_by_branch, sealed_by_branch, token, next_poll_at: delay.seconds.from_now)
  self.class.set(wait: delay.seconds)
    .perform_later(parent_key, parent_job_class, branch_log_ids, min_interval, max_interval, token)
end