Class: DispatchPolicy::Gates::Concurrency
- Inherits: DispatchPolicy::Gate
  - Object
  - DispatchPolicy::Gate
  - DispatchPolicy::Gates::Concurrency
- Defined in: lib/dispatch_policy/gates/concurrency.rb
Overview
Concurrency gate: caps in-flight jobs per partition.
The partition scope is the policy’s `partition_by`. Inflight rows are written by InflightTracker in around_perform with the same key, so this gate’s COUNT(*) aggregates over the same canonical scope as the staged_jobs row.
Constant Summary
- DEFAULT_FULL_BACKOFF = 1.0 (seconds)
Instance Attribute Summary
- #full_backoff ⇒ Object (readonly)
  Returns the value of attribute full_backoff.
- #max_proc ⇒ Object (readonly)
  Returns the value of attribute max_proc.
Instance Method Summary
- #evaluate(ctx, partition, admit_budget) ⇒ Object
- #inflight_partition_key(policy_name, ctx) ⇒ Object
  The inflight key is always the policy’s canonical partition value, the same as what’s stored in staged_jobs.partition_key.
- #initialize(max:, full_backoff: DEFAULT_FULL_BACKOFF) ⇒ Concurrency (constructor)
  A new instance of Concurrency.
- #name ⇒ Object
Methods inherited from DispatchPolicy::Gate
Constructor Details
#initialize(max:, full_backoff: DEFAULT_FULL_BACKOFF) ⇒ Concurrency
Returns a new instance of Concurrency.
    # File 'lib/dispatch_policy/gates/concurrency.rb', line 16
    def initialize(max:, full_backoff: DEFAULT_FULL_BACKOFF)
      super()
      @max_proc     = max.respond_to?(:call) ? max : ->(_ctx) { max }
      @full_backoff = full_backoff.to_f
    end
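The constructor accepts either a fixed number or a callable for `max` and stores a callable in both cases. The following is a minimal sketch of that normalization in isolation; `normalize_max` and the `:plan` context key are hypothetical names used only for illustration and are not part of the library.

```ruby
# Hypothetical helper mirroring the constructor's treatment of `max`:
# callables are kept as-is, plain values are wrapped in a lambda that
# ignores the context.
def normalize_max(max)
  max.respond_to?(:call) ? max : ->(_ctx) { max }
end

# A static cap: the same limit for every context.
fixed = normalize_max(5)

# A dynamic cap: the limit depends on the context (the :plan key is assumed).
per_plan = normalize_max(->(ctx) { ctx[:plan] == "pro" ? 20 : 2 })

puts fixed.call({})
puts per_plan.call({ plan: "pro" })
puts per_plan.call({ plan: "free" })
```

Because both forms end up as a callable, code that reads the cap can always invoke `max_proc` with the context and does not need to branch on the type of `max`.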
Instance Attribute Details
#full_backoff ⇒ Object (readonly)
Returns the value of attribute full_backoff.
    # File 'lib/dispatch_policy/gates/concurrency.rb', line 14
    def full_backoff
      @full_backoff
    end
#max_proc ⇒ Object (readonly)
Returns the value of attribute max_proc.
    # File 'lib/dispatch_policy/gates/concurrency.rb', line 14
    def max_proc
      @max_proc
    end
Instance Method Details
#evaluate(ctx, partition, admit_budget) ⇒ Object
    # File 'lib/dispatch_policy/gates/concurrency.rb', line 26
    def evaluate(ctx, partition, admit_budget)
      cap = capacity_for(ctx)
      return Decision.deny(retry_after: @full_backoff, reason: "max=0") if cap <= 0

      in_flight = Repository.count_inflight(
        policy_name:   partition["policy_name"],
        partition_key: inflight_partition_key(partition["policy_name"], ctx)
      )
      remaining = cap - in_flight
      if remaining <= 0
        # Stop hammering this partition with COUNT(*) every tick; back off
        # until enough jobs are likely to have finished.
        return Decision.new(allowed: 0, retry_after: @full_backoff, reason: "concurrency_full")
      end

      Decision.new(allowed: [remaining, admit_budget].min)
    end
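The admission arithmetic in `#evaluate` can be exercised without a database. The sketch below is an assumption-laden stand-in, not the library's code: `SketchDecision` and `admit` are hypothetical names, the in-flight count is passed in directly instead of coming from `Repository.count_inflight`, and the cap is passed in directly instead of coming from `capacity_for` (whose definition is not shown on this page).

```ruby
# Hypothetical stand-in for the library's Decision object.
SketchDecision = Struct.new(:allowed, :retry_after, :reason, keyword_init: true)

# Hypothetical pure-function version of the gate's arithmetic.
def admit(cap:, in_flight:, admit_budget:, full_backoff: 1.0)
  # A cap of zero (or less) denies everything and asks the caller to back off.
  return SketchDecision.new(allowed: 0, retry_after: full_backoff, reason: "max=0") if cap <= 0

  remaining = cap - in_flight
  if remaining <= 0
    # Partition is saturated: admit nothing and back off before re-checking.
    return SketchDecision.new(allowed: 0, retry_after: full_backoff, reason: "concurrency_full")
  end

  # Otherwise admit as many jobs as fit under both the cap and the budget.
  SketchDecision.new(allowed: [remaining, admit_budget].min)
end

p admit(cap: 5, in_flight: 3, admit_budget: 10)  # limited by remaining capacity
p admit(cap: 10, in_flight: 0, admit_budget: 4)  # limited by the admit budget
p admit(cap: 5, in_flight: 5, admit_budget: 10)  # saturated, backs off
```

The number admitted is never larger than either the free capacity or the caller's budget, and a saturated partition returns a `retry_after` so it is not re-counted on every tick.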
#inflight_partition_key(policy_name, ctx) ⇒ Object
The inflight key is always the policy’s canonical partition value, the same value stored in staged_jobs.partition_key. This is what lets throttle and concurrency gates in the same policy enforce their state at exactly one consistent scope.
    # File 'lib/dispatch_policy/gates/concurrency.rb', line 48
    def inflight_partition_key(policy_name, ctx)
      policy = DispatchPolicy.registry.fetch(policy_name)
      raise InvalidPolicy, "unknown policy #{policy_name.inspect}" unless policy
      policy.partition_for(ctx)
    end
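To illustrate why a single canonical key matters, the sketch below models the inflight table as an in-memory array and derives the key for both writing and counting from the same `partition_for` call. Everything here is hypothetical: `SketchPolicy`, the `account:<id>` key format, and this `count_inflight` are illustrative stand-ins, and only the `partition_for(ctx)` method name is taken from the source above.

```ruby
# Hypothetical policy that partitions by an account id found in the context.
SketchPolicy = Struct.new(:name) do
  def partition_for(ctx)
    "account:#{ctx.fetch(:account_id)}"
  end
end

# Stand-in for the gate's COUNT(*) over inflight rows.
def count_inflight(rows, policy_name:, partition_key:)
  rows.count { |r| r[:policy_name] == policy_name && r[:partition_key] == partition_key }
end

policy = SketchPolicy.new("mailers")

# Stand-in for rows written while jobs run; each row is keyed by the
# policy's partition value for that job's context.
rows = [7, 7, 9].map do |account_id|
  { policy_name: policy.name, partition_key: policy.partition_for({ account_id: account_id }) }
end

# The gate derives its key the same way, so the count covers exactly
# the jobs that share the partition.
key = policy.partition_for({ account_id: 7 })
puts count_inflight(rows, policy_name: policy.name, partition_key: key)
```

If the writer and the gate computed their keys differently, the count would silently miss running jobs and the cap would not hold.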
#name ⇒ Object
    # File 'lib/dispatch_policy/gates/concurrency.rb', line 22
    def name
      :concurrency
    end