Class: DispatchPolicy::Policy
- Inherits:
- Object
- Defined in:
- lib/dispatch_policy/policy.rb
Constant Summary
- DEFAULT_SHARD = "default"
Instance Attribute Summary
- #admission_batch_size ⇒ Object (readonly)
  Returns the value of attribute admission_batch_size.
- #context_proc ⇒ Object (readonly)
  Returns the value of attribute context_proc.
- #fairness_half_life_seconds ⇒ Object (readonly)
  Returns the value of attribute fairness_half_life_seconds.
- #gates ⇒ Object (readonly)
  Returns the value of attribute gates.
- #name ⇒ Object (readonly)
  Returns the value of attribute name.
- #partition_by_proc ⇒ Object (readonly)
  Returns the value of attribute partition_by_proc.
- #queue_name ⇒ Object (readonly)
  Returns the value of attribute queue_name.
- #retry_strategy ⇒ Object (readonly)
  Returns the value of attribute retry_strategy.
- #shard_by_proc ⇒ Object (readonly)
  Returns the value of attribute shard_by_proc.
- #tick_admission_budget ⇒ Object (readonly)
  Returns the value of attribute tick_admission_budget.
Instance Method Summary
- #build_context(arguments, queue_name: nil) ⇒ Object
  Builds the Context the gates and shard_by will see at admission time.
- #bypass_retries? ⇒ Boolean
- #initialize(name:, context_proc:, gates:, retry_strategy: :restage, queue_name: nil, admission_batch_size: nil, shard_by_proc: nil, partition_by_proc: nil, fairness_half_life_seconds: nil, tick_admission_budget: nil) ⇒ Policy (constructor)
  A new instance of Policy.
- #partition_for(ctx) ⇒ Object
  Policy-level partition scope.
- #partition_key_for(ctx) ⇒ Object
- #restage_retries? ⇒ Boolean
- #shard_for(ctx) ⇒ Object
  The shard a partition belongs to.
Constructor Details
#initialize(name:, context_proc:, gates:, retry_strategy: :restage, queue_name: nil, admission_batch_size: nil, shard_by_proc: nil, partition_by_proc: nil, fairness_half_life_seconds: nil, tick_admission_budget: nil) ⇒ Policy
Returns a new instance of Policy.
    # File 'lib/dispatch_policy/policy.rb', line 11

    def initialize(name:, context_proc:, gates:,
                   retry_strategy: :restage,
                   queue_name: nil,
                   admission_batch_size: nil,
                   shard_by_proc: nil,
                   partition_by_proc: nil,
                   fairness_half_life_seconds: nil,
                   tick_admission_budget: nil)
      @name = name.to_s
      @context_proc = context_proc
      @gates = gates.freeze
      @retry_strategy = retry_strategy
      @queue_name = queue_name
      @admission_batch_size = admission_batch_size
      @shard_by_proc = shard_by_proc
      @partition_by_proc = partition_by_proc
      @fairness_half_life_seconds = fairness_half_life_seconds
      @tick_admission_budget = tick_admission_budget
      validate!
    end
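The constructor listing normalizes two of its inputs: name is stringified and gates is frozen in place. The sketch below illustrates those two details together with the :restage default and the two retry predicates. PolicySketch is a hypothetical stand-in, not the gem's class; it omits the other keywords and the validate! call, whose rules are not shown on this page.

```ruby
# Hypothetical stand-in for DispatchPolicy::Policy, reduced to three keywords.
# The real constructor also calls validate!, which is not reproduced here.
class PolicySketch
  attr_reader :name, :gates, :retry_strategy

  def initialize(name:, gates:, retry_strategy: :restage)
    @name = name.to_s              # symbols and strings both end up as String
    @gates = gates.freeze          # freezes the caller's array itself, not a copy
    @retry_strategy = retry_strategy
  end

  def restage_retries?
    retry_strategy == :restage
  end

  def bypass_retries?
    retry_strategy == :bypass
  end
end

gate_list = [:throttle]            # placeholder values; real gates are gem objects
policy = PolicySketch.new(name: :emails, gates: gate_list)

policy.name              # => "emails"
policy.restage_retries?  # => true
policy.bypass_retries?   # => false
gate_list.frozen?        # => true
```

Because freeze acts on the array that was passed in, a caller who keeps a reference to it can no longer append gates after constructing the policy.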
Instance Attribute Details
#admission_batch_size ⇒ Object (readonly)
Returns the value of attribute admission_batch_size.
    # File 'lib/dispatch_policy/policy.rb', line 7

    def admission_batch_size
      @admission_batch_size
    end
#context_proc ⇒ Object (readonly)
Returns the value of attribute context_proc.
    # File 'lib/dispatch_policy/policy.rb', line 7

    def context_proc
      @context_proc
    end
#fairness_half_life_seconds ⇒ Object (readonly)
Returns the value of attribute fairness_half_life_seconds.
    # File 'lib/dispatch_policy/policy.rb', line 7

    def fairness_half_life_seconds
      @fairness_half_life_seconds
    end
#gates ⇒ Object (readonly)
Returns the value of attribute gates.
    # File 'lib/dispatch_policy/policy.rb', line 7

    def gates
      @gates
    end
#name ⇒ Object (readonly)
Returns the value of attribute name.
    # File 'lib/dispatch_policy/policy.rb', line 7

    def name
      @name
    end
#partition_by_proc ⇒ Object (readonly)
Returns the value of attribute partition_by_proc.
    # File 'lib/dispatch_policy/policy.rb', line 7

    def partition_by_proc
      @partition_by_proc
    end
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
    # File 'lib/dispatch_policy/policy.rb', line 7

    def queue_name
      @queue_name
    end
#retry_strategy ⇒ Object (readonly)
Returns the value of attribute retry_strategy.
    # File 'lib/dispatch_policy/policy.rb', line 7

    def retry_strategy
      @retry_strategy
    end
#shard_by_proc ⇒ Object (readonly)
Returns the value of attribute shard_by_proc.
    # File 'lib/dispatch_policy/policy.rb', line 7

    def shard_by_proc
      @shard_by_proc
    end
#tick_admission_budget ⇒ Object (readonly)
Returns the value of attribute tick_admission_budget.
    # File 'lib/dispatch_policy/policy.rb', line 7

    def tick_admission_budget
      @tick_admission_budget
    end
Instance Method Details
#build_context(arguments, queue_name: nil) ⇒ Object
Builds the Context the gates and shard_by will see at admission time. The user’s context_proc receives the job’s arguments. The gem then enriches the resulting hash with `queue_name` (the ActiveJob queue) so shard_by/partition_by can route by queue without the user having to thread it through their proc. A missing context_proc, or one that returns nil, yields an empty base hash; `queue_name` is merged only when one is passed.
    # File 'lib/dispatch_policy/policy.rb', line 34

    def build_context(arguments, queue_name: nil)
      base = context_proc ? context_proc.call(arguments) : {}
      base = (base || {}).to_h
      base = base.merge(queue_name: queue_name) if queue_name
      Context.wrap(base)
    end
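The hash-building steps of build_context can be traced outside the gem. The following is a minimal sketch, assuming a plain Hash as the return value where the real method returns Context.wrap(base); build_context_sketch and tenant_context are hypothetical names introduced only for illustration.

```ruby
# Hypothetical stand-in for Policy#build_context. It follows the same three
# steps as the listing but returns a plain Hash instead of a Context.
def build_context_sketch(context_proc, arguments, queue_name: nil)
  base = context_proc ? context_proc.call(arguments) : {}
  base = (base || {}).to_h                                  # a nil result becomes {}
  base = base.merge(queue_name: queue_name) if queue_name   # added only when given
  base
end

# Hypothetical context_proc: treats the first job argument as a tenant id.
tenant_context = ->(arguments) { { tenant_id: arguments.first } }

with_queue    = build_context_sketch(tenant_context, [42], queue_name: "mailers")
without_queue = build_context_sketch(tenant_context, [42])
no_proc       = build_context_sketch(nil, [42], queue_name: "mailers")
nil_result    = build_context_sketch(->(_arguments) { nil }, [42])

with_queue     # => {tenant_id: 42, queue_name: "mailers"}
without_queue  # => {tenant_id: 42}
no_proc        # => {queue_name: "mailers"}
nil_result     # => {}
```

Since the merge runs after the user's proc, a queue_name key returned by the proc is overwritten whenever a queue name is passed in.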
#bypass_retries? ⇒ Boolean
    # File 'lib/dispatch_policy/policy.rb', line 69

    def bypass_retries?
      retry_strategy == :bypass
    end
#partition_for(ctx) ⇒ Object
Policy-level partition scope. Both the staged_jobs row and the concurrency gate’s inflight_jobs row use this single canonical value as their partition_key, so all gates enforce their state at exactly the same scope. A partition_by proc is required: validate! raises if the policy is built without one. A nil result from the proc becomes the empty string; any other value is converted with to_s.
    # File 'lib/dispatch_policy/policy.rb', line 50

    def partition_for(ctx)
      value = @partition_by_proc.call(ctx)
      value.nil? ? "" : value.to_s
    end
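The normalization in partition_for is small enough to exercise on its own. This is a sketch under stated assumptions: the context is a plain Hash rather than the gem's Context, and partition_for_sketch and by_tenant are hypothetical names.

```ruby
# Hypothetical stand-in for Policy#partition_for, taking the proc as an
# argument instead of reading @partition_by_proc.
def partition_for_sketch(partition_by_proc, ctx)
  value = partition_by_proc.call(ctx)
  value.nil? ? "" : value.to_s   # nil maps to "", everything else to a String
end

# Hypothetical partition_by proc keyed on a tenant id in the context.
by_tenant = ->(ctx) { ctx[:tenant_id] }

integer_key = partition_for_sketch(by_tenant, { tenant_id: 42 })
symbol_key  = partition_for_sketch(by_tenant, { tenant_id: :acme })
missing_key = partition_for_sketch(by_tenant, {})

integer_key  # => "42"
symbol_key   # => "acme"
missing_key  # => ""
```

One consequence worth noting: every context for which the proc returns nil lands in the same empty-string partition, so those jobs share gate state.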
#partition_key_for(ctx) ⇒ Object
    # File 'lib/dispatch_policy/policy.rb', line 41

    def partition_key_for(ctx)
      partition_for(ctx)
    end
#restage_retries? ⇒ Boolean
    # File 'lib/dispatch_policy/policy.rb', line 65

    def restage_retries?
      retry_strategy == :restage
    end
#shard_for(ctx) ⇒ Object
The shard a partition belongs to. Stable per (policy, partition_key) via first-writer-wins in Repository.upsert_partition!. If no shard_by is declared, or the shard_by proc returns nil, the partition lives on the “default” shard.
    # File 'lib/dispatch_policy/policy.rb', line 58

    def shard_for(ctx)
      return DEFAULT_SHARD unless @shard_by_proc
      value = @shard_by_proc.call(ctx)
      value.nil? ? DEFAULT_SHARD : value.to_s
    end
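The two fallbacks to the default shard can be seen in a standalone sketch. It assumes a plain Hash context and passes the default as a keyword argument in place of the DEFAULT_SHARD constant; shard_for_sketch and by_queue are hypothetical names. It covers only the value computed per call, not the first-writer-wins persistence in Repository.upsert_partition!.

```ruby
# Hypothetical stand-in for Policy#shard_for. The default_shard keyword plays
# the role of DispatchPolicy::Policy::DEFAULT_SHARD ("default").
def shard_for_sketch(shard_by_proc, ctx, default_shard: "default")
  return default_shard unless shard_by_proc   # no shard_by declared
  value = shard_by_proc.call(ctx)
  value.nil? ? default_shard : value.to_s     # nil result also falls back
end

# Hypothetical shard_by proc that routes by the queue_name in the context.
by_queue = ->(ctx) { ctx[:queue_name] }

routed      = shard_for_sketch(by_queue, { queue_name: :mailers })
nil_routed  = shard_for_sketch(by_queue, {})
undeclared  = shard_for_sketch(nil, { queue_name: "mailers" })

routed      # => "mailers"
nil_routed  # => "default"
undeclared  # => "default"
```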