Class: Polyrun::Partition::Plan
- Inherits:
-
Object
- Object
- Polyrun::Partition::Plan
- Defined in:
- lib/polyrun/partition/plan.rb,
lib/polyrun/partition/plan_sharding.rb
Overview
Assigns discrete items (e.g. spec paths) to shards (spec_queue.md).
Strategies:
- round_robin — sorted paths, assign by index mod total_shards.
- random_round_robin — Fisher–Yates shuffle (optional seed), then same mod assignment.
- cost_binpack (cost, binpack, timing) — LPT greedy binpack using per-path weights; optional Constraints for pins / serial globs before LPT on the rest.
- hrw (rendezvous) — rendezvous hashing for minimal remapping when m (the shard count, total_shards) changes; optional constraints.
Constant Summary collapse
- COST_STRATEGIES =
%w[cost cost_binpack binpack timing].freeze
- HRW_STRATEGIES =
%w[hrw rendezvous].freeze
Instance Attribute Summary collapse
-
#constraints ⇒ Object
readonly
Returns the value of attribute constraints.
-
#items ⇒ Object
readonly
Returns the value of attribute items.
-
#seed ⇒ Object
readonly
Returns the value of attribute seed.
-
#strategy ⇒ Object
readonly
Returns the value of attribute strategy.
-
#total_shards ⇒ Object
readonly
Returns the value of attribute total_shards.
Class Method Summary collapse
Instance Method Summary collapse
- #hrw_salt ⇒ Object
- #hrw_shards ⇒ Object
-
#initialize(items:, total_shards:, strategy: "round_robin", seed: nil, costs: nil, constraints: nil, root: nil) ⇒ Plan
constructor
A new instance of Plan.
- #manifest(shard_index) ⇒ Object
-
#mod_shards ⇒ Object
One pass over ordered_items (round_robin / random_round_robin); avoids O(workers × n) rescans in shard.
- #ordered_items ⇒ Object
- #random_seed ⇒ Object
- #shard(shard_index) ⇒ Object
- #shard_weight_totals ⇒ Object
Constructor Details
#initialize(items:, total_shards:, strategy: "round_robin", seed: nil, costs: nil, constraints: nil, root: nil) ⇒ Plan
Returns a new instance of Plan.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/polyrun/partition/plan.rb', line 24

# Builds a partition plan over +items+ for +total_shards+ shards.
#
# @param items [Enumerable] items to partition; each is stored as its +to_s+
#   in a frozen array
# @param total_shards [Object] shard count, coerced with Kernel#Integer
#   (which raises on non-numeric input); must be >= 1
# @param strategy [#to_s] partition strategy name, stored as a String
# @param seed [Object, nil] stored as given; read by #random_seed and #hrw_salt
# @param costs [Object, nil] passed through normalize_costs; must be non-empty
#   for cost strategies
# @param constraints [Object, nil] passed through normalize_constraints
# @param root [String, nil] base directory; expanded to an absolute path,
#   defaulting to the current working directory
# @raise [Polyrun::Error] if total_shards < 1, or if a cost strategy is chosen
#   without a timing map
def initialize(items:, total_shards:, strategy: "round_robin", seed: nil, costs: nil, constraints: nil, root: nil)
  @items = items.map(&:to_s).freeze
  @total_shards = Integer(total_shards)
  raise Polyrun::Error, "total_shards must be >= 1" if @total_shards < 1

  @strategy = strategy.to_s
  @seed = seed
  # File.expand_path makes a relative root absolute (against the current directory).
  @root = root ? File.expand_path(root) : Dir.pwd
  @constraints = normalize_constraints(constraints)
  @costs = normalize_costs(costs)
  validate_constraints_strategy_combo!
  # Cost strategies cannot balance anything without per-path timings.
  if cost_strategy? && (@costs.nil? || @costs.empty?)
    raise Polyrun::Error, "strategy #{@strategy} requires a timing map (path => seconds), e.g. merged polyrun_timing.json"
  end
