Class: DispatchPolicy::PolicyDSL

Inherits:
Object
Defined in:
lib/dispatch_policy/policy_dsl.rb

Constant Summary

GATE_TYPES =
{
  throttle:             Gates::Throttle,
  concurrency:          Gates::Concurrency,
  adaptive_concurrency: Gates::AdaptiveConcurrency
}.freeze

Constructor Details

#initialize(name) ⇒ PolicyDSL

Returns a new instance of PolicyDSL.



# File 'lib/dispatch_policy/policy_dsl.rb', line 17

def initialize(name)
  @name                 = name
  @context_proc         = nil
  @gates                = []
  @retry_strategy       = :restage
  @queue_name           = nil
  @admission_batch_size = nil
  @shard_by_proc        = nil
  @partition_by_proc    = nil
  @fairness_half_life_seconds = nil
  @tick_admission_budget = nil
end

Class Method Details

.build(name, &block) ⇒ Object



# File 'lib/dispatch_policy/policy_dsl.rb', line 11

def self.build(name, &block)
  dsl = new(name)
  dsl.instance_eval(&block) if block
  dsl.to_policy
end
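
The `instance_eval` call is what lets a policy block use bare calls such as `queue_name` or `admission_batch_size`. The following is a minimal sketch of that pattern, not the library's code: `SketchDSL` and `SketchPolicy` are hypothetical stand-ins that keep only two of the setters so the example runs on its own.

```ruby
# Hypothetical stand-ins; neither class is part of dispatch_policy.
SketchPolicy = Struct.new(:name, :queue_name, :admission_batch_size, keyword_init: true)

class SketchDSL
  def self.build(name, &block)
    dsl = new(name)
    # instance_eval runs the block with self set to the DSL object, so bare
    # calls inside the block resolve to the setter methods defined below.
    dsl.instance_eval(&block) if block
    dsl.to_policy
  end

  def initialize(name)
    @name = name
    @queue_name = nil
    @admission_batch_size = nil
  end

  def queue_name(name)
    @queue_name = name.to_s
  end

  def admission_batch_size(size)
    @admission_batch_size = Integer(size)
  end

  def to_policy
    SketchPolicy.new(name: @name, queue_name: @queue_name,
                     admission_batch_size: @admission_batch_size)
  end
end

policy = SketchDSL.build(:endpoints) do
  queue_name :webhooks        # stored as the String "webhooks"
  admission_batch_size "25"   # coerced to the Integer 25
end
```

Because the block is evaluated against the DSL instance, instance variables and methods of the surrounding object are not visible inside it; values from the outer scope have to be captured in local variables first.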

Instance Method Details

#admission_batch_size(size) ⇒ Object



# File 'lib/dispatch_policy/policy_dsl.rb', line 47

def admission_batch_size(size)
  @admission_batch_size = Integer(size)
end

#context(callable = nil, &block) ⇒ Object



# File 'lib/dispatch_policy/policy_dsl.rb', line 30

def context(callable = nil, &block)
  @context_proc = callable || block
end

#fairness(half_life: nil) ⇒ Object

Per-policy override for the EWMA half-life used to weigh recent admissions when reordering claimed partitions inside the tick. Accepts a Numeric (seconds) or any object responding to `to_f` (so ActiveSupport durations like `30.seconds` work too). Omitting `half_life`, or passing nil, leaves the override unset.

fairness half_life: 30.seconds


# File 'lib/dispatch_policy/policy_dsl.rb', line 56

def fairness(half_life: nil)
  @fairness_half_life_seconds = Float(half_life) if half_life
end
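
To make the half-life concrete: with an exponentially decayed score, a partition's past admissions count for half as much after one half-life and a quarter as much after two. The sketch below illustrates that arithmetic only. `decayed_score` is a hypothetical helper, and the exact formula the tick uses is not shown on this page.

```ruby
# Hypothetical helper, not part of dispatch_policy: decays a running score
# by elapsed time, then adds the admissions observed since the last update.
def decayed_score(previous_score, elapsed_seconds, half_life_seconds, new_admissions)
  decay = 0.5**(elapsed_seconds.to_f / half_life_seconds)
  previous_score * decay + new_admissions
end

half_life = Float(30) # same coercion the setter applies; 30 becomes 30.0

after_one_half_life  = decayed_score(100.0, 30, half_life, 0) # 50.0
after_two_half_lives = decayed_score(100.0, 60, half_life, 0) # 25.0
with_fresh_traffic   = decayed_score(100.0, 0, half_life, 5)  # 105.0
```

A shorter half-life forgets past admissions faster, so a partition that was busy a minute ago regains priority sooner; a longer one smooths over bursts.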

#gate(type, **options) ⇒ Object



# File 'lib/dispatch_policy/policy_dsl.rb', line 34

def gate(type, **options)
  klass = GATE_TYPES[type] || raise(UnknownGate, "unknown gate type: #{type.inspect}")
  @gates << klass.new(**options)
end
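
Since `GATE_TYPES` is keyed by Symbol and the lookup uses plain `Hash#[]`, a String such as `"throttle"` does not match and raises. A self-contained sketch of the same lookup-or-raise pattern follows; the `Sketch*` names are hypothetical stand-ins for the real gate classes and error class.

```ruby
# Hypothetical stand-ins for the gate classes and UnknownGate.
class SketchUnknownGate < StandardError; end
SketchThrottle    = Struct.new(:rate, :per, keyword_init: true)
SketchConcurrency = Struct.new(:max, keyword_init: true)

SKETCH_GATE_TYPES = {
  throttle:    SketchThrottle,
  concurrency: SketchConcurrency
}.freeze

def build_gate(type, **options)
  # Hash#[] returns nil for a missing key, so || falls through to raise.
  klass = SKETCH_GATE_TYPES[type] ||
          raise(SketchUnknownGate, "unknown gate type: #{type.inspect}")
  klass.new(**options)
end

throttle = build_gate(:throttle, rate: 60, per: 60)

error_message =
  begin
    build_gate("throttle", rate: 60, per: 60) # String key, not a Symbol
    nil
  rescue SketchUnknownGate => e
    e.message
  end
```

Using `type.inspect` in the message keeps the quotes around a String argument, which makes a Symbol/String mix-up visible in the error.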

#partition_by(callable = nil, &block) ⇒ Object

Defines the partition scope. Required — every policy declares exactly one. Every gate in the policy uses this proc to compute the scope it enforces against (the staged_jobs row, the throttle bucket on that row, and the concurrency gate’s inflight rows all share the same canonical key).

dispatch_policy :endpoints do
  partition_by ->(ctx) { ctx[:endpoint_id] }
  gate :throttle,    rate: 60, per: 60
  gate :concurrency, max: 5
end

If you need different scopes per gate (e.g. throttle by endpoint AND concurrency by account), use two policies and let one chain into the other.



# File 'lib/dispatch_policy/policy_dsl.rb', line 83

def partition_by(callable = nil, &block)
  @partition_by_proc = callable || block
end
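
As a rough illustration of what the partition proc does, the sketch below applies the `endpoint_id` proc from the example above to a few hypothetical job contexts and groups them by the resulting key. The context hashes are invented for illustration, and the grouping is done in memory only to show which jobs would share a scope.

```ruby
# Hypothetical job contexts; only :endpoint_id matters to this proc.
contexts = [
  { endpoint_id: 7, account_id: 1 },
  { endpoint_id: 7, account_id: 2 },
  { endpoint_id: 9, account_id: 1 }
]

partition_by = ->(ctx) { ctx[:endpoint_id] }

# Jobs with the same key share one partition, and so one throttle bucket
# and one concurrency count.
partitions      = contexts.group_by { |ctx| partition_by.call(ctx) }
partition_sizes = partitions.transform_values(&:size)
```

Here the two jobs for endpoint 7 land in the same partition even though they belong to different accounts, which is why a per-account limit needs its own policy.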

#queue_name(name) ⇒ Object



# File 'lib/dispatch_policy/policy_dsl.rb', line 43

def queue_name(name)
  @queue_name = name.to_s
end

#retry_strategy(strategy) ⇒ Object



# File 'lib/dispatch_policy/policy_dsl.rb', line 39

def retry_strategy(strategy)
  @retry_strategy = strategy
end

#shard_by(callable = nil, &block) ⇒ Object

Routes a partition to a specific shard. The proc receives the enriched Context (which includes :queue_name from the job) and returns a string. Tick loops can be scoped per-shard so multiple workers can process a single policy in parallel.

shard_by ->(ctx) { ctx[:queue_name] }                   # shard = job's queue
shard_by ->(ctx) { "shard-#{ctx[:account_id].hash % 4}" } # explicit hash

IMPORTANT: shard_by must be CONSISTENT with the policy’s `partition_by` for any rate/concurrency budget you want to enforce globally. A throttle gate’s bucket lives on the partition row, so if two staged_partitions sharing the same throttle key end up on different shards, each shard runs its own bucket and the effective rate becomes rate × N_shards.



# File 'lib/dispatch_policy/policy_dsl.rb', line 101

def shard_by(callable = nil, &block)
  @shard_by_proc = callable || block
end
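
One way to keep the shard consistent with the partition is to derive the shard from the partition key itself, using a hash that is stable across processes. Ruby's `Object#hash` may return different values in different Ruby processes, so the sketch below uses `Zlib.crc32` from the standard library instead. `stable_shard` and `SHARD_COUNT` are hypothetical names, and the choice of CRC32 is an assumption, not something the library prescribes.

```ruby
require "zlib"

SHARD_COUNT = 4 # hypothetical; pick whatever your deployment runs

# Maps a key to a shard name using a checksum that does not depend on the
# Ruby process, so every worker agrees on the assignment.
def stable_shard(key, shard_count = SHARD_COUNT)
  "shard-#{Zlib.crc32(key.to_s) % shard_count}"
end

partition_by = ->(ctx) { ctx[:account_id] }
# Deriving the shard from the partition key guarantees that one partition
# key never spans two shards.
shard_by     = ->(ctx) { stable_shard(partition_by.call(ctx)) }

shard_a = shard_by.call({ account_id: 42, queue_name: "default" })
shard_b = shard_by.call({ account_id: 42, queue_name: "mailers" })

# If the same throttle key were spread over every shard instead, a
# configured rate of 60 would allow up to 60 * SHARD_COUNT admissions.
worst_case_rate = 60 * SHARD_COUNT
```

Both contexts above carry the same `account_id`, so they resolve to the same shard regardless of queue name.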

#tick_admission_budget(value) ⇒ Object

Per-policy override for the global tick admission cap. nil (default) means use config.tick_admission_budget; if that’s also nil, no global cap is enforced and per-partition admission_batch_size is the only ceiling.



# File 'lib/dispatch_policy/policy_dsl.rb', line 64

def tick_admission_budget(value)
  @tick_admission_budget = Integer(value)
end
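
The fallback order described above can be sketched as a small resolution function. `effective_tick_budget` is a hypothetical helper written for illustration; where and how the library actually resolves the value is not shown on this page. Note also that the setter coerces with `Integer(value)`, which raises `TypeError` for nil, so the nil default comes from not calling `tick_admission_budget` at all.

```ruby
# Hypothetical helper: policy override first, then the global config value,
# then nil (meaning no global cap).
def effective_tick_budget(policy_budget, config_budget)
  policy_budget.nil? ? config_budget : policy_budget
end

override_wins = effective_tick_budget(50, 200)  # 50
config_used   = effective_tick_budget(nil, 200) # 200
uncapped      = effective_tick_budget(nil, nil) # nil

coerced = Integer("50") # numeric strings are accepted and become 50

nil_rejected =
  begin
    Integer(nil)
    false
  rescue TypeError
    true
  end
```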

#to_policy ⇒ Object



# File 'lib/dispatch_policy/policy_dsl.rb', line 105

def to_policy
  Policy.new(
    name:                 @name,
    context_proc:         @context_proc,
    gates:                @gates,
    retry_strategy:       @retry_strategy,
    queue_name:           @queue_name,
    admission_batch_size: @admission_batch_size,
    shard_by_proc:        @shard_by_proc,
    partition_by_proc:    @partition_by_proc,
    fairness_half_life_seconds: @fairness_half_life_seconds,
    tick_admission_budget: @tick_admission_budget
  )
end