Class: DispatchPolicy::Gates::Concurrency

Inherits:
DispatchPolicy::Gate
Defined in:
lib/dispatch_policy/gates/concurrency.rb

Overview

Concurrency gate: caps in-flight jobs per partition.

The partition scope is the policy's `partition_by`. Inflight rows are written by InflightTracker around_perform with the same key, so this gate's COUNT(*) aggregates over the same canonical scope as the staged_jobs row.

Constant Summary

DEFAULT_FULL_BACKOFF = 1.0 # seconds

Instance Attribute Summary

Instance Method Summary

Methods inherited from DispatchPolicy::Gate

#consume

Constructor Details

#initialize(max:, full_backoff: DEFAULT_FULL_BACKOFF) ⇒ Concurrency

Returns a new instance of Concurrency.



# File 'lib/dispatch_policy/gates/concurrency.rb', line 16

def initialize(max:, full_backoff: DEFAULT_FULL_BACKOFF)
  super()
  @max_proc     = max.respond_to?(:call) ? max : ->(_ctx) { max }
  @full_backoff = full_backoff.to_f
end
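
The constructor accepts either a fixed number or a callable for `max` and stores both as a one-argument proc. The sketch below isolates that normalization in a stand-alone helper. `normalize_max` is a hypothetical name that is not part of DispatchPolicy, and the hash-shaped `ctx` with a `:tier` key is assumed purely for illustration.

```ruby
# Hypothetical helper mirroring the constructor's normalization of `max`.
# It is not part of DispatchPolicy.
def normalize_max(max)
  # Callables pass through unchanged; plain values are wrapped in a lambda
  # that ignores the context and always returns the same cap.
  max.respond_to?(:call) ? max : ->(_ctx) { max }
end

# A fixed cap: the same limit for every partition.
static_cap = normalize_max(5)

# A context-dependent cap. The :tier key is an assumed ctx field.
dynamic_cap = normalize_max(->(ctx) { ctx[:tier] == "premium" ? 20 : 2 })

puts static_cap.call({})                    # prints 5
puts dynamic_cap.call({ tier: "premium" })  # prints 20
puts dynamic_cap.call({ tier: "free" })     # prints 2
```

Wrapping the fixed value means later code can always invoke `max_proc` with a context, without branching on the type of `max`.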

Instance Attribute Details

#full_backoff ⇒ Object (readonly)

Returns the value of attribute full_backoff.



# File 'lib/dispatch_policy/gates/concurrency.rb', line 14

def full_backoff
  @full_backoff
end

#max_proc ⇒ Object (readonly)

Returns the value of attribute max_proc.



# File 'lib/dispatch_policy/gates/concurrency.rb', line 14

def max_proc
  @max_proc
end

Instance Method Details

#evaluate(ctx, partition, admit_budget) ⇒ Object



# File 'lib/dispatch_policy/gates/concurrency.rb', line 26

def evaluate(ctx, partition, admit_budget)
  cap = capacity_for(ctx)
  return Decision.deny(retry_after: @full_backoff, reason: "max=0") if cap <= 0

  in_flight = Repository.count_inflight(
    policy_name:   partition["policy_name"],
    partition_key: inflight_partition_key(partition["policy_name"], ctx)
  )
  remaining = cap - in_flight
  if remaining <= 0
    # Stop hammering this partition with COUNT(*) every tick — back off
    # until enough jobs are likely to have finished.
    return Decision.new(allowed: 0, retry_after: @full_backoff, reason: "concurrency_full")
  end

  Decision.new(allowed: [remaining, admit_budget].min)
end
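
The admission arithmetic in `#evaluate` can be exercised without a database. The sketch below replaces `Repository.count_inflight` with a plain `in_flight:` argument and uses a hypothetical `SketchDecision` struct in place of the library's `Decision`. It also assumes that `Decision.deny` yields zero allowed jobs, which the source above does not show.

```ruby
# Hypothetical stand-in for DispatchPolicy's Decision. Only the fields used
# by #evaluate are modelled.
SketchDecision = Struct.new(:allowed, :retry_after, :reason, keyword_init: true)

# Same branching as #evaluate, with the in-flight count passed in directly
# instead of being read from the repository.
def admit(cap:, in_flight:, admit_budget:, full_backoff: 1.0)
  # A non-positive cap closes the gate. Treating deny as allowed: 0 is an
  # assumption about Decision.deny.
  if cap <= 0
    return SketchDecision.new(allowed: 0, retry_after: full_backoff, reason: "max=0")
  end

  remaining = cap - in_flight
  if remaining <= 0
    # Partition is full: admit nothing and ask the caller to back off.
    return SketchDecision.new(allowed: 0, retry_after: full_backoff, reason: "concurrency_full")
  end

  # Admit the smaller of the free slots and the caller's budget.
  SketchDecision.new(allowed: [remaining, admit_budget].min)
end

puts admit(cap: 5, in_flight: 3, admit_budget: 10).allowed  # prints 2
puts admit(cap: 5, in_flight: 3, admit_budget: 1).allowed   # prints 1
puts admit(cap: 5, in_flight: 5, admit_budget: 10).reason   # prints concurrency_full
```

The gate never admits more than `admit_budget`, so an upstream limit on the batch size still holds when the partition has spare capacity.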

#inflight_partition_key(policy_name, ctx) ⇒ Object

The inflight key is always the policy’s canonical partition value — same as what’s stored in staged_jobs.partition_key. This is what makes throttle + concurrency in the same policy enforce their state at exactly one consistent scope.

Raises:

(InvalidPolicy) if no policy is registered under policy_name



# File 'lib/dispatch_policy/gates/concurrency.rb', line 48

def inflight_partition_key(policy_name, ctx)
  policy = DispatchPolicy.registry.fetch(policy_name)
  raise InvalidPolicy, "unknown policy #{policy_name.inspect}" unless policy
  policy.partition_for(ctx)
end

#name ⇒ Object



# File 'lib/dispatch_policy/gates/concurrency.rb', line 22

def name
  :concurrency
end