Class: DispatchPolicy::Gates::Concurrency
Instance Attribute Summary
#name, #partition_by, #policy
Instance Method Summary
collapse
#initialize, #partition_key_for, register, registry
Instance Method Details
#configure(max:) ⇒ Object
6
7
8
|
# File 'lib/dispatch_policy/gates/concurrency.rb', line 6
# Stores the concurrency limit for this gate.
#
# The value is kept as given; #filter later passes it through `resolve`
# together with a per-job context and coerces the result with `to_i`.
# NOTE(review): presumably `max` may be either a plain number or something
# `resolve` can evaluate per job — confirm against `resolve`.
#
# @param max [Object] the limit (or limit source) to remember
# @return [Object] the value passed as `max`
def configure(max:)
  @max = max
end
|
#filter(batch, context) ⇒ Object
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
# File 'lib/dispatch_policy/gates/concurrency.rb', line 14
# Admits staged jobs from +batch+ while their partition still has headroom
# under the configured limit.
#
# Jobs are grouped by partition key, the current in-flight counts for those
# partitions are loaded in a single `fetch_many` call, and each job is
# admitted only if its partition's running count is below the limit resolved
# for that job. Admitted jobs bump the running count so later jobs in the same
# batch see it.
#
# The result is ordered partition by partition (in first-seen order), not
# necessarily in the original batch order.
#
# @param batch [Enumerable] staged jobs to evaluate
# @param context [Object] must respond to `for(staged)` and
#   `record_partitions(pairs, gate:)`
# @return [Array] the admitted staged jobs
def filter(batch, context)
  grouped = batch.group_by do |job|
    partition_key_for(context.for(job))
  end

  # NOTE(review): assumes fetch_many returns a mutable Hash keyed by partition
  # key; it is read with `fetch` and updated in place below.
  counts = PartitionInflightCount.fetch_many(
    policy_name: policy.name,
    gate_name: name,
    partition_keys: grouped.keys
  )

  accepted = grouped.each_with_object([]) do |(key, members), pairs|
    members.each do |job|
      # The limit is resolved per job, so it may differ within one partition.
      cap = resolve(@max, context.for(job)).to_i
      current = counts.fetch(key, 0)
      next unless current < cap

      pairs << [ job, key ]
      # Count this admission against the partition for the rest of the batch.
      counts[key] = current + 1
    end
  end

  context.record_partitions(accepted, gate: name)
  accepted.map { |job, _key| job }
end
|
#tracks_inflight? ⇒ Boolean
10
11
12
|
# File 'lib/dispatch_policy/gates/concurrency.rb', line 10
# Reports that this gate tracks in-flight work; always true for this gate.
#
# NOTE(review): presumably callers use this to decide whether to maintain the
# per-partition in-flight counts that #filter reads through
# PartitionInflightCount — confirm against the caller.
#
# @return [Boolean] always +true+
def tracks_inflight?
  true
end
|