Class: DispatchPolicy::Policy

Inherits:
Object
  • Object
show all
Defined in:
lib/dispatch_policy/policy.rb

Constant Summary collapse

DEFAULT_SHARD =
"default"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name:, context_proc:, gates:, retry_strategy: :restage, queue_name: nil, admission_batch_size: nil, shard_by_proc: nil, partition_by_proc: nil, fairness_half_life_seconds: nil, tick_admission_budget: nil) ⇒ Policy

Returns a new instance of Policy.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/dispatch_policy/policy.rb', line 11

# Stores the policy configuration and then validates it.
#
# Every keyword is kept as given, with two exceptions: +name+ is converted
# with #to_s, and +gates+ is frozen in place (the caller's object is frozen;
# no copy is made).
#
# @param name [#to_s] policy name; stored as a String
# @param context_proc [#call, nil] called with the job arguments by #build_context
# @param gates [Object] gate collection; frozen before being stored
# @param retry_strategy [Symbol] checked against :restage / :bypass by the retry predicates
# @param queue_name [Object, nil] stored unchanged
# @param admission_batch_size [Object, nil] stored unchanged
# @param shard_by_proc [#call, nil] called with the context by #shard_for
# @param partition_by_proc [#call, nil] called with the context by #partition_for
# @param fairness_half_life_seconds [Object, nil] stored unchanged
# @param tick_admission_budget [Object, nil] stored unchanged
# @raise whatever validate! raises for an unacceptable configuration
def initialize(
  name:,
  context_proc:,
  gates:,
  retry_strategy: :restage,
  queue_name: nil,
  admission_batch_size: nil,
  shard_by_proc: nil,
  partition_by_proc: nil,
  fairness_half_life_seconds: nil,
  tick_admission_budget: nil
)
  # The two values that are transformed on the way in.
  @name = name.to_s
  @gates = gates.freeze

  # Callables consulted at admission time.
  @context_proc = context_proc
  @shard_by_proc = shard_by_proc
  @partition_by_proc = partition_by_proc

  # Plain settings, stored verbatim.
  @retry_strategy = retry_strategy
  @queue_name = queue_name
  @admission_batch_size = admission_batch_size
  @fairness_half_life_seconds = fairness_half_life_seconds
  @tick_admission_budget = tick_admission_budget

  validate!
end

Instance Attribute Details

#admission_batch_size ⇒ Object (readonly)

Returns the value of attribute admission_batch_size.



7
8
9
# File 'lib/dispatch_policy/policy.rb', line 7

# Reader for @admission_batch_size: the value given to the constructor's
# admission_batch_size: keyword, stored unchanged (nil when not supplied).
# NOTE(review): presumably a per-admission batch limit — confirm against the caller.
def admission_batch_size
  @admission_batch_size
end

#context_proc ⇒ Object (readonly)

Returns the value of attribute context_proc.



7
8
9
# File 'lib/dispatch_policy/policy.rb', line 7

# Reader for @context_proc: the callable given to the constructor's
# context_proc: keyword. #build_context calls it with the job arguments when
# it is set.
def context_proc
  @context_proc
end

#fairness_half_life_seconds ⇒ Object (readonly)

Returns the value of attribute fairness_half_life_seconds.



7
8
9
# File 'lib/dispatch_policy/policy.rb', line 7

# Reader for @fairness_half_life_seconds: the value given to the constructor's
# fairness_half_life_seconds: keyword, stored unchanged (nil when not supplied).
# NOTE(review): presumably a decay half-life in seconds — confirm where it is consumed.
def fairness_half_life_seconds
  @fairness_half_life_seconds
end

#gates ⇒ Object (readonly)

Returns the value of attribute gates.



7
8
9
# File 'lib/dispatch_policy/policy.rb', line 7

# Reader for @gates: the gates object given to the constructor, which freezes
# it before storing it, so the returned object cannot be modified.
def gates
  @gates
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



7
8
9
# File 'lib/dispatch_policy/policy.rb', line 7

# Reader for @name: the policy name. The constructor stores name.to_s, so this
# is a String even when the policy was declared with a Symbol.
def name
  @name
end

#partition_by_proc ⇒ Object (readonly)

Returns the value of attribute partition_by_proc.



7
8
9
# File 'lib/dispatch_policy/policy.rb', line 7

# Reader for @partition_by_proc: the callable given to the constructor's
# partition_by_proc: keyword. #partition_for calls it with the context to
# derive the partition value.
def partition_by_proc
  @partition_by_proc
end

#queue_name ⇒ Object (readonly)

Returns the value of attribute queue_name.



7
8
9
# File 'lib/dispatch_policy/policy.rb', line 7

# Reader for @queue_name: the value given to the constructor's queue_name:
# keyword, stored unchanged (nil when not supplied).
# NOTE(review): presumably the queue this policy's jobs are sent to — confirm.
def queue_name
  @queue_name
end

#retry_strategy ⇒ Object (readonly)

Returns the value of attribute retry_strategy.



7
8
9
# File 'lib/dispatch_policy/policy.rb', line 7

