Class: DispatchPolicy::Pipeline

Inherits:
Object
  • Object
show all
Defined in:
lib/dispatch_policy/pipeline.rb

Overview

Composes a sequence of gates into a single admission decision for one partition. Returns a value object describing how many jobs may be admitted right now and which gate-state patches to persist.

Defined Under Namespace

Classes: Result

Instance Method Summary collapse

Constructor Details

#initialize(policy) ⇒ Pipeline

Returns a new instance of Pipeline.



10
11
12
# File 'lib/dispatch_policy/pipeline.rb', line 10

def initialize(policy)
  @policy = policy
end

Instance Method Details

#call(ctx, partition, max_budget) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/dispatch_policy/pipeline.rb', line 14

def call(ctx, partition, max_budget)
  budget          = max_budget
  retry_after     = nil
  patch           = {}
  reasons         = []
  decisions       = []

  @policy.gates.each do |gate|
    decision = gate.evaluate(ctx, partition, budget)
    decisions << [gate, decision]
    budget   = decision.allowed.finite? ? [budget, decision.allowed].min : budget
    if decision.retry_after
      retry_after = retry_after.nil? ? decision.retry_after : [retry_after, decision.retry_after].min
    end
    reasons << "#{gate.name}:#{decision.reason}" if decision.reason
    break if budget.zero?
  end

  admit_count = budget.finite? ? budget : max_budget
  admit_count = 0 if admit_count.negative?

  decisions.each do |_, decision|
    next unless decision.gate_state_patch
    patch.merge!(decision.gate_state_patch)
  end

  Result.new(
    admit_count:       admit_count,
    retry_after:       retry_after,
    gate_state_patch:  patch,
    reasons:           reasons
  )
end