Class: Exwiw::MongodbParallelDumper
- Inherits: Object
- Defined in: lib/exwiw/mongodb_parallel_dumper.rb
Overview
Runs the inter-collection fork schedule from docs/mongodb-dump-parallelism-2x-notes.md. Its output is byte-identical to the serial Runner's, while the dominant cost (the Mongo driver’s BSON->Ruby decode) is spread across processes. Each worker decodes its own collections in their natural order, so every collection's order is preserved and the result matches a serial dump.
It consumes the static, config-derived classification from MongodbParallelPlan (the three groups, the cascade adjacency, and the ref_bt components) and adds the live orchestration that the plan deliberately leaves out: a fork pool per group, LPT (Longest-Processing-Time) bin-packing on a per-collection cost weight, @state IPC through Marshal sidecar files for the handful of referenced leaves, and the Phase-2 cascade reprocess.
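The fork-pool-plus-sidecar pattern described above can be sketched as follows. This is a minimal illustration, not the class's private API: `run_pool`, the `state-N.bin` file names, and the per-item block are hypothetical, and it assumes each worker receives a pre-packed list of items and returns a Hash of state. The real dumper also decodes BSON and writes insert files inside each worker, and only writes sidecars for referenced leaves.

```ruby
require "tmpdir"

# Hypothetical helper: fork one worker per non-empty group, let each worker
# Marshal its partial state into a sidecar file, then merge the sidecars in the parent.
def run_pool(groups, sidecar_dir)
  pids = groups.each_with_index.filter_map do |group, idx|
    next if group.empty?

    fork do
      state = {}
      group.each { |item| state[item] = yield(item) } # per-item work runs in the child
      File.binwrite(File.join(sidecar_dir, "state-#{idx}.bin"), Marshal.dump(state))
      exit!(0) # skip at_exit handlers inherited from the parent
    rescue Exception
      exit!(1) # any failure in the child becomes a non-zero exit status
    end
  end

  # Barrier: every worker must exit cleanly before any sidecar is trusted.
  failed = pids.map { |pid| Process.wait2(pid).last }.reject(&:success?)
  raise "#{failed.size} worker(s) exited non-zero" unless failed.empty?

  Dir.glob(File.join(sidecar_dir, "state-*.bin")).sort.each_with_object({}) do |path, merged|
    merged.merge!(Marshal.load(File.binread(path)))
  end
end

Dir.mktmpdir("pool-demo-") do |dir|
  merged = run_pool([%i[users orders], %i[tags], []], dir) { |name| name.to_s.length }
  # merged == { users: 5, orders: 6, tags: 4 }
end
```

Using `exit!` rather than `exit` in the child avoids running the parent's `at_exit` hooks twice, and checking every exit status before reading sidecars mirrors the "raise if any worker pool reports a non-zero exit" behaviour documented for `#run`.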
The schedule (one parent process plus a pool of `workers` forks):
- Phase 1 (concurrent): fork the leaf pool; meanwhile the parent dumps the schema and optimistically processes the WHOLE genuine DAG (with no leaf @state yet), recording each genuine collection's row count.
- Barrier: wait for the leaf pool, then load the Marshal sidecars written by the consumed leaves into the parent's @state.
- Phase 2 (cascade): reprocess only the genuine collections whose output can change now that leaf @state is present (the direct-leaf referencers), cascading to the genuine children of any collection whose row count actually changed.
- Phase 3: fork the ref_bt collections as dependency-closed components. Each worker owns whole components, processes each one in topological order, and seeds it with the leaf @state that the component's members reference.
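The Phase-2 rule can be sketched as a single pass in topological order: a collection is reprocessed if it is a seed (a direct-leaf referencer) or if one of its parents changed row count. This is an illustration of the rule as described, not the class's `phase2_cascade`; `cascade`, `children`, and `phase1_counts` are hypothetical names, and the real method also depends on the leaf @state loaded at the barrier.

```ruby
require "set"

# Hypothetical sketch of the Phase-2 cascade. Walking in topological order
# guarantees every parent is settled before any of its children is visited.
def cascade(topo_order, children, seeds, phase1_counts)
  dirty = Set.new(seeds)
  reprocessed = []
  topo_order.each do |name|
    next unless dirty.include?(name)

    new_count = yield(name) # re-run the per-collection dump, returning its row count
    reprocessed << name
    next if new_count == phase1_counts[name] # unchanged output: children stay valid

    children.fetch(name, []).each { |child| dirty << child }
  end
  reprocessed
end

order = %i[users orders items logs]
kids = { users: [:orders], orders: [:items] }
before = { users: 10, orders: 5, items: 20, logs: 3 }
after = { users: 8, orders: 5, items: 20, logs: 3 }
cascade(order, kids, %i[users logs], before) { |n| after[n] }
# => [:users, :orders, :logs]; items is skipped because orders' count did not change
```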
Output bytes are independent of the schedule: every collection writes its own insert-NNN-<name>.<ext> file (the index taken over the plan’s full ordering, exactly as the serial Runner numbers them) and the per-collection write is the same build_query -> execute -> write_inserts pass the Runner performs. The bin-packing only decides which worker runs which collection, never the bytes.
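Why the schedule cannot change file names can be shown with a tiny sketch: the index depends only on a collection's position in the full ordering, never on which worker writes it. `insert_file_name` is a hypothetical helper, and the 1-based, three-digit zero padding and the `json` extension are assumptions standing in for the real `NNN` and `<ext>`.

```ruby
# Hypothetical helper: NNN is the collection's position in the plan's full
# ordering (assumed 1-based, zero-padded to three digits), fixed before scheduling.
def insert_file_name(full_ordering, name, ext)
  index = full_ordering.index(name)
  raise ArgumentError, "unknown collection: #{name}" if index.nil?

  format("insert-%03d-%s.%s", index + 1, name, ext)
end

ordering = %w[users orders items]

# Two different worker assignments of the same collections...
schedule_a = [%w[users items], %w[orders]]
schedule_b = [%w[orders users], %w[items]]
names = ->(groups) { groups.flatten.map { |n| insert_file_name(ordering, n, "json") }.sort }

names.call(schedule_a) == names.call(schedule_b) # ...produce the same set of file names
```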
fork is required; callers must check MongodbParallelDumper.available? and fall back to the serial Runner on JRuby/TruffleRuby/Windows.
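A caller-side guard might look like the sketch below. `dump_with_fallback` and its two callables are hypothetical stand-ins for whatever constructs and runs MongodbParallelDumper or the serial Runner; only the check itself mirrors `MongodbParallelDumper.available?`, which is `Process.respond_to?(:fork)`.

```ruby
# Hypothetical dispatcher: `parallel` and `serial` stand in for the real
# MongodbParallelDumper and serial Runner invocations.
def dump_with_fallback(parallel:, serial:)
  if Process.respond_to?(:fork)
    parallel.call
  else
    serial.call # per the notes above: JRuby, TruffleRuby, and Windows land here
  end
end

mode = dump_with_fallback(parallel: -> { :parallel }, serial: -> { :serial })
```

Checking `Process.respond_to?(:fork)` rather than parsing `RUBY_ENGINE` or `RUBY_PLATFORM` keeps the guard tied to the capability that is actually needed.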
Class Method Summary
- .available? ⇒ Boolean
  True when the runtime can `fork` (CRuby on a POSIX OS).
- .bin_pack(items, bins, &weight) ⇒ Object
  Longest-Processing-Time bin-packing: assign `items` to `bins` bins, heaviest first, each onto the currently least-loaded bin.
Instance Method Summary
- #initialize(connection_config:, plan:, dump_target:, table_by_name:, output_dir:, workers:, logger:, weight_for: nil) ⇒ MongodbParallelDumper (constructor)
  A new instance of MongodbParallelDumper.
- #run ⇒ Object
  Execute the full schedule.
Constructor Details
#initialize(connection_config:, plan:, dump_target:, table_by_name:, output_dir:, workers:, logger:, weight_for: nil) ⇒ MongodbParallelDumper
Returns a new instance of MongodbParallelDumper. Raises ArgumentError if `workers` is less than 1.
    # File 'lib/exwiw/mongodb_parallel_dumper.rb', line 80
    def initialize(connection_config:, plan:, dump_target:, table_by_name:, output_dir:, workers:, logger:, weight_for: nil)
      raise ArgumentError, "workers must be >= 1 (got #{workers})" if workers < 1

      @connection_config = connection_config
      @plan = plan
      @dump_target = dump_target
      @table_by_name = table_by_name
      @output_dir = output_dir
      @workers = workers
      @logger = logger
      @weight_for = weight_for
    end
Class Method Details
.available? ⇒ Boolean
True when the runtime can `fork` (CRuby on a POSIX OS). On JRuby/TruffleRuby and Windows it cannot; the caller must run the serial Runner instead.
    # File 'lib/exwiw/mongodb_parallel_dumper.rb', line 46
    def self.available?
      Process.respond_to?(:fork)
    end
.bin_pack(items, bins, &weight) ⇒ Object
Longest-Processing-Time bin-packing: assign `items` to `bins` bins, heaviest first, each onto the currently least-loaded bin. Returns an Array of `bins` arrays (some may be empty when there are fewer items than bins). `weight` is called exactly once per item, because it may be DB-backed and must not be invoked repeatedly. Apart from that block, the method is pure (no DB, no IO), so it is unit-tested directly.
    # File 'lib/exwiw/mongodb_parallel_dumper.rb', line 55
    def self.bin_pack(items, bins, &weight)
      raise ArgumentError, "bins must be >= 1 (got #{bins})" if bins < 1

      weighted = items.map { |item| [item, weight.call(item)] }.sort_by { |(_, w)| -w }
      groups = Array.new(bins) { [] }
      loads = Array.new(bins, 0)
      weighted.each do |(item, w)|
        i = (0...bins).min_by { |j| loads[j] }
        groups[i] << item
        loads[i] += w
      end
      groups
    end
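To see the LPT behaviour on concrete numbers, the sketch below copies the algorithm above into a standalone `lpt_pack` method (a hypothetical name, so the example runs without loading exwiw) and counts how often the weight block is called. The cost weights are made up for illustration.

```ruby
# Standalone copy of the LPT logic above: sort by weight descending, then place
# each item on the bin with the smallest running load. On ties, min_by keeps the
# first minimum it sees, so the lowest-numbered bin wins.
def lpt_pack(items, bins, &weight)
  raise ArgumentError, "bins must be >= 1 (got #{bins})" if bins < 1

  weighted = items.map { |item| [item, weight.call(item)] }.sort_by { |(_, w)| -w }
  groups = Array.new(bins) { [] }
  loads = Array.new(bins, 0)
  weighted.each do |(item, w)|
    i = (0...bins).min_by { |j| loads[j] }
    groups[i] << item
    loads[i] += w
  end
  groups
end

costs = { a: 8, b: 7, c: 6, d: 5, e: 4, f: 2 }
calls = 0
groups = lpt_pack(costs.keys, 2) { |k| calls += 1; costs[k] }
# groups == [[:a, :d, :e], [:b, :c, :f]] (loads 17 and 15); calls == 6
```

LPT is a greedy heuristic rather than an optimal partition: here {a, c, f} and {b, d, e} would balance at 16 each. Because the packing only decides which worker runs which collection, an imperfect split costs wall-clock time but never changes the output bytes.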
Instance Method Details
#run ⇒ Object
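Execute the full schedule. Assumes the caller has already cleaned the output directory (the Runner does this before handing off), mirroring the serial path, which dumps the schema into a freshly cleaned directory. Returns a small stats Hash: the worker count, the sizes of the genuine, leaf, and ref_bt groups, and the reference-component sizes in descending order. Raises if fork is unavailable on the runtime or if any worker pool reports a non-zero exit.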
    # File 'lib/exwiw/mongodb_parallel_dumper.rb', line 97
    def run
      raise "fork is unavailable on this runtime; run the serial Runner instead" unless self.class.available?

      FileUtils.mkdir_p(@output_dir)
      parent = build_adapter

      Dir.mktmpdir("exwiw-mongo-parallel-") do |sidecar_dir|
        phase1_leaf_and_genuine(parent, sidecar_dir)
        phase2_cascade(parent, sidecar_dir)
        phase3_ref_components(parent, sidecar_dir)
      end

      {
        workers: @workers,
        genuine: @plan.genuine.size,
        leaves: @plan.leaves.size,
        ref_bt: @plan.ref_bt.size,
        components: @plan.reference_components.map(&:size).sort.reverse,
      }
    end