Class: DispatchPolicy::Tick

Inherits:
Object
  • Object
show all
Defined in:
lib/dispatch_policy/tick.rb

Overview

One pass of admission for a single policy.

Records a row in dispatch_policy_tick_samples at the end so the engine UI can show throughput, denial reasons, and tick duration without sampling on the read path.

Defined Under Namespace

Classes: Result

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(policy_name, shard: nil) ⇒ Tick

Returns a new instance of Tick.



16
17
18
19
20
21
# File 'lib/dispatch_policy/tick.rb', line 16

# Builds a tick for a single policy, optionally scoped to one shard.
#
# The policy is looked up in DispatchPolicy.registry and the global
# DispatchPolicy.config is captured once for the lifetime of the tick.
#
# @param policy_name key passed to DispatchPolicy.registry.fetch
# @param shard optional shard identifier, stored as given (default nil)
# @raise [InvalidPolicy] when the registry lookup returns nil or false
def initialize(policy_name, shard: nil)
  resolved = DispatchPolicy.registry.fetch(policy_name)
  raise InvalidPolicy, "unknown policy #{policy_name.inspect}" unless resolved

  @policy_name = policy_name
  @shard = shard
  @policy = resolved
  @config = DispatchPolicy.config
end

Class Method Details

.run(policy_name:, shard: nil) ⇒ Object



12
13
14
# File 'lib/dispatch_policy/tick.rb', line 12

# Convenience entry point: builds a tick for the given policy (and
# optional shard) and immediately executes it.
#
# @param policy_name forwarded positionally to the constructor
# @param shard forwarded to the constructor as the shard: keyword
# @return whatever the instance's #call returns
def self.run(policy_name:, shard: nil)
  tick = new(policy_name, shard: shard)
  tick.call
end

Instance Method Details

#call ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/dispatch_policy/tick.rb', line 23

# Runs one admission tick for this policy (and shard, when one was given).
#
# Sequence:
#   1. Claim a batch of partitions and reorder them for fairness.
#   2. Pass 1: offer every claimed partition its share of the budget.
#   3. Pass 2: when a tick-wide budget is configured and part of it is
#      still unspent, offer the leftover to partitions that filled their
#      share in pass 1.
#   4. Flush queued denies (if any), record a tick sample, return totals.
#
# @return [Result] built with partitions_seen and jobs_admitted
def call
  tick_started_ms = monotonic_now_ms

  claimed = Repository.claim_partitions(
    policy_name: @policy_name,
    shard:       @shard,
    limit:       @config.partition_batch_size
  )

  # Under-admitted partitions (lowest EWMA decayed_admits) go first so
  # they get first crack at the budget. This only decides ordering
  # inside the claimed batch; anti-stagnation itself is enforced by
  # claim_partitions through last_checked_at.
  sort_partitions_for_fairness!(claimed)

  # Budget resolution: the policy-level setting wins over the global
  # config. With a tick-wide cap, the cap is split evenly (rounded up)
  # across the claimed partitions; without one, the legacy
  # admission_batch_size is the per-partition ceiling.
  #
  # No per-partition admit floor is forced on top of this: the hard
  # tick-wide cap always wins, and partitions that admit nothing this
  # tick are still visited and come back on a later tick.
  global_cap    = @policy.tick_admission_budget || @config.tick_admission_budget
  partition_cap = @policy.admission_batch_size || @config.admission_batch_size
  share         = (global_cap && claimed.any?) ? (global_cap.to_f / claimed.size).ceil : partition_cap

  seen                = 0
  admitted_partitions = 0
  denied_partitions   = 0
  admitted_jobs       = 0
  failed_forwards     = 0
  spent               = 0
  deny_tally          = Hash.new(0)
  deny_queue          = []
  admitted_by_key     = Hash.new(0)

  # Pass 1: fair-share admission.
  claimed.each do |row|
    seen += 1

    if global_cap && spent >= global_cap
      # Tick-wide cap already used up. admit_partition is never called
      # for this partition, so there is nothing to queue for the deny
      # flush; it is only counted as denied for the sample.
      denied_partitions += 1
      deny_tally["tick_cap_exhausted"] += 1
      next
    end

    allowance = global_cap ? [share, global_cap - spent].min : share
    allowance = 0 if allowance.negative?

    result = admit_partition(row, deny_queue, max_budget: allowance)
    taken  = result[:admitted]
    admitted_by_key[row["partition_key"]] = taken

    admitted_jobs   += taken
    failed_forwards += result[:failures]
    spent           += taken

    if taken.positive?
      admitted_partitions += 1
      next
    end

    denied_partitions += 1
    result[:reasons].each { |reason| deny_tally[reason] += 1 }
  end

  # Pass 2: redistribution. Budget can be left over when some partitions
  # had less pending than their share. Walk the batch again (same order)
  # and offer the leftover to partitions that filled their share in
  # pass 1 — the signal that they had more pending than they were
  # allowed to admit.
  leftover = global_cap ? global_cap - spent : 0
  if leftover.positive?
    claimed.each do |row|
      break if leftover <= 0

      key = row["partition_key"]
      next if admitted_by_key[key] < share

      bonus = admit_partition(row, deny_queue, max_budget: [leftover, share].min)
      extra = bonus[:admitted]

      admitted_jobs        += extra
      failed_forwards      += bonus[:failures]
      admitted_by_key[key] += extra
      leftover             -= extra
    end
  end

  flush_denies!(deny_queue) if deny_queue.any?

  record_sample!(
    duration_ms:         monotonic_now_ms - tick_started_ms,
    partitions_seen:     seen,
    partitions_admitted: admitted_partitions,
    partitions_denied:   denied_partitions,
    jobs_admitted:       admitted_jobs,
    forward_failures:    failed_forwards,
    denied_reasons:      deny_tally
  )

  Result.new(partitions_seen: seen, jobs_admitted: admitted_jobs)
end