Class: ChronoForge::BranchMergeJob
- Inherits: ActiveJob::Base
  - Object
  - ActiveJob::Base
  - ChronoForge::BranchMergeJob
- Defined in: lib/chrono_forge/branch_merge_job.rb
Overview
Lightweight poller that joins one or more branches. NOT a workflow — it holds no lock, does no replay, and carries no context. It exists so the heavy parent workflow is replayed only twice per merge (kick off + completion wake).
Constant Summary
- CAP = 5_000
  cap the pending count; beyond it we just pick max_interval
- FACTOR = 0.06
  seconds of delay per pending child
- REKICK_AFTER = 5.minutes
- REKICK_BATCH = 200
  bound per-run rekicks; later polls handle the rest
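The constant comments suggest how the poll delay scales with the pending count. The source of `reschedule_delay` is not shown on this page, so the following is only a sketch under stated assumptions: the delay is `pending * FACTOR` seconds, clamped between `min_interval` and `max_interval`, and a pending count at or above `CAP` goes straight to `max_interval`. The module name `MergeDelaySketch` is hypothetical.

```ruby
# Hypothetical reconstruction; the real reschedule_delay may differ.
module MergeDelaySketch
  CAP = 5_000     # pending counts are capped here (from the constant summary)
  FACTOR = 0.06   # seconds of delay per pending child (from the constant summary)

  # Assumed behavior: linear in the pending count, clamped to the interval
  # bounds, with a shortcut to max_interval once the cap is reached.
  def self.reschedule_delay(pending, min_interval, max_interval)
    return max_interval if pending >= CAP

    (pending * FACTOR).clamp(min_interval, max_interval)
  end
end

# With min_interval = 5 and max_interval = 300 seconds:
MergeDelaySketch.reschedule_delay(10, 5, 300)     # small backlog, clamped up to the minimum
MergeDelaySketch.reschedule_delay(1_000, 5, 300)  # about 60 seconds
MergeDelaySketch.reschedule_delay(5_000, 5, 300)  # at the cap, uses max_interval
```

Under these assumptions a large backlog is polled rarely and a nearly finished merge is polled often, which keeps the poller cheap while most children are still running.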
Instance Method Details
#perform(parent_key, parent_job_class, branch_log_ids, min_interval, max_interval, token = nil) ⇒ Object
# File 'lib/chrono_forge/branch_merge_job.rb', line 24

def perform(parent_key, parent_job_class, branch_log_ids, min_interval, max_interval, token = nil)
  raise ArgumentError, "branch_log_ids must not be empty" if branch_log_ids.empty?

  # Fencing: every merge_branches pass mints a fresh token and writes it onto
  # the branch logs, so a poller from a superseded chain (parent replay /
  # re-enqueue) holds a stale token. It stops quietly — no poll, no wake, no
  # reschedule — leaving only the newest chain to drive the merge. (A nil token
  # is a pre-upgrade job enqueued before fencing existed; it runs unfenced.)
  return if superseded?(branch_log_ids, token)

  # Per-branch probe (kept as maps so we can persist each branch's own state,
  # not just the merge aggregate). Same query count as a plain sum/all?.
  pending_by_branch = branch_log_ids.to_h { |id| [id, BranchProbe.incomplete(id).limit(CAP).count] }
  sealed_by_branch = branch_log_ids.to_h { |id| [id, BranchProbe.sealed?(id)] }
  pending = pending_by_branch.values.sum
  sealed = sealed_by_branch.values.all?

  if sealed && pending.zero?
    record_poll!(pending_by_branch, sealed_by_branch, token, next_poll_at: nil)
    parent_job_class.constantize.perform_later(parent_key)
    return
  end

  rekick_dropped_jobs(branch_log_ids)

  delay = reschedule_delay(pending, min_interval, max_interval)
  record_poll!(pending_by_branch, sealed_by_branch, token, next_poll_at: delay.seconds.from_now)
  self.class.set(wait: delay.seconds)
    .perform_later(parent_key, parent_job_class, branch_log_ids, min_interval, max_interval, token)
end
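The fencing comment in `perform` can be illustrated with an in-memory stand-in. The real `superseded?` is not shown on this page; the sketch below assumes the current token is stored per branch log and that a poller counts as superseded when any of its branch logs carries a different token. The module `FencingSketch` and the `branch_tokens` hash are hypothetical and replace the database lookup.

```ruby
# Hypothetical sketch of the fencing check; not the library's implementation.
module FencingSketch
  # branch_tokens: Hash of branch_log_id => token written by the latest
  # merge_branches pass (a stand-in for the persisted branch logs).
  def self.superseded?(branch_tokens, branch_log_ids, token)
    # A nil token is a pre-upgrade job enqueued before fencing; it runs unfenced.
    return false if token.nil?

    # Assumption: one mismatching branch log is enough to mark the chain stale.
    branch_log_ids.any? { |id| branch_tokens[id] != token }
  end
end

# The latest merge pass wrote "t2" onto both branch logs.
branch_tokens = { 1 => "t2", 2 => "t2" }

FencingSketch.superseded?(branch_tokens, [1, 2], "t1") # older chain: stops quietly
FencingSketch.superseded?(branch_tokens, [1, 2], "t2") # newest chain: keeps polling
FencingSketch.superseded?(branch_tokens, [1, 2], nil)  # pre-upgrade job: unfenced
```

Because a stale poller returns before polling, waking, or rescheduling, a parent replay that starts a new chain leaves at most one chain rescheduling itself.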