end
Instance Attribute Details
#constraints ⇒ Object (readonly)
Returns the value of attribute constraints.
22 23 24 |
# File 'lib/polyrun/partition/plan.rb', line 22 def constraints @constraints end |
#items ⇒ Object (readonly)
Returns the value of attribute items.
22 23 24 |
# File 'lib/polyrun/partition/plan.rb', line 22 def items @items end |
#seed ⇒ Object (readonly)
Returns the value of attribute seed.
22 23 24 |
# File 'lib/polyrun/partition/plan.rb', line 22 def seed @seed end |
#strategy ⇒ Object (readonly)
Returns the value of attribute strategy.
22 23 24 |
# File 'lib/polyrun/partition/plan.rb', line 22 def strategy @strategy end |
#total_shards ⇒ Object (readonly)
Returns the value of attribute total_shards.
22 23 24 |
# File 'lib/polyrun/partition/plan.rb', line 22 def total_shards @total_shards end |
Class Method Details
.cost_strategy?(name) ⇒ Boolean
108 109 110 |
# File 'lib/polyrun/partition/plan.rb', line 108

# Whether +name+ (compared by its to_s) is one of COST_STRATEGIES.
#
# @param name [#to_s] strategy name
# @return [Boolean]
def self.cost_strategy?(name)
  wanted = name.to_s
  COST_STRATEGIES.any? { |known| known == wanted }
end
.hrw_strategy?(name) ⇒ Boolean
112 113 114 |
# File 'lib/polyrun/partition/plan.rb', line 112

# Whether +name+ (compared by its to_s) is one of HRW_STRATEGIES.
#
# @param name [#to_s] strategy name
# @return [Boolean]
def self.hrw_strategy?(name)
  wanted = name.to_s
  HRW_STRATEGIES.any? { |known| known == wanted }
end
.load_timing_costs(path) ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/polyrun/partition/plan.rb', line 93

# Reads a JSON timing file and returns a map of absolute path => Float.
#
# Keys are expanded against the current working directory so that relative
# and absolute spellings of the same path collapse to one key; values are
# converted with to_f.
#
# @param path [#to_s] location of the JSON file (relative paths resolve
#   against Dir.pwd)
# @return [Hash{String => Float}] empty when the file does not exist or its
#   top-level JSON value is not an object
# @raise [JSON::ParserError] if the file exists but is not valid JSON
def self.load_timing_costs(path)
  abs = File.expand_path(path.to_s, Dir.pwd)
  return {} unless File.file?(abs)

  data = JSON.parse(File.read(abs))
  return {} unless data.is_a?(Hash)

  data.each_with_object({}) do |(k, v), out|
    out[File.expand_path(k.to_s, Dir.pwd)] = v.to_f
  end
end
Instance Method Details
#hrw_salt ⇒ Object
35 36 37 38 |
# File 'lib/polyrun/partition/plan_sharding.rb', line 35

# Salt string passed to the HRW hash: the seed's to_s, or "polyrun-hrw"
# when the seed is nil or stringifies to an empty string.
def hrw_salt
  text = seed.to_s
  return "polyrun-hrw" if text.empty?

  text
end
#hrw_shards ⇒ Object
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/polyrun/partition/plan_sharding.rb', line 6

# Memoized array of total_shards buckets, each holding the paths assigned to
# that shard. A path goes to the shard returned by
# @constraints.forced_shard_for(path) when that is truthy, otherwise to the
# shard chosen by Hrw.shard_for.
#
# @raise [Polyrun::Error] if a computed shard index is outside
#   0...total_shards
def hrw_shards
  @hrw_shards ||= begin
    salt = hrw_salt
    items.each_with_object(Array.new(total_shards) { [] }) do |path, slots|
      forced = @constraints ? @constraints.forced_shard_for(path) : nil
      target =
        if forced
          Integer(forced)
        else
          Hrw.shard_for(path: path, total_shards: total_shards, seed: salt)
        end
      raise Polyrun::Error, "constraint shard out of range" if target.negative? || target >= total_shards

      slots[target] << path
    end
  end
