Class: Polyrun::Partition::Plan

Inherits:
Object
  • Object
show all
Defined in:
lib/polyrun/partition/plan.rb,
lib/polyrun/partition/plan_sharding.rb

Overview

Assigns discrete items (e.g. spec paths) to shards (spec_queue.md).

Strategies:

  • round_robin — sorted paths, assign by index mod total_shards.

  • random_round_robin — Fisher–Yates shuffle (optional seed), then same mod assignment.

  • cost_binpack (cost, binpack, timing) — LPT greedy binpack using per-path weights; optional Constraints for pins / serial globs before LPT on the rest.

  • hrw (rendezvous) — rendezvous (highest-random-weight) hashing for minimal remapping when the shard count m (total_shards) changes; optional constraints.

Constant Summary collapse

COST_STRATEGIES =
%w[cost cost_binpack binpack timing].freeze
HRW_STRATEGIES =
%w[hrw rendezvous].freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(items:, total_shards:, strategy: "round_robin", seed: nil, costs: nil, constraints: nil, root: nil) ⇒ Plan

Returns a new instance of Plan.

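Raises:

  • (Polyrun::Error) — when total_shards is less than 1, or when a cost strategy is selected without a timing map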

24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/polyrun/partition/plan.rb', line 24

# Builds a partition plan that distributes +items+ over +total_shards+ shards.
#
# @param items [Enumerable] entries to distribute; each one is stored as a String
# @param total_shards [Integer, String] shard count, coerced with Integer()
# @param strategy [String, Symbol] partition strategy name, stored as a String
# @param seed [Object, nil] optional seed, stored exactly as given
# @param costs [Object, nil] raw timing data, passed through normalize_costs
# @param constraints [Object, nil] raw constraints, passed through normalize_constraints
# @param root [String, nil] base directory; expanded when given, otherwise Dir.pwd
# @raise [Polyrun::Error] when total_shards is below 1, or when a cost strategy
#   is requested without any timing data
def initialize(items:, total_shards:, strategy: "round_robin", seed: nil, costs: nil, constraints: nil, root: nil)
  @items = items.map(&:to_s).freeze

  @total_shards = Integer(total_shards)
  unless @total_shards >= 1
    raise Polyrun::Error, "total_shards must be >= 1"
  end

  @strategy = strategy.to_s
  @seed = seed
  @root =
    if root
      File.expand_path(root)
    else
      Dir.pwd
    end
  # Constraints are normalized before costs, as in the original ordering.
  @constraints = normalize_constraints(constraints)
  @costs = normalize_costs(costs)

  validate_constraints_strategy_combo!

  # Only cost-based strategies need a non-empty timing map.
  return unless cost_strategy?

  costs_missing = @costs.nil? || @costs.empty?
  return unless costs_missing

  raise Polyrun::Error,
    "strategy #{@strategy} requires a timing map (path => seconds), e.g. merged polyrun_timing.json"
end

Instance Attribute Details

#constraints ⇒ Object (readonly)

Returns the value of attribute constraints.



22
23
24
# File 'lib/polyrun/partition/plan.rb', line 22

# Read-only accessor for the plan's constraints.
#
# @return [Object] the value the constructor stored after running
#   normalize_constraints on its +constraints:+ argument
def constraints
  @constraints
end

#items ⇒ Object (readonly)

Returns the value of attribute items.



22
23
24
# File 'lib/polyrun/partition/plan.rb', line 22

# Read-only accessor for the items being partitioned.
#
# @return [Array<String>] the frozen array built by the constructor, with
#   every item converted via to_s
def items
  @items
end

#seed ⇒ Object (readonly)

Returns the value of attribute seed.



22
23
24
# File 'lib/polyrun/partition/plan.rb', line 22

# Read-only accessor for the seed.
#
# @return [Object, nil] the +seed:+ argument exactly as passed to the
#   constructor (nil by default)
def seed
  @seed
end

#strategy ⇒ Object (readonly)

Returns the value of attribute strategy.



22
23
24
# File 'lib/polyrun/partition/plan.rb', line 22

# Read-only accessor for the partition strategy name.
#
# @return [String] the +strategy:+ constructor argument converted with to_s
def strategy
  @strategy
end

#total_shards ⇒ Object (readonly)

Returns the value of attribute total_shards.



22
23
24
# File 'lib/polyrun/partition/plan.rb', line 22

# Read-only accessor for the shard count.
#
# @return [Integer] the +total_shards:+ constructor argument coerced with
#   Integer(); the constructor rejects values below 1
def total_shards
  @total_shards
end

Class Method Details

.cost_strategy?(name) ⇒ Boolean

Returns:

  • (Boolean)


108
109
110
# File 'lib/polyrun/partition/plan.rb', line 108

# Tells whether +name+ is one of the cost-based strategy aliases.
#
# @param name [#to_s] strategy name (String, Symbol, ...)
# @return [Boolean] true when the stringified name appears in COST_STRATEGIES
def self.cost_strategy?(name)
  label = name.to_s
  COST_STRATEGIES.any? { |known| known == label }
end

.hrw_strategy?(name) ⇒ Boolean

Returns:

  • (Boolean)


112
113
114
# File 'lib/polyrun/partition/plan.rb', line 112

# Tells whether +name+ is one of the rendezvous-hashing strategy aliases.
#
# @param name [#to_s] strategy name (String, Symbol, ...)
# @return [Boolean] true when the stringified name appears in HRW_STRATEGIES
def self.hrw_strategy?(name)
  label = name.to_s
  HRW_STRATEGIES.any? { |known| known == label }
end

.load_timing_costs(path) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/polyrun/partition/plan.rb', line 93

# Reads a JSON timing file and returns a map of absolute path => seconds.
#
# The file location and every key are expanded against the current working
# directory. Values are converted with to_f.
#
# @param path [#to_s] location of the JSON timing file
# @return [Hash{String => Float}] the cost map; empty when the file does not
#   exist or its top-level JSON value is not an object
def self.load_timing_costs(path)
  timing_file = File.expand_path(path.to_s, Dir.pwd)
  return {} unless File.file?(timing_file)

  parsed = JSON.parse(File.read(timing_file))
  return {} unless parsed.is_a?(Hash)

  parsed.each_with_object({}) do |(raw_path, seconds), costs|
    costs[File.expand_path(raw_path.to_s, Dir.pwd)] = seconds.to_f
  end
end

Instance Method Details

#hrw_salt ⇒ Object



35
36
37
38
# File 'lib/polyrun/partition/plan_sharding.rb', line 35

# Salt string handed to the rendezvous hash.
#
# @return [String] the seed rendered with to_s, or "polyrun-hrw" when the
#   seed is nil or renders to an empty string
def hrw_salt
  text = seed.to_s
  return "polyrun-hrw" if text.empty?

  text
end

#hrw_shards ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/polyrun/partition/plan_sharding.rb', line 6

# Buckets every item by shard for the rendezvous-hashing strategies.
#
# An item goes to the shard forced by the constraints when one is returned for
# it; otherwise Hrw.shard_for picks the shard using hrw_salt as the seed.
# The result is memoized in @hrw_shards.
#
# @return [Array<Array<String>>] one array of paths per shard, indexed by shard
# @raise [Polyrun::Error] when the chosen shard index is outside 0...total_shards
def hrw_shards
  return @hrw_shards if @hrw_shards

  buckets = Array.new(total_shards) { [] }
  salt = hrw_salt
  items.each do |path|
    pinned = @constraints && @constraints.forced_shard_for(path)
    target = pinned ? Integer(pinned) : Hrw.shard_for(path: path, total_shards: total_shards, seed: salt)
    unless target >= 0 && target < total_shards
      raise Polyrun::Error, "constraint shard out of range"
    end

    buckets[target] << path
  end
  @hrw_shards = buckets
end

#manifest(shard_index) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/polyrun/partition/plan.rb', line 80

# Describes one shard as a plain Hash with string keys.
#
# "shard_seconds" carries the per-shard weight totals. It is added for cost
# strategies always, and for rendezvous strategies only when at least one
# total is greater than zero.
#
# @param shard_index [Integer, String] shard to describe, coerced with Integer()
# @return [Hash{String => Object}] keys "shard_index", "shard_total",
#   "strategy", "seed", "paths" and, conditionally, "shard_seconds"
def manifest(shard_index)
  description = {}
  description["shard_index"] = Integer(shard_index)
  description["shard_total"] = total_shards
  description["strategy"] = strategy
  description["seed"] = seed
  description["paths"] = shard(shard_index)

  totals = shard_weight_totals
  with_totals = cost_strategy? || (hrw_strategy? && totals.any? { |total| total > 0 })
  description["shard_seconds"] = totals if with_totals
  description
end

#mod_shards ⇒ Object

One pass over ordered_items (round_robin / random_round_robin); avoids O(workers × n) rescans in shard.



26
27
28
29
30
31
32
33
# File 'lib/polyrun/partition/plan_sharding.rb', line 26

# Buckets ordered_items by position modulo total_shards in a single pass.
# The result is memoized in @mod_shards.
#
# @return [Array<Array<String>>] one array of paths per shard, indexed by shard
def mod_shards
  return @mod_shards if @mod_shards

  sequence = ordered_items
  empty_buckets = Array.new(total_shards) { [] }
  @mod_shards = sequence.each_with_index.with_object(empty_buckets) do |(path, position), buckets|
    buckets[position % total_shards] << path
  end
end

#ordered_items ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/polyrun/partition/plan.rb', line 42

# Items in the deterministic order used for shard assignment (memoized).
#
# "random_round_robin" sorts the items and passes them through StableShuffle
# with random_seed. Every other known strategy returns the sorted items.
# The cost and rendezvous aliases are taken from COST_STRATEGIES and
# HRW_STRATEGIES so this method always recognizes the same names as
# cost_strategy? and hrw_strategy?.
#
# @return [Array<String>] the ordered items
# @raise [Polyrun::Error] when the strategy name is not recognized
def ordered_items
  @ordered_items ||=
    case strategy
    when "random_round_robin"
      StableShuffle.call(items.sort, random_seed)
    when "round_robin", *COST_STRATEGIES, *HRW_STRATEGIES
      items.sort
    else
      raise Polyrun::Error, "unknown partition strategy: #{strategy}"
    end
end

#random_seed ⇒ Object



40
41
42
43
44
45
# File 'lib/polyrun/partition/plan_sharding.rb', line 40

# Integer seed for the shuffle-based strategy.
#
# @return [Integer] the seed coerced with Integer(), or 0 when the seed is
#   nil, false, or an empty string
def random_seed
  raw = seed
  return 0 unless raw
  return 0 if raw == ""

  Integer(raw)
end

#shard(shard_index) ⇒ Object

Raises:

  • (Polyrun::Error) — when shard_index is outside 0...total_shards

57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/polyrun/partition/plan.rb', line 57

# Returns the paths assigned to a single shard.
#
# The bucket list comes from cost_shards for cost strategies, hrw_shards for
# rendezvous strategies, and mod_shards for everything else.
#
# @param shard_index [Integer, String] zero-based shard number, coerced with Integer()
# @return [Array<String>] paths assigned to that shard
# @raise [Polyrun::Error] when the index is outside 0...total_shards
def shard(shard_index)
  position = Integer(shard_index)
  unless position >= 0 && position < total_shards
    raise Polyrun::Error, "shard_index out of range"
  end

  buckets =
    if cost_strategy?
      cost_shards
    elsif hrw_strategy?
      hrw_shards
    else
      mod_shards
    end
  buckets[position]
end

#shard_weight_totals ⇒ Object



70
71
72
73
74
75
76
77
78
# File 'lib/polyrun/partition/plan.rb', line 70

# Sums the weights of the paths in each shard.
#
# Cost strategies sum weight_for over cost_shards; rendezvous strategies sum
# weight_for_optional over hrw_shards; any other strategy yields an empty array.
#
# @return [Array<Numeric>] one total per shard, or [] for strategies without weights
def shard_weight_totals
  if cost_strategy?
    return cost_shards.map { |bucket| bucket.sum { |path| weight_for(path) } }
  end
  return [] unless hrw_strategy?

  hrw_shards.map { |bucket| bucket.sum { |path| weight_for_optional(path) } }
end