Class: DispatchPolicy::Pipeline

Inherits:
Object
Defined in:
lib/dispatch_policy/pipeline.rb

Overview

Composes a sequence of gates into a single admission decision for one partition. #call returns a Result value object describing how many jobs may be admitted right now and which gate-state patches to persist.

Defined Under Namespace

Classes: Result

Constructor Details

#initialize(policy) ⇒ Pipeline

Returns a new instance of Pipeline.



# File 'lib/dispatch_policy/pipeline.rb', line 10

def initialize(policy)
  @policy = policy
end

Class Method Details

.settle(decisions, admitted_count) ⇒ Object

Computes the gate_state patch to persist once the real admitted count is known (after the staging DELETE). Each gate's #consume settles its state against the actual number of jobs claimed: the throttle deducts that many tokens rather than the optimistic `allowed` it returned at evaluate time. Gates that keep no gate_state (concurrency and adaptive_concurrency, whose state lives in their own tables) return nil from #consume and contribute nothing here.

`decisions` is the [gate, decision] list carried on the Result.



# File 'lib/dispatch_policy/pipeline.rb', line 23

def self.settle(decisions, admitted_count)
  patch = {}
  decisions.each do |gate, decision|
    sub = gate.consume(decision, admitted_count)
    patch.merge!(sub) if sub
  end
  patch
end
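
The settling step can be sketched in isolation. The snippet below is an illustration only: `FakeThrottle`, `FakeConcurrency`, `StubDecision`, and the shape of the patch hash are hypothetical stand-ins, not the gem's real gates or storage format, and the top-level `settle` is a copy of the method above so the sketch runs without loading the library.

```ruby
# Hypothetical stand-in for a gate decision; the real decision class is not shown here.
StubDecision = Struct.new(:allowed, keyword_init: true)

# Hypothetical throttle: settles against the jobs actually claimed.
class FakeThrottle
  def initialize(tokens)
    @tokens = tokens
  end

  # Deducts admitted_count tokens, ignoring the optimistic decision.allowed.
  # The patch key and layout are assumptions made for this sketch.
  def consume(_decision, admitted_count)
    { "throttle" => { "tokens" => @tokens - admitted_count } }
  end
end

# Hypothetical gate that keeps its state elsewhere and so returns nil.
class FakeConcurrency
  def consume(_decision, _admitted_count)
    nil
  end
end

# Copy of Pipeline.settle from the listing above, so the sketch is standalone.
def settle(decisions, admitted_count)
  patch = {}
  decisions.each do |gate, decision|
    sub = gate.consume(decision, admitted_count)
    patch.merge!(sub) if sub
  end
  patch
end

decisions = [
  [FakeThrottle.new(10), StubDecision.new(allowed: 5)],
  [FakeConcurrency.new,  StubDecision.new(allowed: 3)]
]

# Suppose only 2 jobs were actually claimed, although the throttle allowed 5.
settled_patch = settle(decisions, 2)
```

Under these assumptions the throttle is charged for the 2 claimed jobs rather than the 5 it allowed, and the nil from the second gate leaves the patch untouched.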

Instance Method Details

#call(ctx, partition, max_budget) ⇒ Object

Evaluates the policy's gates in order for one partition, starting from max_budget. Each gate's finite `allowed` caps the remaining budget, the smallest `retry_after` reported by any gate is kept, and evaluation stops as soon as the budget reaches zero. Returns a Result carrying admit_count, retry_after, the merged gate_state_patch, the collected reasons, and the [gate, decision] list that .settle consumes later.



# File 'lib/dispatch_policy/pipeline.rb', line 32

def call(ctx, partition, max_budget)
  budget          = max_budget
  retry_after     = nil
  patch           = {}
  reasons         = []
  decisions       = []

  @policy.gates.each do |gate|
    decision = gate.evaluate(ctx, partition, budget)
    decisions << [gate, decision]
    budget   = decision.allowed.finite? ? [budget, decision.allowed].min : budget
    if decision.retry_after
      retry_after = retry_after.nil? ? decision.retry_after : [retry_after, decision.retry_after].min
    end
    reasons << "#{gate.name}:#{decision.reason}" if decision.reason
    break if budget.zero?
  end

  admit_count = budget.finite? ? budget : max_budget
  admit_count = 0 if admit_count.negative?

  decisions.each do |_, decision|
    next unless decision.gate_state_patch
    patch.merge!(decision.gate_state_patch)
  end

  Result.new(
    admit_count:       admit_count,
    retry_after:       retry_after,
    gate_state_patch:  patch,
    reasons:           reasons,
    decisions:         decisions
  )
end
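
To make the budget narrowing concrete, here is a minimal sketch that reproduces the gate loop with canned decisions. `FixedGate`, `CannedDecision`, the helper `admit`, and the gate names and reasons are all hypothetical; `admit` is a condensed copy of the loop above that returns a plain Hash instead of a Result and omits the gate_state_patch merge.

```ruby
# Hypothetical decision stand-in; omitted fields default to nil.
CannedDecision = Struct.new(:allowed, :retry_after, :reason, keyword_init: true)

# Hypothetical gate that always answers with the decision it was built with.
class FixedGate
  attr_reader :name

  def initialize(name, decision)
    @name = name
    @decision = decision
  end

  # Real gates would inspect ctx, partition and budget; this one ignores them.
  def evaluate(_ctx, _partition, _budget)
    @decision
  end
end

# Condensed copy of the loop in #call (patch merging left out for brevity).
def admit(gates, max_budget)
  budget = max_budget
  retry_after = nil
  reasons = []
  gates.each do |gate|
    decision = gate.evaluate(nil, nil, budget)
    # Only finite limits narrow the budget; Float::INFINITY means "no opinion".
    budget = [budget, decision.allowed].min if decision.allowed.finite?
    # Keep the earliest retry hint reported by any gate.
    retry_after = [retry_after, decision.retry_after].compact.min if decision.retry_after
    reasons << "#{gate.name}:#{decision.reason}" if decision.reason
    break if budget.zero?
  end
  { admit_count: [budget, 0].max, retry_after: retry_after, reasons: reasons }
end

outcome = admit(
  [
    FixedGate.new("throttle",    CannedDecision.new(allowed: 5, retry_after: 2.0, reason: "rate_limited")),
    FixedGate.new("concurrency", CannedDecision.new(allowed: Float::INFINITY)),
    FixedGate.new("fairness",    CannedDecision.new(allowed: 3, retry_after: 0.5))
  ],
  10
)

# A gate that allows nothing short-circuits the remaining gates.
blocked = admit(
  [
    FixedGate.new("throttle",      CannedDecision.new(allowed: 0, retry_after: 1.5, reason: "empty")),
    FixedGate.new("never_reached", CannedDecision.new(allowed: 1, reason: "unused"))
  ],
  10
)
```

With these inputs the budget goes 10, 5, 5, 3, so `outcome[:admit_count]` is 3 and the retained retry hint is the smaller 0.5. In the second call the loop breaks after the first gate, so the second gate's reason is never recorded.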