Class: DispatchPolicy::Gates::Concurrency

Inherits:
DispatchPolicy::Gate show all
Defined in:
lib/dispatch_policy/gates/concurrency.rb

Instance Attribute Summary

Attributes inherited from DispatchPolicy::Gate

#name, #partition_by, #policy

Instance Method Summary collapse

Methods inherited from DispatchPolicy::Gate

#initialize, #partition_key_for, register, registry

Constructor Details

This class inherits a constructor from DispatchPolicy::Gate

Instance Method Details

#configure(max:) ⇒ Object



6
7
8
# File 'lib/dispatch_policy/gates/concurrency.rb', line 6

# Stores the concurrency ceiling for this gate.
#
# The value is kept as given in +@max+; #filter later passes it through
# +resolve+ and +to_i+ for each job, so no coercion happens here.
#
# @param max [Object] the per-partition limit (presumably an Integer or
#   something +resolve+ can evaluate per job — TODO confirm against Gate)
# @return [Object] the value that was assigned
def configure(max:)
  @max = max
end

#filter(batch, context) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/dispatch_policy/gates/concurrency.rb', line 14

# Admits jobs from +batch+ while each partition stays below its limit.
#
# Jobs are grouped by partition key, the current in-flight counts for those
# keys are fetched once, and every job is then checked against the limit
# resolved from +@max+ for that job's context. Each admission bumps the
# local count, so jobs admitted earlier in the same batch count against
# later ones in the same partition.
#
# @param batch [Enumerable] staged jobs to consider
# @param context [Object] must respond to +for(job)+ and
#   +record_partitions(pairs, gate:)+
# @return [Array] the admitted jobs, ordered partition by partition
def filter(batch, context)
  grouped = batch.group_by do |job|
    partition_key_for(context.for(job))
  end

  counts = PartitionInflightCount.fetch_many(
    policy_name:    policy.name,
    gate_name:      name,
    partition_keys: grouped.keys
  )

  accepted = grouped.flat_map do |key, jobs|
    jobs.filter_map do |job|
      # The limit is resolved per job, not per partition.
      ceiling = resolve(@max, context.for(job)).to_i
      current = counts.fetch(key, 0)
      next unless current < ceiling

      # Count this admission so the next job in the partition sees it.
      counts[key] = current + 1
      [ job, key ]
    end
  end

  context.record_partitions(accepted, gate: name)
  accepted.map(&:first)
end

#tracks_inflight? ⇒ Boolean

Returns:

  • (Boolean)


10
11
12
# File 'lib/dispatch_policy/gates/concurrency.rb', line 10

# Reports whether this gate tracks in-flight job counts.
#
# The concurrency gate always answers +true+; how callers act on the flag
# is not visible here.
#
# @return [Boolean] always +true+
def tracks_inflight?
  true
end