Class: DispatchPolicy::Gates::FairInterleave

Inherits:
DispatchPolicy::Gate show all
Defined in:
lib/dispatch_policy/gates/fair_interleave.rb

Instance Attribute Summary

Attributes inherited from DispatchPolicy::Gate

#name, #partition_by, #policy

Instance Method Summary collapse

Methods inherited from DispatchPolicy::Gate

#initialize, #partition_key_for, register, registry, #tracks_inflight?

Constructor Details

This class inherits a constructor from DispatchPolicy::Gate

Instance Method Details

#configure(**_) ⇒ Object



6
# File 'lib/dispatch_policy/gates/fair_interleave.rb', line 6

# Configuration hook that does nothing: any keyword arguments passed in
# are accepted and discarded.
#
# @return [nil]
def configure(**_ignored)
  nil
end

#filter(batch, context) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/dispatch_policy/gates/fair_interleave.rb', line 8

# Reorders +batch+ so that items from different partitions alternate
# round-robin instead of appearing in contiguous runs.
#
# Partition key selection:
# * when +@partition_by+ is set, the key is
#   +partition_key_for(context.for(staged))+;
# * otherwise it is +context.primary_partition_for(staged)+, falling back
#   to +staged.id+ when that returns nil/false (so such an item forms its
#   own group unless another item yields the same key).
#
# @param batch [Enumerable] staged items to reorder
# @param context [Object] must respond to +for+ and +primary_partition_for+
# @return [Array] a new array holding every item of +batch+ exactly once
def filter(batch, context)
  buckets = batch.group_by do |staged|
    next partition_key_for(context.for(staged)) if @partition_by

    context.primary_partition_for(staged) || staged.id
  end

  # Hash#values keeps insertion order, so partitions are visited in the
  # order they first appeared in the batch.
  queues = buckets.values
  rounds = queues.map(&:size).max || 0

  ordered = []
  rounds.times do |round|
    # Round N contributes the Nth item of every partition that still has one.
    queues.each do |queue|
      ordered << queue[round] if round < queue.size
    end
  end
  ordered
end