# Reader for @retry_strategy: the value given to the constructor's
# retry_strategy: keyword (:restage by default). #restage_retries? and
# #bypass_retries? compare it with :restage and :bypass respectively.
def retry_strategy
  @retry_strategy
end

#shard_by_proc ⇒ Object (readonly)

Returns the value of attribute shard_by_proc.



7
8
9
# File 'lib/dispatch_policy/policy.rb', line 7

# Reader for @shard_by_proc: the callable given to the constructor's
# shard_by_proc: keyword, or nil when none was declared (in which case
# #shard_for falls back to DEFAULT_SHARD).
def shard_by_proc
  @shard_by_proc
end

#tick_admission_budget ⇒ Object (readonly)

Returns the value of attribute tick_admission_budget.



7
8
9
# File 'lib/dispatch_policy/policy.rb', line 7

# Reader for @tick_admission_budget: the value given to the constructor's
# tick_admission_budget: keyword, stored unchanged (nil when not supplied).
# NOTE(review): presumably a cap on admissions per tick — confirm where it is consumed.
def tick_admission_budget
  @tick_admission_budget
end

Instance Method Details

#build_context(arguments, queue_name: nil) ⇒ Object

Builds the Context the gates and shard_by will see at admission time. The user’s context_proc receives the job’s arguments. The gem then enriches the resulting hash with `queue_name` (the ActiveJob queue) so shard_by/partition_by can route by queue without the user having to thread it through their proc.



34
35
36
37
38
39
# File 'lib/dispatch_policy/policy.rb', line 34

# Assembles the Context that gates, shard_by and partition_by receive at
# admission time.
#
# When a context_proc is configured it is called with the job arguments. A
# nil/false result (or no proc at all) becomes an empty hash; any other result
# is converted with #to_h. If a queue_name is given it is merged in under
# :queue_name, replacing any value the proc supplied for that key.
#
# @param arguments [Object] the job's arguments, handed to context_proc as-is
# @param queue_name [Object, nil] queue to expose to the routing procs; not
#   merged when nil or false
# @return [Object] whatever Context.wrap returns for the assembled hash
def build_context(arguments, queue_name: nil)
  produced = context_proc ? context_proc.call(arguments) : nil
  attributes = produced ? produced.to_h : {}
  attributes = attributes.merge(queue_name: queue_name) if queue_name
  Context.wrap(attributes)
end

#bypass_retries? ⇒ Boolean

Returns:

  • (Boolean)


69
70
71
# File 'lib/dispatch_policy/policy.rb', line 69

# Whether this policy was configured with the :bypass retry strategy.
#
# @return [Boolean] the result of comparing retry_strategy with :bypass
def bypass_retries?
  retry_strategy == :bypass
end

#partition_for(ctx) ⇒ Object

Policy-level partition scope. Both the staged_jobs row and the concurrency gate’s inflight_jobs row use this single canonical value as their partition_key, so all gates enforce their state at exactly the same scope. Required: validate! raises if the policy is built without one.



50
51
52
53
# File 'lib/dispatch_policy/policy.rb', line 50

# Computes the policy-level partition value for a context.
#
# Calls the configured partition_by proc with +ctx+ and normalises the result
# to a String: nil becomes the empty string, anything else goes through #to_s.
#
# @param ctx [Object] the context passed to the partition_by proc
# @return [String] the partition value ("" when the proc returns nil)
def partition_for(ctx)
  key = @partition_by_proc.call(ctx)
  return "" if key.nil?

  key.to_s
end

#partition_key_for(ctx) ⇒ Object



41
42
43
# File 'lib/dispatch_policy/policy.rb', line 41

# Returns the partition key for +ctx+. This is a thin delegation: the result
# is exactly what #partition_for returns for the same context.
#
# @param ctx [Object] the context forwarded to #partition_for
# @return [String] the value returned by #partition_for
def partition_key_for(ctx)
  partition_for(ctx)
end

#restage_retries? ⇒ Boolean

Returns:

  • (Boolean)


65
66
67
# File 'lib/dispatch_policy/policy.rb', line 65

# Whether this policy was configured with the :restage retry strategy
# (the constructor's default).
#
# @return [Boolean] the result of comparing retry_strategy with :restage
def restage_retries?
  retry_strategy == :restage
end

#shard_for(ctx) ⇒ Object

The shard a partition belongs to. Stable per (policy, partition_key) via first-writer-wins in Repository.upsert_partition!. If no shard_by is declared the partition lives on the “default” shard.



58
59
60
61
62
63
# File 'lib/dispatch_policy/policy.rb', line 58

# Resolves the shard name for a context.
#
# When a shard_by proc is configured it is called with +ctx+ and a non-nil
# result is returned as a String. With no proc, or when the proc returns nil,
# the result is DEFAULT_SHARD.
#
# @param ctx [Object] the context passed to the shard_by proc
# @return [String] the shard name
def shard_for(ctx)
  if @shard_by_proc
    shard = @shard_by_proc.call(ctx)
    return shard.to_s unless shard.nil?
  end

  DEFAULT_SHARD
end