Class: DispatchPolicy::PolicyDSL
- Inherits: Object
  - Object
    - DispatchPolicy::PolicyDSL
- Defined in: lib/dispatch_policy/policy_dsl.rb
Constant Summary
- GATE_TYPES =
    {
      throttle: Gates::Throttle,
      concurrency: Gates::Concurrency,
      adaptive_concurrency: Gates::AdaptiveConcurrency
    }.freeze
Class Method Summary
- .build(name, &block) ⇒ Object
Instance Method Summary
- #admission_batch_size(size) ⇒ Object
- #context(callable = nil, &block) ⇒ Object
- #fairness(half_life: nil) ⇒ Object
  Per-policy override for the EWMA half-life used to weigh recent admissions when reordering claimed partitions inside the tick.
- #gate(type, **options) ⇒ Object
- #initialize(name) ⇒ PolicyDSL (constructor)
  A new instance of PolicyDSL.
- #partition_by(callable = nil, &block) ⇒ Object
  Defines the partition scope.
- #queue_name(name) ⇒ Object
- #retry_strategy(strategy) ⇒ Object
- #shard_by(callable = nil, &block) ⇒ Object
  Routes a partition to a specific shard.
- #tick_admission_budget(value) ⇒ Object
  Per-policy override for the global tick admission cap.
- #to_policy ⇒ Object
Constructor Details
#initialize(name) ⇒ PolicyDSL
Returns a new instance of PolicyDSL.
    # File 'lib/dispatch_policy/policy_dsl.rb', line 17
    def initialize(name)
      @name = name
      @context_proc = nil
      @gates = []
      @retry_strategy = :restage
      @queue_name = nil
      @admission_batch_size = nil
      @shard_by_proc = nil
      @partition_by_proc = nil
      @fairness_half_life_seconds = nil
      @tick_admission_budget = nil
    end
Class Method Details
.build(name, &block) ⇒ Object
    # File 'lib/dispatch_policy/policy_dsl.rb', line 11
    def self.build(name, &block)
      dsl = new(name)
      dsl.instance_eval(&block) if block
      dsl.to_policy
    end
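The `instance_eval` call is what lets the DSL methods be written without an explicit receiver inside the block. The following is a minimal, self-contained sketch of that builder pattern. `MiniDSL` and `MiniPolicy` are hypothetical stand-ins that only mirror the shape of `PolicyDSL.build`; they are not part of dispatch_policy.

```ruby
# Hypothetical stand-ins that mirror the shape of PolicyDSL.build;
# they are not the real DispatchPolicy classes.
MiniPolicy = Struct.new(:name, :queue_name, :retry_strategy, keyword_init: true)

class MiniDSL
  def self.build(name, &block)
    dsl = new(name)
    # instance_eval runs the block with `self` set to the DSL object,
    # so bare calls such as `queue_name :webhooks` reach the methods below.
    dsl.instance_eval(&block) if block
    dsl.to_policy
  end

  def initialize(name)
    @name = name
    @queue_name = nil
    @retry_strategy = :restage
  end

  def queue_name(name)
    @queue_name = name.to_s
  end

  def retry_strategy(strategy)
    @retry_strategy = strategy
  end

  def to_policy
    MiniPolicy.new(name: @name, queue_name: @queue_name, retry_strategy: @retry_strategy)
  end
end

policy = MiniDSL.build(:endpoints) { queue_name :webhooks }
default_policy = MiniDSL.build(:plain) # no block: defaults only
```

Because the block is optional, calling the builder without one yields an object that carries only the defaults set in the constructor.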
Instance Method Details
#admission_batch_size(size) ⇒ Object
    # File 'lib/dispatch_policy/policy_dsl.rb', line 47
    def admission_batch_size(size)
      @admission_batch_size = Integer(size)
    end
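Since the setter coerces with `Kernel#Integer`, input that does not represent a whole number raises instead of being silently converted. The short sketch below illustrates that standard Ruby behaviour; `coercion_error` is a hypothetical helper used only to capture the exception class.

```ruby
# Kernel#Integer is strict: clean numeric input converts, anything else raises.
from_integer = Integer(50)    # Integer input passes through unchanged
from_string  = Integer("50")  # numeric strings are parsed

# Hypothetical helper: returns the exception class raised by Integer(value),
# or nil when the conversion succeeds.
def coercion_error(value)
  Integer(value)
  nil
rescue ArgumentError, TypeError => e
  e.class
end

bad_string = coercion_error("fifty") # ArgumentError
nil_value  = coercion_error(nil)     # TypeError
```

One consequence is that passing `nil` explicitly to a setter built this way raises a `TypeError`, so leaving the setting out is the way to keep the default.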
#context(callable = nil, &block) ⇒ Object
    # File 'lib/dispatch_policy/policy_dsl.rb', line 30
    def context(callable = nil, &block)
      @context_proc = callable || block
    end
#fairness(half_life: nil) ⇒ Object
Per-policy override for the EWMA half-life used to weigh recent admissions when reordering claimed partitions inside the tick. Accepts a Numeric (seconds) or any object responding to `to_f` (so ActiveSupport durations like `30.seconds` work too).
    fairness half_life: 30.seconds
    # File 'lib/dispatch_policy/policy_dsl.rb', line 56
    def fairness(half_life: nil)
      @fairness_half_life_seconds = Float(half_life) if half_life
    end
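To give a feel for what a half-life means in an exponentially weighted average, the sketch below computes how much weight an observation retains after a given amount of time. The formula `0.5 ** (elapsed / half_life)` is the standard half-life decay; whether dispatch_policy applies exactly this form is an assumption, and `decay_weight` is a hypothetical helper, not a library method.

```ruby
# Hypothetical helper: standard half-life decay, not taken from the library source.
# Float() mirrors the coercion used by #fairness, so Integers and other
# to_f-capable values are accepted.
def decay_weight(elapsed_seconds, half_life_seconds)
  0.5**(Float(elapsed_seconds) / Float(half_life_seconds))
end

fresh   = decay_weight(0, 30)  # 1.0  (a just-observed admission counts fully)
one_hl  = decay_weight(30, 30) # 0.5  (one half-life later it counts half)
two_hl  = decay_weight(60, 30) # 0.25 (two half-lives later, a quarter)
```

Under this reading, a shorter half-life makes the ordering react faster to recent admissions, while a longer one smooths over bursts.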
#gate(type, **options) ⇒ Object
    # File 'lib/dispatch_policy/policy_dsl.rb', line 34
    def gate(type, **)
      klass = GATE_TYPES[type] || raise(UnknownGate, "unknown gate type: #{type.inspect}")
      @gates << klass.new(**)
    end
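The method looks the type up in a frozen registry and raises on a miss, which turns a misspelled gate name into an immediate error. A minimal sketch of that lookup-or-raise pattern follows. The gate structs, the `GATE_REGISTRY` constant, and the local `UnknownGate` class are hypothetical stand-ins; the real gate classes and their options are not shown on this page.

```ruby
# Hypothetical stand-ins for the gate classes and the UnknownGate error.
class UnknownGate < StandardError; end
ThrottleGate    = Struct.new(:rate, :per, keyword_init: true)
ConcurrencyGate = Struct.new(:max, keyword_init: true)

GATE_REGISTRY = { throttle: ThrottleGate, concurrency: ConcurrencyGate }.freeze

# Look the class up by symbol; a missing key falls through to raise.
def build_gate(type, **options)
  klass = GATE_REGISTRY[type] || raise(UnknownGate, "unknown gate type: #{type.inspect}")
  klass.new(**options)
end

throttle = build_gate(:throttle, rate: 60, per: 60)

begin
  build_gate(:throttel, rate: 60, per: 60) # misspelled on purpose
rescue UnknownGate => e
  message = e.message # "unknown gate type: :throttel"
end
```

Because the registry is keyed by symbols, a string such as `"throttle"` would also miss the lookup and raise in this sketch.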
#partition_by(callable = nil, &block) ⇒ Object
Defines the partition scope. Required — every policy declares exactly one. Every gate in the policy uses this proc to compute the scope it enforces against (the staged_jobs row, the throttle bucket on that row, and the concurrency gate’s inflight rows all share the same canonical key).
    dispatch_policy :endpoints do
      partition_by ->(ctx) { ctx[:endpoint_id] }
      gate :throttle, rate: 60, per: 60
      gate :concurrency, max: 5
    end
If you need different scopes per gate (e.g. throttle by endpoint AND concurrency by account), use two policies and let one chain into the other.
    # File 'lib/dispatch_policy/policy_dsl.rb', line 83
    def partition_by(callable = nil, &block)
      @partition_by_proc = callable || block
    end
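To make the idea of a shared canonical key concrete, the sketch below applies a partition lambda to a few job contexts and groups them by the resulting key. The context hashes are hypothetical sample data, and `group_by` only illustrates the keying; the library stores partitions as rows rather than in-memory groups.

```ruby
# The same kind of lambda that is passed to partition_by.
partition_key = ->(ctx) { ctx[:endpoint_id] }

# Hypothetical job contexts; only :endpoint_id matters for the key.
contexts = [
  { endpoint_id: "ep_1", account_id: 7 },
  { endpoint_id: "ep_2", account_id: 7 },
  { endpoint_id: "ep_1", account_id: 9 }
]

# Contexts that yield the same key fall into the same partition, so every
# gate in the policy would enforce its limit against that shared scope.
partitions = contexts.group_by { |ctx| partition_key.call(ctx) }
partition_sizes = partitions.transform_values(&:size)
```

Here the two `ep_1` jobs share a scope even though they belong to different accounts, which is why a per-account limit needs its own policy.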
#queue_name(name) ⇒ Object
    # File 'lib/dispatch_policy/policy_dsl.rb', line 43
    def queue_name(name)
      @queue_name = name.to_s
    end
#retry_strategy(strategy) ⇒ Object
    # File 'lib/dispatch_policy/policy_dsl.rb', line 39
    def retry_strategy(strategy)
      @retry_strategy = strategy
    end
#shard_by(callable = nil, &block) ⇒ Object
Routes a partition to a specific shard. The proc receives the enriched Context (which includes :queue_name from the job) and returns a string. Tick loops can be scoped per-shard so multiple workers can process a single policy in parallel.
    shard_by ->(ctx) { ctx[:queue_name] }                        # shard = job's queue
    shard_by ->(ctx) { "shard-#{ctx[:account_id].hash % 4}" }   # explicit hash
IMPORTANT: shard_by must be CONSISTENT with the `partition_by` key of any rate/concurrency budget you want to enforce globally. A throttle gate’s bucket lives on the partition row, so if two staged_partitions sharing the same throttle key end up on different shards, each shard runs its own bucket and the effective rate becomes rate × N_shards.
    # File 'lib/dispatch_policy/policy_dsl.rb', line 101
    def shard_by(callable = nil, &block)
      @shard_by_proc = callable || block
    end
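One way to keep shards consistent with partitions is to derive the shard from the same value that forms the partition key, using a hash that is deterministic across processes. Ruby's `Object#hash` is not guaranteed to be identical across interpreter invocations, so the sketch below swaps in `Zlib.crc32` from the standard library. `stable_shard`, `SHARD_COUNT`, and the lambdas are hypothetical names, and whether shard keys are computed in more than one process in a given deployment is an assumption.

```ruby
require "zlib"

SHARD_COUNT = 4 # hypothetical number of shards

# Zlib.crc32 returns the same value in every process, unlike Object#hash.
def stable_shard(key, shard_count = SHARD_COUNT)
  "shard-#{Zlib.crc32(key.to_s) % shard_count}"
end

# Deriving the shard from the partition key keeps every partition that
# shares the key on a single shard, so one bucket enforces the budget.
partition_key = ->(ctx) { ctx[:account_id] }
shard_key     = ->(ctx) { stable_shard(partition_key.call(ctx)) }

first  = shard_key.call({ account_id: 42, queue_name: "default" })
second = shard_key.call({ account_id: 42, queue_name: "mailers" })
```

With this arrangement both contexts map to the same shard, so the rate × N_shards inflation described above (for example, 60 per minute becoming 240 across 4 shards) does not arise for that key.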
#tick_admission_budget(value) ⇒ Object
Per-policy override for the global tick admission cap. nil (default) means use config.tick_admission_budget; if that’s also nil, no global cap is enforced and per-partition admission_batch_size is the only ceiling.
    # File 'lib/dispatch_policy/policy_dsl.rb', line 64
    def tick_admission_budget(value)
      @tick_admission_budget = Integer(value)
    end
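The fallback chain described above (policy override, then global config, then no cap) can be sketched as follows. Both helpers are hypothetical, and the way `plan_admissions` spends the budget across partitions is an assumption made for illustration, not the library's tick loop.

```ruby
# Hypothetical helper: the policy value wins, then the config value;
# nil from both means no global cap.
def effective_tick_budget(policy_budget, config_budget)
  policy_budget || config_budget
end

# Hypothetical helper: admit up to batch_size per partition, and stop
# drawing from the shared budget once it is exhausted (when one is set).
def plan_admissions(pending_by_partition, batch_size:, tick_budget:)
  remaining = tick_budget
  pending_by_partition.each_with_object({}) do |(partition, pending), plan|
    allowed = [pending, batch_size].min
    allowed = [allowed, remaining].min if remaining
    plan[partition] = allowed
    remaining -= allowed if remaining
  end
end

pending = { "a" => 10, "b" => 10 }
capped   = plan_admissions(pending, batch_size: 5, tick_budget: effective_tick_budget(8, 100))
uncapped = plan_admissions(pending, batch_size: 5, tick_budget: effective_tick_budget(nil, nil))
```

In the capped run the second partition receives only what is left of the budget, while in the uncapped run the per-partition batch size is the only ceiling.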
#to_policy ⇒ Object
    # File 'lib/dispatch_policy/policy_dsl.rb', line 105
    def to_policy
      Policy.new(
        name: @name,
        context_proc: @context_proc,
        gates: @gates,
        retry_strategy: @retry_strategy,
        queue_name: @queue_name,
        admission_batch_size: @admission_batch_size,
        shard_by_proc: @shard_by_proc,
        partition_by_proc: @partition_by_proc,
        fairness_half_life_seconds: @fairness_half_life_seconds,
        tick_admission_budget: @tick_admission_budget
      )
    end