Class: Polyrun::Partition::Plan

Inherits:
Object
  • Object
show all
Defined in:
lib/polyrun/partition/plan.rb,
lib/polyrun/partition/plan_sharding.rb

Overview

Assigns discrete items (e.g. spec paths or path:line example locators) to shards (see spec_queue.md).

Strategies:

  • round_robin — sorted paths, assign by index mod total_shards.

  • random_round_robin — Fisher–Yates shuffle (optional seed), then same mod assignment.

  • cost_binpack (aliases: cost, binpack, timing) — greedy LPT (longest-processing-time-first) bin packing using per-item weights; optional Constraints place pinned items / serial globs first, then LPT packs the remaining items. The default timing_granularity is :file (one weight per spec file). The experimental :example granularity uses path:line locators and per-example weights from the timing JSON.

  • hrw (rendezvous) — rendezvous (highest-random-weight) hashing, which minimizes how many items move to a different shard when total_shards changes; optional constraints.

Constant Summary collapse

COST_STRATEGIES =
%w[cost cost_binpack binpack timing].freeze
HRW_STRATEGIES =
%w[hrw rendezvous].freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(items:, total_shards:, strategy: "round_robin", seed: nil, costs: nil, constraints: nil, root: nil, timing_granularity: :file) ⇒ Plan

Returns a new instance of Plan.

Raises:



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/polyrun/partition/plan.rb', line 24

# Build a partition plan over +items+.
#
# Items are normalized up front. With :example timing granularity each one becomes a
# locator via TimingKeys.normalize_locator (relative to +root+). Otherwise it is
# stringified and stripped. The resulting Array is frozen.
#
# @param items [Enumerable] spec paths, or path:line locators for :example granularity
# @param total_shards [Integer, String] shard count, coerced with Kernel#Integer; must be >= 1
# @param strategy [String, Symbol] partition strategy name (stored as a String)
# @param seed [Object, nil] shuffle seed / HRW salt source, stored as given
# @param costs [Object, nil] timing map, passed through normalize_costs
# @param constraints [Object, nil] pins / serial globs, passed through normalize_constraints
# @param root [String, nil] base directory (expanded with File.expand_path); defaults to Dir.pwd
# @param timing_granularity [Symbol, String] default :file; :example switches to path:line locators
# @raise [Polyrun::Error] if total_shards < 1, or a cost strategy gets a nil/empty timing map
def initialize(items:, total_shards:, strategy: "round_robin", seed: nil, costs: nil, constraints: nil, root: nil, timing_granularity: :file)
  @timing_granularity = TimingKeys.normalize_granularity(timing_granularity)
  @root = root ? File.expand_path(root) : Dir.pwd

  per_example = @timing_granularity == :example
  normalized = items.map do |entry|
    next TimingKeys.normalize_locator(entry, @root, :example) if per_example

    entry.to_s.strip
  end
  @items = normalized.freeze

  @total_shards = Integer(total_shards)
  raise Polyrun::Error, "total_shards must be >= 1" unless @total_shards >= 1

  @strategy = strategy.to_s
  @seed = seed
  @constraints = normalize_constraints(constraints)
  @costs = normalize_costs(costs)

  validate_constraints_strategy_combo!
  # Only cost strategies need weights; check the strategy first so @costs is untouched otherwise.
  return unless cost_strategy?
  return unless @costs.nil? || @costs.empty?

  raise Polyrun::Error,
    "strategy #{@strategy} requires a timing map (path => seconds or path:line => seconds), e.g. merged polyrun_timing.json"
end

Instance Attribute Details

#constraintsObject (readonly)

Returns the value of attribute constraints.



22
23
24
# File 'lib/polyrun/partition/plan.rb', line 22

# @return [Object, nil] the constraints as produced by normalize_constraints in #initialize
def constraints; @constraints; end

#itemsObject (readonly)

Returns the value of attribute items.



22
23
24
# File 'lib/polyrun/partition/plan.rb', line 22

# @return [Array] the frozen, normalized items (stripped Strings, or locators for :example granularity)
def items; @items; end

#seedObject (readonly)

Returns the value of attribute seed.



22
23
24
# File 'lib/polyrun/partition/plan.rb', line 22

# @return [Object, nil] the seed exactly as passed to #initialize
def seed; @seed; end

#strategyObject (readonly)

