Class: Exwiw::MongodbParallelDumper

Inherits:
Object
Defined in:
lib/exwiw/mongodb_parallel_dumper.rb

Overview

Runs the inter-collection fork schedule from docs/mongodb-dump-parallelism-2x-notes.md. The output is byte-identical to the serial Runner’s, while the dominant cost (the Mongo driver’s BSON-to-Ruby decode) is parallelized across processes. Each worker decodes its own collections in their natural order, so order is preserved and the result still matches a serial dump.

It consumes the static, config-derived classification from MongodbParallelPlan (the three groups + cascade adjacency + ref_bt components) and adds the live orchestration the plan deliberately leaves out: a fork pool per group, LPT bin-packing on a per-collection cost weight, @state Marshal sidecar IPC for the handful of referenced leaves, and the Phase-2 cascade reprocess.
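
As a rough illustration of the sidecar idea only: a worker could serialize the state it gathered for a referenced leaf with Marshal into a file under a shared temporary directory, and the parent could merge those files after the barrier. The helper names, the `.marshal` suffix, and the shape of the state Hash are assumptions made for this sketch, not the class’s actual internals.

```ruby
require "tmpdir"

# Hypothetical helper: a worker persists the state gathered for one leaf.
def write_sidecar(dir, collection, state)
  File.binwrite(File.join(dir, "#{collection}.marshal"), Marshal.dump(state))
end

# Hypothetical helper: the parent merges every sidecar into one Hash keyed
# by collection name. Marshal.load must only ever read trusted files.
def load_sidecars(dir)
  Dir.glob(File.join(dir, "*.marshal")).sort.each_with_object({}) do |path, merged|
    merged[File.basename(path, ".marshal")] = Marshal.load(File.binread(path))
  end
end

Dir.mktmpdir("sidecar-demo-") do |dir|
  write_sidecar(dir, "users", { ids: [1, 2, 3] })
  write_sidecar(dir, "teams", { ids: [7] })
  p load_sidecars(dir).keys # prints ["teams", "users"]
end
```

Files are used here instead of pipes because each worker can write its result independently and the parent can read everything in one pass once the pool has exited.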

The schedule (one parent process + a pool of `workers` forks):

Phase 1 (concurrent): fork the leaf pool; the parent meanwhile dumps the
  schema and processes the WHOLE genuine DAG optimistically (no leaf @state
  yet), recording each genuine collection's row count.
Barrier: wait for the leaf pool; load the Marshal sidecars the consumed
  leaves wrote into the parent's @state.
Phase 2 (cascade): reprocess only the genuine collections whose output can
  change now that leaf @state is present (the direct-leaf referencers),
  cascading to genuine children of any whose row count actually changed.
Phase 3: fork the ref_bt collections as dependency-closed components, each
  worker owning whole components (processed in topological order) seeded with
  the leaf @state its members reference.
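
The Phase 2 cascade can be pictured with the following sketch. It assumes a topological ordering of the genuine collections, a parent-to-children adjacency Hash, and the row counts recorded in Phase 1; `cascade_reprocess` and all of its parameters are hypothetical names, and the block stands in for the real per-collection reprocess, returning the new row count.

```ruby
require "set"

# order:    genuine collection names, parents before children (assumed)
# seeds:    the direct-leaf referencers that must be reprocessed
# children: Hash of name => Array of genuine child names
# counts:   Hash of name => row count recorded in Phase 1 (updated in place)
# The block reprocesses one collection and returns its new row count.
def cascade_reprocess(order, seeds, children, counts)
  dirty = Set.new(seeds)
  reprocessed = []
  order.each do |name|
    next unless dirty.include?(name)

    new_count = yield(name)
    reprocessed << name
    next if new_count == counts[name] # unchanged: children stay untouched

    counts[name] = new_count
    dirty.merge(children.fetch(name, []))
  end
  reprocessed
end

order = %w[accounts users posts comments]
children = { "accounts" => ["users"], "users" => ["posts"], "posts" => ["comments"] }
counts = { "accounts" => 10, "users" => 5, "posts" => 3, "comments" => 1 }
fresh = { "accounts" => 8, "users" => 5 } # made-up post-leaf row counts

p cascade_reprocess(order, ["accounts"], children, counts) { |n| fresh.fetch(n) }
# prints ["accounts", "users"]
```

In this made-up run `accounts` changes, so `users` is reprocessed; `users` keeps its row count, so the cascade stops before `posts`.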

Output bytes are independent of the schedule: every collection writes its own insert-NNN-<name>.<ext> file (the index taken over the plan’s full ordering, exactly as the serial Runner numbers them) and the per-collection write is the same build_query -> execute -> write_inserts pass the Runner performs. The bin-packing only decides which worker runs which collection, never the bytes.

fork is required; callers must check MongodbParallelDumper.available? and fall back to the serial Runner on JRuby/TruffleRuby/Windows.
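
A caller-side guard might look like the sketch below. The wrapper name `dump_with_fallback` and its callable arguments are hypothetical; in real code the availability check would be MongodbParallelDumper.available?, which this page shows is `Process.respond_to?(:fork)`.

```ruby
# Hypothetical wrapper: take the parallel path when fork exists, otherwise
# fall back to the serial one. Both paths are passed in as callables.
def dump_with_fallback(parallel:, serial:, fork_available: Process.respond_to?(:fork))
  fork_available ? parallel.call : serial.call
end

result = dump_with_fallback(
  parallel: -> { :parallel_dump },
  serial: -> { :serial_dump },
  fork_available: false # force the fallback branch for the demo
)
p result # prints :serial_dump
```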

Constructor Details

#initialize(connection_config:, plan:, dump_target:, table_by_name:, output_dir:, workers:, logger:, weight_for: nil) ⇒ MongodbParallelDumper

Returns a new instance of MongodbParallelDumper.

Parameters:

  • connection_config (ConnectionConfig)

    used to build a FRESH adapter in the parent and in every fork (a Mongo client cannot be shared across fork)

  • plan (MongodbParallelPlan)

    the static classification for this dump

  • dump_target (DumpTarget)
  • table_by_name (Hash{String=>config})

    ALL configs (embedded included), exactly as Runner builds it

  • output_dir (String)
  • workers (Integer)

    fork pool size (>= 1)

  • logger (Logger)
  • weight_for (#call, nil) (defaults to: nil)

    optional name -> numeric cost weight for LPT; defaults to the adapter’s metadata-only estimated document count

Raises:

  • (ArgumentError)

    if workers is less than 1


# File 'lib/exwiw/mongodb_parallel_dumper.rb', line 80

def initialize(connection_config:, plan:, dump_target:, table_by_name:, output_dir:, workers:, logger:, weight_for: nil)
  raise ArgumentError, "workers must be >= 1 (got #{workers})" if workers < 1

  @connection_config = connection_config
  @plan = plan
  @dump_target = dump_target
  @table_by_name = table_by_name
  @output_dir = output_dir
  @workers = workers
  @logger = logger
  @weight_for = weight_for
end

Class Method Details

.available? ⇒ Boolean

True when the runtime can `fork` (CRuby on a POSIX OS). On JRuby/TruffleRuby and Windows it cannot, so the caller must run the serial Runner instead.

Returns:

  • (Boolean)


# File 'lib/exwiw/mongodb_parallel_dumper.rb', line 46

def self.available?
  Process.respond_to?(:fork)
end

.bin_pack(items, bins, &weight) ⇒ Object

Longest-Processing-Time bin-packing: assigns `items` to `bins` bins, placing the heaviest item first onto the currently least-loaded bin. Returns an Array of `bins` arrays (some are empty when there are fewer items than bins). `weight` is called exactly once per item (it may be DB-backed, so it must not be invoked repeatedly). The method itself does no DB access or IO, so it is unit-tested directly.

Raises:

  • (ArgumentError)

    if bins is less than 1


# File 'lib/exwiw/mongodb_parallel_dumper.rb', line 55

def self.bin_pack(items, bins, &weight)
  raise ArgumentError, "bins must be >= 1 (got #{bins})" if bins < 1

  weighted = items.map { |item| [item, weight.call(item)] }.sort_by { |(_, w)| -w }
  groups = Array.new(bins) { [] }
  loads = Array.new(bins, 0)
  weighted.each do |(item, w)|
    i = (0...bins).min_by { |j| loads[j] }
    groups[i] << item
    loads[i] += w
  end
  groups
end
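
A small usage sketch follows. The method body is repeated as a free-standing function so the block runs without the gem; the collection names and weights are made up for illustration.

```ruby
# Copied from the class method above so the example is self-contained.
def bin_pack(items, bins, &weight)
  raise ArgumentError, "bins must be >= 1 (got #{bins})" if bins < 1

  weighted = items.map { |item| [item, weight.call(item)] }.sort_by { |(_, w)| -w }
  groups = Array.new(bins) { [] }
  loads = Array.new(bins, 0)
  weighted.each do |(item, w)|
    i = (0...bins).min_by { |j| loads[j] }
    groups[i] << item
    loads[i] += w
  end
  groups
end

# Made-up weights, e.g. estimated document counts per collection.
weights = { "events" => 9, "users" => 7, "orders" => 6, "teams" => 5, "tags" => 4 }

calls = 0
groups = bin_pack(weights.keys, 2) do |name|
  calls += 1 # count invocations to show the once-per-item guarantee
  weights.fetch(name)
end

p groups # prints [["events", "teams"], ["users", "orders", "tags"]]
p calls  # prints 5
```

The two bins end up with loads 14 and 17. LPT is a greedy heuristic, so the split is not guaranteed to be optimal, but it is cheap and deterministic for distinct weights.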

Instance Method Details

#run ⇒ Object

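Executes the full schedule. Assumes the caller has already cleaned the output directory (the Runner does this before handing off), mirroring the serial path, which dumps the schema into a freshly cleaned directory. Returns a small stats Hash with the keys workers, genuine, leaves, ref_bt, and components (component sizes, largest first). Raises if fork is unavailable on this runtime or if any worker pool reports a non-zero exit.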



# File 'lib/exwiw/mongodb_parallel_dumper.rb', line 97

def run
  raise "fork is unavailable on this runtime; run the serial Runner instead" unless self.class.available?

  FileUtils.mkdir_p(@output_dir)
  parent = build_adapter

  Dir.mktmpdir("exwiw-mongo-parallel-") do |sidecar_dir|
    phase1_leaf_and_genuine(parent, sidecar_dir)
    phase2_cascade(parent, sidecar_dir)
    phase3_ref_components(parent, sidecar_dir)
  end

  {
    workers: @workers,
    genuine: @plan.genuine.size,
    leaves: @plan.leaves.size,
    ref_bt: @plan.ref_bt.size,
    components: @plan.reference_components.map(&:size).sort.reverse,
  }
end
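
The phase helpers called by `run` are not shown on this page. As a minimal sketch of the "raise on non-zero exit" behaviour described above, assuming one fork per non-empty group, a pool could be driven as follows. `run_pool` is a hypothetical name, and the block stands in for the per-collection dump; it requires a runtime where fork is available.

```ruby
require "tmpdir"

# Hypothetical pool driver: fork one worker per non-empty group, run the
# block for each item inside the child, and raise in the parent if any
# child exits non-zero. Returns the number of workers that ran.
def run_pool(groups)
  pids = groups.reject(&:empty?).map do |group|
    Process.fork do
      begin
        group.each { |item| yield(item) }
        exit!(0) # skip at_exit handlers inherited from the parent
      rescue StandardError
        exit!(1)
      end
    end
  end

  statuses = pids.map { |pid| Process.wait2(pid).last }
  failed = statuses.reject(&:success?)
  raise "#{failed.size} worker(s) exited non-zero" unless failed.empty?

  statuses.size
end

Dir.mktmpdir("pool-demo-") do |dir|
  ran = run_pool([%w[events teams], %w[users], []]) do |name|
    File.write(File.join(dir, "#{name}.out"), name)
  end
  p ran                                   # prints 2
  p Dir.children(dir).sort                # prints ["events.out", "teams.out", "users.out"]
end
```

Waiting on every child before raising keeps the parent from leaving zombie processes behind when one worker fails.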