Class: DispatchPolicy::Tick
- Inherits:
- Object
- Defined in:
- lib/dispatch_policy/tick.rb
Overview
One pass of admission for a single policy.
Records a row in dispatch_policy_tick_samples at the end so the engine UI can show throughput, denial reasons, and tick duration without sampling on the read path.
Defined Under Namespace
Classes: Result
Class Method Summary
- .run(policy_name:, shard: nil) ⇒ Object
Instance Method Summary
- #call ⇒ Object
- #initialize(policy_name, shard: nil) ⇒ Tick (constructor): A new instance of Tick.
Constructor Details
#initialize(policy_name, shard: nil) ⇒ Tick
Returns a new instance of Tick.
# File 'lib/dispatch_policy/tick.rb', line 16

def initialize(policy_name, shard: nil)
  @policy_name = policy_name
  @shard = shard
  @policy = DispatchPolicy.registry.fetch(policy_name) || raise(InvalidPolicy, "unknown policy #{policy_name.inspect}")
  @config = DispatchPolicy.config
end
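The constructor resolves the policy eagerly, so an unknown name fails when the Tick is built rather than partway through a pass. Below is a minimal sketch of that fetch-or-raise lookup. `SketchRegistry` and `resolve_policy` are hypothetical stand-ins, and the sketch assumes the registry's `fetch` returns `nil` for an unknown name. The real registry's behavior is not shown on this page; a plain `Hash#fetch` would raise `KeyError` before the `||` branch ran.

```ruby
# Hypothetical stand-ins for illustration; not the library's real classes.
class InvalidPolicy < StandardError; end

class SketchRegistry
  def initialize(policies)
    @policies = policies
  end

  # Assumed semantics: returns nil for an unknown name (unlike Hash#fetch).
  def fetch(name)
    @policies[name]
  end
end

# Mirrors the lookup expression used in Tick#initialize.
def resolve_policy(registry, policy_name)
  registry.fetch(policy_name) ||
    raise(InvalidPolicy, "unknown policy #{policy_name.inspect}")
end

registry = SketchRegistry.new("emails" => :emails_policy)
known = resolve_policy(registry, "emails")

# Capture the error message for an unregistered policy name.
error_message =
  begin
    resolve_policy(registry, "missing")
    nil
  rescue InvalidPolicy => e
    e.message
  end
```

Under these assumptions, `known` holds the registered policy and `error_message` carries the inspected name, which makes a misspelled policy name easy to spot in logs.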
Class Method Details
.run(policy_name:, shard: nil) ⇒ Object
# File 'lib/dispatch_policy/tick.rb', line 12

def self.run(policy_name:, shard: nil)
  new(policy_name, shard: shard).call
end
Instance Method Details
#call ⇒ Object
# File 'lib/dispatch_policy/tick.rb', line 23

def call
  started_at = monotonic_now_ms
  partitions_seen = 0
  partitions_admitted = 0
  partitions_denied = 0
  jobs_admitted = 0
  forward_failures = 0
  denied_reasons = Hash.new(0)

  partitions = Repository.claim_partitions(
    policy_name: @policy_name,
    shard: @shard,
    limit: @config.partition_batch_size
  )

  # Reorder by least-recent-admit-weighted (EWMA decayed_admits ASC)
  # so under-admitted partitions get first crack at the tick budget.
  # claim_partitions ALREADY enforced anti-stagnation via
  # last_checked_at — every partition with pending is visited within
  # ⌈active_partitions / partition_batch_size⌉ ticks regardless of
  # decayed_admits. Reordering here only decides order *inside* this
  # already-fair selection.
  sort_partitions_for_fairness!(partitions)

  # Per-partition fair share. When tick_admission_budget is set, we
  # divide it evenly across the partitions we just claimed. Otherwise
  # the legacy admission_batch_size is the per-partition ceiling.
  #
  # We deliberately do NOT clamp fair_share to a minimum of 1 when
  # tick_cap < N. The hard global cap wins over a per-partition
  # admit floor; partitions that don't admit this tick are still
  # visited (last_checked_at bumped) and re-visited next tick when
  # they'll be at the front of the in-tick decay order.
  # Anti-stagnation comes from claim_partitions, not from forcing
  # an admit on every claimed partition.
  tick_cap = @policy.tick_admission_budget || @config.tick_admission_budget
  per_part = @policy.admission_batch_size || @config.admission_batch_size
  fair_share = if tick_cap && partitions.any?
                 (tick_cap.to_f / partitions.size).ceil
               else
                 per_part
               end

  pending_denies = []
  admitted_per_partition = Hash.new(0)
  used = 0

  partitions.each do |partition|
    partitions_seen += 1

    if tick_cap && used >= tick_cap
      # Global cap exhausted in pass-1. The partition is still
      # observed (claim_partitions bumped its last_checked_at), so
      # the round-robin invariant for anti-stagnation holds; we
      # just admit nothing this tick.
      partitions_denied += 1
      denied_reasons["tick_cap_exhausted"] += 1
      # Push this partition to the deny path so its gate state
      # still gets persisted — the pipeline already evaluated it
      # in admit_partition... actually we haven't called admit yet.
      # Skip: not adding to pending_denies because the pipeline
      # didn't run, no gate_state_patch to flush.
      next
    end

    budget_for_this = if tick_cap
                        [fair_share, tick_cap - used].min
                      else
                        fair_share
                      end
    budget_for_this = 0 if budget_for_this.negative?

    outcome = admit_partition(partition, pending_denies, max_budget: budget_for_this)
    admitted_per_partition[partition["partition_key"]] = outcome[:admitted]
    jobs_admitted += outcome[:admitted]
    forward_failures += outcome[:failures]
    used += outcome[:admitted]

    if outcome[:admitted].positive?
      partitions_admitted += 1
    else
      partitions_denied += 1
      outcome[:reasons].each { |r| denied_reasons[r] += 1 }
    end
  end

  # Pass-2: redistribution. Pass-1 may have left budget unused if
  # some partitions had less pending than their fair share. Walk the
  # claimed partitions (still in decay-sorted order) and offer the
  # leftover to whoever filled their fair share in pass-1 — a signal
  # they had more pending than we let them admit.
  if tick_cap
    remaining = tick_cap - used
    if remaining.positive?
      partitions.each do |p|
        break if remaining <= 0
        next if admitted_per_partition[p["partition_key"]] < fair_share

        extra_cap = [remaining, fair_share].min
        outcome = admit_partition(p, pending_denies, max_budget: extra_cap)
        jobs_admitted += outcome[:admitted]
        forward_failures += outcome[:failures]
        admitted_per_partition[p["partition_key"]] += outcome[:admitted]
        remaining -= outcome[:admitted]
      end
    end
  end

  flush_denies!(pending_denies) if pending_denies.any?

  duration_ms = monotonic_now_ms - started_at
  record_sample!(
    duration_ms: duration_ms,
    partitions_seen: partitions_seen,
    partitions_admitted: partitions_admitted,
    partitions_denied: partitions_denied,
    jobs_admitted: jobs_admitted,
    forward_failures: forward_failures,
    denied_reasons: denied_reasons
  )

  Result.new(partitions_seen: partitions_seen, jobs_admitted: jobs_admitted)
end
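The two-pass budgeting in #call is easier to follow with concrete numbers. The sketch below is a simplified model, not the library's implementation: `allocate` is a hypothetical helper, `pending` is an assumed map from partition key to pending job count, and taking jobs straight from that count stands in for `admit_partition`, which in the real method also evaluates gates and can report failures.

```ruby
# Simplified model of the two-pass tick budget described in #call.
# Assumptions: `pending` maps partition key => pending job count, already in
# fairness order, and every pending job is admissible (no gates, no failures).
def allocate(pending, tick_cap)
  return {} if pending.empty?

  fair_share = (tick_cap.to_f / pending.size).ceil
  admitted = Hash.new(0)
  used = 0

  # Pass 1: each partition gets at most its fair share, bounded by what is
  # left of the global cap. Partitions reached after the cap is spent admit
  # nothing (the real method counts them as "tick_cap_exhausted").
  pending.each do |key, count|
    break if used >= tick_cap

    budget = [fair_share, tick_cap - used].min
    take = [count, budget].min
    admitted[key] = take
    used += take
  end

  # Pass 2: offer leftover budget only to partitions that filled their fair
  # share in pass 1, since that suggests they still have work pending.
  remaining = tick_cap - used
  pending.each do |key, count|
    break if remaining <= 0
    next if admitted[key] < fair_share

    extra = [remaining, fair_share, count - admitted[key]].min
    admitted[key] += extra
    remaining -= extra
  end

  admitted
end

# Three partitions, budget of 6: fair share is 2 each. "b" has only one
# pending job, so one unit is left over and goes to "a" in pass 2.
result = allocate({ "a" => 10, "b" => 1, "c" => 10 }, 6)

# Budget smaller than the partition count: fair share rounds up to 1 and the
# global cap stops pass 1 before "c" is reached.
tight = allocate({ "a" => 10, "b" => 1, "c" => 10 }, 2)
```

In this model `result` comes out as `{"a"=>3, "b"=>1, "c"=>2}` and `tight` as `{"a"=>1, "b"=>1}`. The second case shows why the global cap, rather than a per-partition floor, decides who admits when the budget is tight.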