Returns the value of attribute strategy.



22
23
24
# File 'lib/polyrun/partition/plan.rb', line 22

# @return [String] the partition strategy name (converted with #to_s in #initialize)
def strategy; @strategy; end

#timing_granularityObject (readonly)

Returns the value of attribute timing_granularity.



22
23
24
# File 'lib/polyrun/partition/plan.rb', line 22

# @return [Object] the granularity as returned by TimingKeys.normalize_granularity
#   (compared against :example elsewhere in this class)
def timing_granularity; @timing_granularity; end

#total_shardsObject (readonly)

Returns the value of attribute total_shards.



22
23
24
# File 'lib/polyrun/partition/plan.rb', line 22

# @return [Integer] the shard count; #initialize rejects values below 1
def total_shards; @total_shards; end

Class Method Details

.cost_strategy?(name) ⇒ Boolean

Returns:

  • (Boolean)


105
106
107
# File 'lib/polyrun/partition/plan.rb', line 105

# Whether +name+ is one of the timing-cost (LPT binpack) strategy names.
#
# @param name [#to_s] strategy name (String or Symbol)
# @return [Boolean]
def self.cost_strategy?(name)
  key = name.to_s
  COST_STRATEGIES.member?(key)
end

.hrw_strategy?(name) ⇒ Boolean

Returns:

  • (Boolean)


109
110
111
# File 'lib/polyrun/partition/plan.rb', line 109

# Whether +name+ is one of the rendezvous-hashing strategy names.
#
# @param name [#to_s] strategy name (String or Symbol)
# @return [Boolean]
def self.hrw_strategy?(name)
  key = name.to_s
  HRW_STRATEGIES.member?(key)
end

.load_timing_costs(path, granularity: :file, root: nil) ⇒ Object



101
102
103
# File 'lib/polyrun/partition/plan.rb', line 101

# Load a timing JSON file into a cost map by delegating to TimingKeys.load_costs_json_file.
#
# @param path [String] timing JSON file
# @param granularity [Symbol] timing granularity, :file by default
# @param root [String, nil] root directory forwarded unchanged to TimingKeys
# @return [Object] whatever TimingKeys.load_costs_json_file returns
def self.load_timing_costs(path, granularity: :file, root: nil)
  TimingKeys.load_costs_json_file(
    path,
    granularity,
    root: root
  )
end

Instance Method Details

#hrw_saltObject



35
36
37
38
# File 'lib/polyrun/partition/plan_sharding.rb', line 35

# Salt used for rendezvous hashing.
#
# The seed is converted to a String. If the seed is nil or stringifies to "", the fixed
# salt "polyrun-hrw" is used instead.
#
# @return [String]
def hrw_salt
  text = seed&.to_s
  return "polyrun-hrw" if text.nil? || text.empty?

  text
end

#hrw_shardsObject



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/polyrun/partition/plan_sharding.rb', line 6

# Rendezvous-hash (HRW) assignment of every item into total_shards buckets; memoized.
#
# An item that @constraints pins (forced_shard_for returns a truthy shard) goes to that
# shard. Every other item goes to Hrw.shard_for, salted with #hrw_salt. Items are visited in
# #items order, so each bucket keeps that relative order.
#
# @return [Array<Array>] one bucket per shard index
# @raise [Polyrun::Error] when a resolved shard index falls outside 0...total_shards
def hrw_shards
  @hrw_shards ||= begin
    shard_count = total_shards
    result = Array.new(shard_count) { [] }
    salt = hrw_salt
    items.each do |item|
      pinned = @constraints && @constraints.forced_shard_for(item)
      target =
        if pinned
          Integer(pinned)
        else
          Hrw.shard_for(path: item, total_shards: shard_count, seed: salt)
        end
      if target.negative? || target >= shard_count
        raise Polyrun::Error, "constraint shard out of range"
      end

      result[target] << item
    end
    result
  end
end

#manifest(shard_index) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/polyrun/partition/plan.rb', line 87