end
#manifest(shard_index) ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/polyrun/partition/plan.rb', line 80

# Describes one shard as a string-keyed Hash: "shard_index", "shard_total",
# "strategy", "seed" and "paths". "shard_seconds" (the per-shard weight
# totals) is added for cost strategies, and for HRW strategies when at least
# one total is positive.
#
# @param shard_index [Object] shard number, coerced with Kernel#Integer
# @return [Hash{String => Object}]
def manifest(shard_index)
  summary = {
    "shard_index" => Integer(shard_index),
    "shard_total" => total_shards,
    "strategy" => strategy,
    "seed" => seed,
    "paths" => shard(shard_index)
  }
  summary.tap do |entry|
    totals = shard_weight_totals
    with_seconds = cost_strategy? || (hrw_strategy? && totals.any? { |t| t > 0 })
    entry["shard_seconds"] = totals if with_seconds
  end
end
#mod_shards ⇒ Object
One pass over ordered_items (round_robin / random_round_robin); avoids O(workers × n) rescans in shard.
26 27 28 29 30 31 32 33 |
# File 'lib/polyrun/partition/plan_sharding.rb', line 26

# Memoized buckets for the modulo strategies: the item at position i of
# ordered_items lands in bucket i % total_shards. Built in a single pass so
# that #shard does not rescan the list per shard.
def mod_shards
  @mod_shards ||= begin
    sequence = ordered_items
    sequence.each_with_index.each_with_object(Array.new(total_shards) { [] }) do |(path, position), slots|
      slots[position % total_shards] << path
    end
  end
end
#ordered_items ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/polyrun/partition/plan.rb', line 42

# Memoized item order for the current strategy: sorted items for every known
# strategy except "random_round_robin", which passes the sorted items and
# random_seed to StableShuffle.
#
# @raise [Polyrun::Error] if the strategy name is not recognised
def ordered_items
  @ordered_items ||=
    case strategy
    when "random_round_robin"
      StableShuffle.call(items.sort, random_seed)
    when "round_robin", "cost", "cost_binpack", "binpack", "timing", "hrw", "rendezvous"
      items.sort
    else
      raise Polyrun::Error, "unknown partition strategy: #{strategy}"
    end
end
#random_seed ⇒ Object
40 41 42 43 44 45 |
# File 'lib/polyrun/partition/plan_sharding.rb', line 40

# Integer seed for the shuffle: Integer(seed) when a seed is present,
# 0 when the seed is nil, false or the empty string.
def random_seed
  raw = seed
  if !raw || raw == ""
    0
  else
    Integer(raw)
  end
end
#shard(shard_index) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/polyrun/partition/plan.rb', line 57

# Returns the bucket for one shard, taken from cost_shards, hrw_shards or
# mod_shards depending on the strategy.
#
# @param shard_index [Object] shard number, coerced with Kernel#Integer
# @raise [Polyrun::Error] if the index is outside 0...total_shards
def shard(shard_index)
  position = Integer(shard_index)
  raise Polyrun::Error, "shard_index out of range" unless (0...total_shards).cover?(position)

  buckets =
    if cost_strategy?
      cost_shards
    elsif hrw_strategy?
      hrw_shards
    else
      mod_shards
    end
  buckets[position]
end
#shard_weight_totals ⇒ Object
70 71 72 73 74 75 76 77 78 |
# File 'lib/polyrun/partition/plan.rb', line 70

# Per-shard sums of path weights: weight_for over cost_shards for cost
# strategies, weight_for_optional over hrw_shards for HRW strategies, and an
# empty array for any other strategy.
def shard_weight_totals
  if cost_strategy?
    return cost_shards.map { |bucket| bucket.sum { |path| weight_for(path) } }
  end
  if hrw_strategy?
    return hrw_shards.map { |bucket| bucket.sum { |path| weight_for_optional(path) } }
  end

  []
end