# Build the manifest Hash describing one shard of this plan.
#
# Always present: "shard_index", "shard_total", "strategy", "seed", "paths" (in that
# order). "timing_granularity" is added only for :example granularity. "shard_seconds"
# (from #shard_weight_totals) is added for cost strategies, and for hrw strategies only
# when some shard total is positive.
#
# @param shard_index [Integer, String] zero-based shard index, coerced with Kernel#Integer
# @return [Hash{String=>Object}]
# @raise [Polyrun::Error] from #shard when the index is out of range
def manifest(shard_index)
  out = {}
  out["shard_index"] = Integer(shard_index)
  out["shard_total"] = total_shards
  out["strategy"] = strategy
  out["seed"] = seed
  out["paths"] = shard(shard_index)
  out["timing_granularity"] = timing_granularity.to_s if timing_granularity == :example

  totals = shard_weight_totals
  include_totals =
    if cost_strategy?
      true
    elsif hrw_strategy?
      totals.any?(&:positive?)
    else
      false
    end
  out["shard_seconds"] = totals if include_totals
  out
end

#mod_shardsObject

Buckets ordered_items (round_robin / random_round_robin) in a single pass and memoizes the result, so #shard does not rescan the whole list for every shard (which would cost O(total_shards × n)).



26
27
28
29
30
31
32
33
# File 'lib/polyrun/partition/plan_sharding.rb', line 26

# Deal ordered_items into total_shards buckets, putting item i in bucket i % total_shards.
# Done in one pass and memoized.
#
# @return [Array<Array>] one bucket per shard index
def mod_shards
  @mod_shards ||= begin
    sequence = ordered_items
    count = total_shards
    sequence.each_with_index.each_with_object(Array.new(count) { [] }) do |(item, position), buckets|
      buckets[position % count] << item
    end
  end
end

#ordered_itemsObject



49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/polyrun/partition/plan.rb', line 49

# Items sorted ascending. For random_round_robin, the sorted list is then passed through
# StableShuffle seeded by #random_seed. Memoized.
#
# @return [Array]
# @raise [Polyrun::Error] for a strategy that is not round_robin, random_round_robin,
#   or listed in COST_STRATEGIES / HRW_STRATEGIES
def ordered_items
  @ordered_items ||= case strategy
  when "random_round_robin"
    StableShuffle.call(items.sort, random_seed)
  # Match on the class constants (rather than copies of their strings) so a new alias
  # cannot drift out of sync with cost_strategy? / hrw_strategy?.
  when "round_robin", *COST_STRATEGIES, *HRW_STRATEGIES
    items.sort
  else
    raise Polyrun::Error, "unknown partition strategy: #{strategy}"
  end
end

#random_seedObject



40
41
42
43
44
45
# File 'lib/polyrun/partition/plan_sharding.rb', line 40

# Integer seed for the random_round_robin shuffle.
#
# A nil, false or "" seed yields 0. Anything else goes through Kernel#Integer, which
# raises ArgumentError for non-numeric Strings. Note that Kernel#Integer also reads a
# String with a leading "0" as octal, so "010" becomes 8.
#
# @return [Integer]
def random_seed
  raw = seed
  return 0 unless raw
  return 0 if raw == ""

  Integer(raw)
end

#shard(shard_index) ⇒ Object

Raises:



64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/polyrun/partition/plan.rb', line 64

# Items assigned to one shard.
#
# Cost strategies read from cost_shards, hrw strategies from hrw_shards, and all other
# strategies from mod_shards.
#
# @param shard_index [Integer, String] zero-based index, coerced with Kernel#Integer
# @return [Array]
# @raise [Polyrun::Error] if the index is outside 0...total_shards
def shard(shard_index)
  position = Integer(shard_index)
  unless position.between?(0, total_shards - 1)
    raise Polyrun::Error, "shard_index out of range"
  end

  buckets =
    if cost_strategy?
      cost_shards
    elsif hrw_strategy?
      hrw_shards
    else
      mod_shards
    end
  buckets[position]
end

#shard_weight_totalsObject



77
78
79
80
81
82
83
84
85
# File 'lib/polyrun/partition/plan.rb', line 77

# Sum of item weights for each shard, indexed like the shards.
#
# Cost strategies sum weight_for over cost_shards, and hrw strategies sum
# weight_for_optional over hrw_shards. Any other strategy yields an empty Array.
#
# @return [Array<Numeric>]
def shard_weight_totals
  return cost_shards.map { |bucket| bucket.sum { |item| weight_for(item) } } if cost_strategy?
  return hrw_shards.map { |bucket| bucket.sum { |item| weight_for_optional(item) } } if hrw_strategy?

  []
end