Module: RSpecTracer::RSpec::ParallelTests Private

Defined in:
lib/rspec_tracer/rspec/parallel_tests.rb

Overview

This module is part of a private API. You should avoid using this module if possible, as it may be removed or be changed in the future.

parallel_tests orchestration for the v2 engine.

1.x scattered the parallel-worker glue across `lib/rspec_tracer.rb` (`parallel_tests_setup`, `track_parallel_tests_test_env_number`, `run_parallel_tests_exit_tasks`, `merge_parallel_tests_reports`, `parallel_tests_last_process?`, etc.). 2.0 collapses them here and rewires the snapshot merge onto `Storage::JsonBackend#merge_from_peers`. The snapshot merge currently runs only for the `:json` backend: SqliteBackend has no merge surface (see `merge_snapshot!`).

Responsibilities:

- Detect `TEST_ENV_NUMBER` + `PARALLEL_TEST_GROUPS` env vars.
- Maintain the shared `rspec_tracer.lock` file that records the
  highest TEST_ENV_NUMBER seen (the 1.x last-process mechanism;
  election now goes through `::ParallelTests.first_process?`,
  see `last_process?`).
- Decide the narrator: first process by env convention. Log
  rollup lines only on the narrator unless
  `RSPEC_TRACER_VERBOSE=true`.
- On the elected worker (see `last_process?`), merge per-worker
  snapshots + coverage.json, emit the merged reporters, and purge
  `parallel_tests_N/` directories.

Graceful degradation: every merge / cleanup step rescues StandardError and logs it, so a partial or corrupt peer cache never propagates a non-zero exit into the user’s test run.
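A minimal sketch of the graceful-degradation pattern, assuming a plain stdlib `Logger`. `with_degradation` is a hypothetical helper invented for illustration; the module itself inlines a `rescue StandardError` in each method rather than sharing a wrapper.

```ruby
require 'logger'
require 'stringio'

# Hypothetical helper (not part of RSpecTracer): runs the block and, on any
# StandardError, logs a warning and returns the fallback instead of raising.
def with_degradation(logger, label, fallback: false)
  yield
rescue StandardError => e
  logger.warn("RSpec tracer: #{label} failed (#{e.class}: #{e.message})")
  fallback
end

log_io = StringIO.new
logger = Logger.new(log_io)

ok  = with_degradation(logger, 'merge') { :merged }                     # => :merged
bad = with_degradation(logger, 'purge') { raise IOError, 'disk full' }  # => false
```

Rescuing `StandardError` (rather than `Exception`) leaves `SystemExit` and `Interrupt` alone, so an explicit `exit` or Ctrl-C still behaves normally.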

Constant Summary

LOCK_ENCODING =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Internal constant.

'UTF-8'
BOOT_MARKER_FILENAME =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Per-worker boot/done breadcrumbs written to each worker’s `parallel_tests_N/` cache dir. The elected worker uses these at finalize time to verify that every booted peer has reached the end of its at_exit before merge + purge:

.boot — written at setup! time (very early, before any
        cache write). Source-of-truth for "this worker
        ever booted past RSpecTracer.start".
.done — written at finalize entry, AFTER per-worker
        run_finalize + emit_coverage_json. Must be the
        last write our code does into parallel_tests_N/
        on the worker side; the elected worker reads its
        presence as "this peer is fully flushed".

Verification path: see `wait_for_peer_done_markers!`. Without these markers the elected worker relied only on the pid-file barrier in `wait_for_other_processes_to_finish`, which, as observed on GHA Linux x86_64, could return before a sibling had flushed. That left a straggler `parallel_tests_N/` after purge and made spec/integration/parallel_tests_spec.rb:88 fail intermittently.

'.rspec_tracer_boot'
DONE_MARKER_FILENAME =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Internal constant.

'.rspec_tracer_done'
PEER_DONE_DEADLINE_SECONDS =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Bound on the elected worker’s wait for missing .done markers. 5s comfortably exceeds the at_exit tail of any well-behaved peer; on timeout we log + proceed (graceful degradation: a truly-crashed peer must not pin the elected forever).

5

Class Method Summary

Class Method Details

.active? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Internal helper for the tracer pipeline.

Returns:

  • (Boolean)


# File 'lib/rspec_tracer/rspec/parallel_tests.rb', line 70

def self.active?
  return false if ::ENV.fetch('TEST_ENV_NUMBER', nil).nil?
  return false if ::ENV.fetch('PARALLEL_TEST_GROUPS', nil).nil?

  true
end
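To make the `nil` (not empty) checks concrete, here is a sketch that takes the environment as a Hash. `parallel_active?` is a hypothetical name and signature, not part of the module.

```ruby
# Standalone paraphrase of `active?` that reads from a Hash instead of ENV,
# so it can be exercised without mutating the process environment.
def parallel_active?(env)
  return false if env.fetch('TEST_ENV_NUMBER', nil).nil?
  return false if env.fetch('PARALLEL_TEST_GROUPS', nil).nil?

  true
end

parallel_active?({})                                                       # => false
parallel_active?({ 'TEST_ENV_NUMBER' => '2' })                             # => false
parallel_active?({ 'TEST_ENV_NUMBER' => '', 'PARALLEL_TEST_GROUPS' => '4' }) # => true
```

The first worker typically sees `TEST_ENV_NUMBER=''`, which is why the check is `nil?` rather than `empty?`: an empty value still means parallel_tests is driving the process.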

.build_merged_run_metadata(base_dir) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Internal helper for the tracer pipeline.



# File 'lib/rspec_tracer/rspec/parallel_tests.rb', line 297

def self.build_merged_run_metadata(base_dir)
  {
    pid: Process.pid,
    run_time: nil,
    started_at: nil,
    cache_path: base_dir,
    parallel_tests: true,
    rails: RSpecTracer.rails?
  }
end

.emit_merged_reporters! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Emit reporters against the merged top-level snapshot so the user gets one terminal summary + one JSON report + one HTML report at the canonical (non-`parallel_tests_N`) path. Wrapped in its own rescue so a failed reporter never blocks purge / lock cleanup downstream.



# File 'lib/rspec_tracer/rspec/parallel_tests.rb', line 261

def self.emit_merged_reporters!
  return unless RSpecTracer.storage_backend == :json

  base_dir = ::File.dirname(RSpecTracer.cache_path)
  merged_snapshot = load_merged_snapshot(base_dir)
  return if merged_snapshot.nil?

  top_report_dir = ::File.dirname(RSpecTracer.report_path)
  ::FileUtils.mkdir_p(top_report_dir)

  RSpecTracer::Reporters::Registry.emit_all(
    configuration: RSpecTracer,
    snapshot: merged_snapshot,
    report_dir: top_report_dir,
    run_metadata: build_merged_run_metadata(base_dir)
  )
rescue StandardError => e
  RSpecTracer.logger.warn(
    "RSpec tracer: merged reporter emission failed (#{e.class}: #{e.message})"
  )
end

.finalize! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called from at_exit after the per-worker snapshot has been persisted. Every worker drops its `.done` marker as the very first step here so the elected worker’s verification loop can observe it; non-elected workers then return. The elected worker waits for every booted peer’s `.done` to appear, orchestrates the snapshot + coverage merge (the coverage merge is skipped when `RSpecTracer.simplecov?` is true), emits the merged reporters, purges per-worker dirs, and removes the lock file.

`touch_done!` MUST stay the last write our code performs into `parallel_tests_N/`: anything written later would land after the elected worker has decided it’s safe to purge, leaving stragglers.



# File 'lib/rspec_tracer/rspec/parallel_tests.rb', line 151

def self.finalize!
  return false unless active?

  touch_done!

  return false unless last_process?

  ::ParallelTests.wait_for_other_processes_to_finish if defined?(::ParallelTests)

  # Belt-and-suspenders barrier: pid-file said everyone's done,
  # but observed CI evidence (GHA Linux x86_64, Ruby 3.4 cells)
  # caught a sibling's `parallel_tests_N/` reappearing post-purge
  # — i.e., the pid signal returned while a peer hadn't fully
  # flushed yet. Cross-check via the .boot/.done filesystem
  # markers before declaring the peer set stable.
  wait_for_peer_done_markers!

  merge_snapshot!
  merge_coverage! unless RSpecTracer.simplecov?
  # Emit terminal/JSON/HTML reporters ONCE at the merged top-level
  # location BEFORE purge_worker_dirs! removes the per-worker
  # `parallel_tests_N` dirs. Earlier behavior had each worker emit
  # reports into its `rspec_tracer_report/parallel_tests_N` dir
  # and the purge then deleted them, leaving the user with zero
  # usable output. Now reporters consume the just-merged
  # top-level snapshot.
  emit_merged_reporters!
  purge_worker_dirs!
  remove_lock_file!
  true
rescue StandardError => e
  RSpecTracer.logger.warn(
    "RSpec tracer: parallel_tests finalize failed (#{e.class}: #{e.message})"
  )
  false
end
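The ordering contract of `finalize!` can be modelled with stub steps that record their names. This is a hypothetical sketch: `finalize_sketch`, the `steps` Hash and the `elected:` flag stand in for the real methods and for `last_process?`, and the `active?` and `RSpecTracer.simplecov?` guards are omitted.

```ruby
STEP_NAMES = %i[
  touch_done! wait_for_other_processes_to_finish wait_for_peer_done_markers!
  merge_snapshot! merge_coverage! emit_merged_reporters!
  purge_worker_dirs! remove_lock_file!
].freeze

# Hypothetical model of finalize!: every worker signals completion first,
# only the elected worker runs the rest, and any error becomes `false`.
def finalize_sketch(elected:, steps:, log:)
  steps.fetch(:touch_done!).call
  return false unless elected

  STEP_NAMES.drop(1).each { |name| steps.fetch(name).call }
  true
rescue StandardError => e
  log << "finalize failed (#{e.class}: #{e.message})"
  false
end

calls = []
log = []
steps = STEP_NAMES.to_h { |name| [name, -> { calls << name }] }

finalize_sketch(elected: false, steps: steps, log: log) # => false
calls                                                   # => [:touch_done!]
```

The model shows the two properties the docs rely on: `touch_done!` runs on every worker before election, and an exception in any later step turns into a logged `false` instead of escaping at_exit.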

.last_process? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Elects the worker that performs the per-run merge. Despite the method name, it delegates to `::ParallelTests.first_process?`, which returns true iff `TEST_ENV_NUMBER.to_i <= 1`, i.e. for exactly one worker (TEST_ENV_NUMBER is '' or '1'), regardless of how many workers were actually spawned or how many CPUs the runner reports.

Two historical approaches do NOT work here:

1. The 1.x lock-file scheme (each worker wrote its
   TEST_ENV_NUMBER to `rspec_tracer.lock` at RSpecTracer.start
   time; last_process? picked the max) deadlocked under CI:
   worker 1 could finish its examples before worker 2 even
   loaded spec_helper, observe itself as the max, and enter
   `wait_for_other_processes_to_finish` concurrently with
   worker 2's own self-election, and both workers spun on
   each other's pid.

2. `::ParallelTests.last_process?` compares TEST_ENV_NUMBER
   against PARALLEL_TEST_GROUPS. parallel_rspec's CLI sets
   PARALLEL_TEST_GROUPS to the CPU-based *intended* process
   count, NOT the actual worker count, so when fewer specs
   than CPUs are present, no TEST_ENV_NUMBER ever matches
   PARALLEL_TEST_GROUPS and the merge is silently skipped.

`first_process?` avoids both: it is immutable across the worker’s lifetime (set by the parent at spawn) and identifies exactly one worker regardless of CPU count. Before merging, the elected worker still calls `wait_for_other_processes_to_finish` and then cross-checks the `.boot`/`.done` markers via `wait_for_peer_done_markers!`, so peer caches are on disk by merge time unless a peer exceeds the PEER_DONE_DEADLINE_SECONDS bound.

Returns:

  • (Boolean)


# File 'lib/rspec_tracer/rspec/parallel_tests.rb', line 352

def self.last_process?
  return false unless active?
  return false unless defined?(::ParallelTests)

  ::ParallelTests.first_process?
end
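The failure mode of a `last_process?`-style check can be reproduced with two simplified predicates over env Hashes. Both lambdas paraphrase the behavior described above for illustration; they are not the parallel_tests gem's source, and the scenario (4 planned groups, 2 spawned workers) is hypothetical.

```ruby
# Simplified paraphrases of the two election rules discussed above.
first_process = ->(env) { env.fetch('TEST_ENV_NUMBER', '').to_i <= 1 }
last_process  = ->(env) { env.fetch('TEST_ENV_NUMBER', '') == env.fetch('PARALLEL_TEST_GROUPS', '') }

# The CLI planned 4 groups (CPU count), but only 2 workers were spawned.
workers = [
  { 'TEST_ENV_NUMBER' => '',  'PARALLEL_TEST_GROUPS' => '4' },
  { 'TEST_ENV_NUMBER' => '2', 'PARALLEL_TEST_GROUPS' => '4' }
]

workers.count { |env| first_process.call(env) } # => 1 (exactly one worker elected)
workers.count { |env| last_process.call(env) }  # => 0 (nobody elected; merge skipped)
```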

.load_merged_snapshot(base_dir) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Internal helper for the tracer pipeline.



# File 'lib/rspec_tracer/rspec/parallel_tests.rb', line 285

def self.load_merged_snapshot(base_dir)
  backend = RSpecTracer::Storage::JsonBackend.new(
    cache_path: base_dir,
    logger: RSpecTracer.logger,
    retention_local_count: RSpecTracer.cache_retention_local_count,
    serializer: RSpecTracer.storage_backend_opts[:serializer] || :json
  )
  backend.load_graph(schema_version: RSpecTracer::Storage::Schema::CURRENT)
end

.log_rollups? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

True iff this worker should emit rollup log lines. Per-example RSpec output (dots, failures, durations) is unaffected; that’s RSpec’s own Reporter, not this module.

Returns:

  • (Boolean)


# File 'lib/rspec_tracer/rspec/parallel_tests.rb', line 97

def self.log_rollups?
  verbose? || narrator?
end

.merge_coverage! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Merge per-worker coverage.json files into a top-level coverage.json. Routed through Reporters::CoverageJsonReporter.merge_parallel (replaces the legacy CoverageMerger + CoverageWriter pair).



# File 'lib/rspec_tracer/rspec/parallel_tests.rb', line 425

def self.merge_coverage!
  base_dir = ::File.dirname(RSpecTracer.coverage_path)
  peer_paths = peer_paths_for(base_dir)
  return if peer_paths.empty?

  starting = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  RSpecTracer::Reporters::CoverageJsonReporter.merge_parallel(
    peer_paths: peer_paths,
    output_path: ::File.join(base_dir, 'coverage.json'),
    logger: RSpecTracer.logger
  )

  ending = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  elapsed = RSpecTracer::TimeFormatter.format_time(ending - starting)

  RSpecTracer.logger.debug("RSpec tracer merged parallel tests coverage (took #{elapsed})") if log_rollups?
end

.merge_snapshot! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Merge the per-worker v2 snapshots into the top-level cache. SqliteBackend has no merge surface (single-file, latest-run only); the elected worker persists its own run via Engine finalize, and the per-worker files accumulate next to it untouched. The JSON merge path stays authoritative for the default `:json` backend, which is what the parallel_tests fixtures exercise in CI.



# File 'lib/rspec_tracer/rspec/parallel_tests.rb', line 398

def self.merge_snapshot!
  return unless RSpecTracer.storage_backend == :json

  base_dir = ::File.dirname(RSpecTracer.cache_path)
  peer_paths = peer_paths_for(base_dir)
  return if peer_paths.empty?

  starting = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  top = RSpecTracer::Storage::JsonBackend.new(
    cache_path: base_dir, logger: RSpecTracer.logger,
    retention_local_count: RSpecTracer.cache_retention_local_count,
    warn_per_file_mb: RSpecTracer.cache_size_warn_per_file_mb,
    warn_total_mb: RSpecTracer.cache_size_warn_total_mb,
    serializer: RSpecTracer.storage_backend_opts[:serializer] || :json
  )
  top.merge_from_peers(peer_paths, schema_version: RSpecTracer::Storage::Schema::CURRENT)

  ending = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  elapsed = RSpecTracer::TimeFormatter.format_time(ending - starting)

  RSpecTracer.logger.debug("RSpec tracer merged parallel tests reports (took #{elapsed})") if log_rollups?
end

.narrator? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Narrator = first process. Under parallel_tests, TEST_ENV_NUMBER is either '' or '1' for the first worker, and '2', '3', etc. for the others. When the gem is not running under parallel_tests, the single process is its own narrator.

Returns:

  • (Boolean)


# File 'lib/rspec_tracer/rspec/parallel_tests.rb', line 81

def self.narrator?
  return true unless active?

  value = ::ENV.fetch('TEST_ENV_NUMBER', '').to_s
  value.empty? || value == '1'
end
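A Hash-based sketch of how `narrator?`, `verbose?` and `log_rollups?` combine. The method names mirror the module's, but these standalone versions take an env Hash (a hypothetical signature) instead of reading `ENV`.

```ruby
def active?(env)
  !env['TEST_ENV_NUMBER'].nil? && !env['PARALLEL_TEST_GROUPS'].nil?
end

# First worker ('' or '1') narrates; outside parallel_tests, everyone does.
def narrator?(env)
  return true unless active?(env)

  value = env.fetch('TEST_ENV_NUMBER', '').to_s
  value.empty? || value == '1'
end

def log_rollups?(env)
  env['RSPEC_TRACER_VERBOSE'] == 'true' || narrator?(env)
end

solo     = {}
first    = { 'TEST_ENV_NUMBER' => '',  'PARALLEL_TEST_GROUPS' => '3' }
second   = { 'TEST_ENV_NUMBER' => '2', 'PARALLEL_TEST_GROUPS' => '3' }
verbose2 = second.merge({ 'RSPEC_TRACER_VERBOSE' => 'true' })

[solo, first, second, verbose2].map { |env| log_rollups?(env) } # => [true, true, false, true]
```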

.peer_dirs_missing_done(base_dir) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Set difference of `.boot`-bearing peer dirs and `.done`-bearing peer dirs under `base_dir`. A returned entry means “this peer registered but hasn’t signaled completion yet”: either it is still mid-flush or it crashed.



# File 'lib/rspec_tracer/rspec/parallel_tests.rb', line 243

def self.peer_dirs_missing_done(base_dir)
  boot_dirs = peer_dirs_with_marker(base_dir, BOOT_MARKER_FILENAME)
  done_dirs = peer_dirs_with_marker(base_dir, DONE_MARKER_FILENAME)
  boot_dirs - done_dirs
end
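The boot/done set difference can be exercised against a temporary directory. The two helpers below are standalone copies of the module's methods with the marker filenames inlined; the worker layout is made up for the example.

```ruby
require 'fileutils'
require 'tmpdir'

BOOT = '.rspec_tracer_boot'
DONE = '.rspec_tracer_done'

def peer_dirs_with_marker(base_dir, marker_filename)
  Dir.glob(File.join(base_dir, 'parallel_tests_*', marker_filename)).map { |path| File.dirname(path) }
end

def peer_dirs_missing_done(base_dir)
  peer_dirs_with_marker(base_dir, BOOT) - peer_dirs_with_marker(base_dir, DONE)
end

missing = Dir.mktmpdir do |base|
  %w[parallel_tests_1 parallel_tests_2 parallel_tests_3].each do |name|
    FileUtils.mkdir_p(File.join(base, name))
    FileUtils.touch(File.join(base, name, BOOT))
  end
  # Workers 1 and 3 have flushed; worker 2 is still mid-flush (or crashed).
  FileUtils.touch(File.join(base, 'parallel_tests_1', DONE))
  FileUtils.touch(File.join(base, 'parallel_tests_3', DONE))

  peer_dirs_missing_done(base).map { |dir| File.basename(dir) }
end

missing # => ["parallel_tests_2"]
```

A dir with `.done` but no `.boot` (for example, if `touch_boot!` failed and its rescue swallowed the error) never appears in the result, so such a peer is not waited on.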

.peer_dirs_with_marker(base_dir, marker_filename) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Internal helper for the tracer pipeline.



# File 'lib/rspec_tracer/rspec/parallel_tests.rb', line 251

def self.peer_dirs_with_marker(base_dir, marker_filename)
  paths = ::Dir.glob(::File.join(base_dir, 'parallel_tests_*', marker_filename))
  paths.map { |path| ::File.dirname(path) }
end

.peer_paths_for(base_dir) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

parallel_tests sets `PARALLEL_TEST_GROUPS = num_processes.to_s` for each child, where `num_processes` is the user-requested process count (Parallel.processor_count by default), NOT the number of workers actually spawned. The two can diverge, e.g. when the spec count caps the partition below the CPU count, or when shared-runner cgroup throttling shifts the visible CPU count between the moment the parent computes `num_processes` and the moment the spec count is observed. When they diverge, iterating `1..ENV` either over-iterates (cheap; rm_rf on a non-existent path is a no-op) or UNDER-iterates (expensive; the merge skips peers and the purge leaves `parallel_tests_N` stragglers behind, breaking the integration spec at `spec/integration/parallel_tests_spec.rb:88`).

Glob the actual filesystem state rather than reconstructing dir names from an env var with surprising semantics: the directories ARE the source of truth for which workers ran. The barriers in `finalize!` (`wait_for_other_processes_to_finish`, then `wait_for_peer_done_markers!`, bounded by PEER_DONE_DEADLINE_SECONDS) ensure every other worker’s at_exit has flushed its `parallel_tests_N` tree before this method runs, so the glob captures every peer.



# File 'lib/rspec_tracer/rspec/parallel_tests.rb', line 385

def self.peer_paths_for(base_dir)
  ::Dir.glob(::File.join(base_dir, 'parallel_tests_*')).select do |path|
    ::File.directory?(path)
  end
end
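A sketch contrasting count-based reconstruction with the glob, assuming a temporary base directory in which three workers wrote caches while the planned group count (`planned_groups`, a hypothetical stand-in for PARALLEL_TEST_GROUPS) says 2.

```ruby
require 'fileutils'
require 'tmpdir'

# Standalone copy of peer_paths_for: trust what exists on disk.
def peer_paths_for(base_dir)
  Dir.glob(File.join(base_dir, 'parallel_tests_*')).select { |path| File.directory?(path) }
end

result = Dir.mktmpdir do |base|
  # Three workers actually wrote caches...
  %w[parallel_tests_1 parallel_tests_2 parallel_tests_3].each { |d| FileUtils.mkdir_p(File.join(base, d)) }
  # ...plus a stray regular file that must not be treated as a peer.
  File.write(File.join(base, 'parallel_tests_notes'), 'x')

  planned_groups = 2
  reconstructed = (1..planned_groups).map { |n| File.join(base, "parallel_tests_#{n}") }
  reconstructed.select! { |path| File.directory?(path) }

  { reconstructed: reconstructed.size, globbed: peer_paths_for(base).size }
end

# result: reconstructed 2 (misses parallel_tests_3), globbed 3
```

The `File.directory?` filter also keeps a stray regular file that happens to match `parallel_tests_*` out of the peer list.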

.purge_worker_dirs! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Sweep every `parallel_tests_*` subdirectory under each managed base path (the parent dirs of the cache, coverage, and report paths). Globbing follows the same source-of-truth contract documented on `peer_paths_for`: the directories that actually exist are exactly the workers that ran, regardless of what PARALLEL_TEST_GROUPS reports.



# File 'lib/rspec_tracer/rspec/parallel_tests.rb', line 449

def self.purge_worker_dirs!
  [RSpecTracer.cache_path, RSpecTracer.coverage_path, RSpecTracer.report_path].each do |path|
    base_dir = ::File.dirname(path)
    ::Dir.glob(::File.join(base_dir, 'parallel_tests_*')).each do |worker_dir|
      ::FileUtils.rm_rf(worker_dir)
    end
  end
end

.remove_lock_file! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Internal helper for the tracer pipeline.



# File 'lib/rspec_tracer/rspec/parallel_tests.rb', line 361

def self.remove_lock_file!
  ::FileUtils.rm_f(RSpecTracer.lock_file)
end

.setup! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called from RSpecTracer.start when parallel_tests is active. Writes this worker’s TEST_ENV_NUMBER into the shared lock file under an exclusive lock so the max-seen value ends up correct regardless of worker boot order, then drops the .boot breadcrumb so the elected worker can enumerate “every peer that booted” at finalize time.



# File 'lib/rspec_tracer/rspec/parallel_tests.rb', line 107

def self.setup!
  return false unless active?

  require 'parallel_tests' unless defined?(::ParallelTests)
  track_test_env_number!
  touch_boot!
  true
rescue LoadError => e
  RSpecTracer.logger.error("Failed to load parallel_tests gem (#{e.class}: #{e.message})")
  false
end

.touch_boot! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Write `parallel_tests_N/.rspec_tracer_boot` with this worker’s pid + TEST_ENV_NUMBER + timestamp. Source-of-truth for “this worker booted past RSpecTracer.start”, consumed by the elected worker’s finalize-time peer enumeration. Idempotent: a re-run of setup! overwrites it with current values.



# File 'lib/rspec_tracer/rspec/parallel_tests.rb', line 124

def self.touch_boot!
  ::FileUtils.mkdir_p(RSpecTracer.cache_path)
  ::File.write(
    ::File.join(RSpecTracer.cache_path, BOOT_MARKER_FILENAME),
    ::JSON.generate(
      pid: ::Process.pid,
      test_env_number: ::ENV.fetch('TEST_ENV_NUMBER', ''),
      started_at: ::Time.now.utc.iso8601
    )
  )
rescue StandardError => e
  RSpecTracer.logger.warn(
    "RSpec tracer: failed to write boot marker (#{e.class}: #{e.message})"
  )
end
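A round-trip sketch of the boot marker, assuming a temporary directory stands in for a worker's `parallel_tests_N` cache path; `write_boot_marker` is a hypothetical standalone version of `touch_boot!`. `Time#iso8601` is defined by the `time` standard library, so that library must be loaded for the timestamp call to work.

```ruby
require 'fileutils'
require 'json'
require 'time'   # provides Time#iso8601
require 'tmpdir'

# Hypothetical standalone version of touch_boot!, with the cache path and
# worker number passed in explicitly.
def write_boot_marker(cache_path, test_env_number)
  FileUtils.mkdir_p(cache_path)
  File.write(
    File.join(cache_path, '.rspec_tracer_boot'),
    JSON.generate({
      pid: Process.pid,
      test_env_number: test_env_number,
      started_at: Time.now.utc.iso8601
    })
  )
end

marker = Dir.mktmpdir do |base|
  cache_path = File.join(base, 'parallel_tests_2')
  write_boot_marker(cache_path, '2')
  write_boot_marker(cache_path, '2') # idempotent: the second write overwrites
  JSON.parse(File.read(File.join(cache_path, '.rspec_tracer_boot')))
end

marker['test_env_number'] # => "2"
```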

.touch_done! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Drop `parallel_tests_N/.rspec_tracer_done` as a flush-complete signal for the elected worker’s verification loop. The cache dir already exists by this point (run_finalize mkdir_p’s it earlier in the at_exit chain); the explicit mkdir_p here is belt-and-suspenders for the no-examples / early-return paths. The graceful-degradation rescue keeps a marker-write failure from propagating into the user’s exit status.



# File 'lib/rspec_tracer/rspec/parallel_tests.rb', line 195

def self.touch_done!
  ::FileUtils.mkdir_p(RSpecTracer.cache_path)
  ::File.write(
    ::File.join(RSpecTracer.cache_path, DONE_MARKER_FILENAME),
    ::Time.now.utc.iso8601
  )
rescue StandardError => e
  RSpecTracer.logger.warn(
    "RSpec tracer: failed to write done marker (#{e.class}: #{e.message})"
  )
end

.track_test_env_number! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Internal helper for the tracer pipeline.



# File 'lib/rspec_tracer/rspec/parallel_tests.rb', line 310

def self.track_test_env_number!
  ::File.open(RSpecTracer.lock_file, ::File::RDWR | ::File::CREAT, 0o644) do |f|
    f.flock(::File::LOCK_EX)

    test_num = [f.read.to_i, ::ENV.fetch('TEST_ENV_NUMBER').to_i].max

    f.rewind
    f.write("#{test_num}\n")
    f.flush
    f.truncate(f.pos)
  end
end
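The lock-file max-merge can be exercised with workers that register out of order. `track_test_env_number` below is a standalone copy of the method with the lock path and worker number passed in (a hypothetical signature); the calls here are sequential, whereas in a real run `flock(LOCK_EX)` is what serializes concurrent workers.

```ruby
require 'tmpdir'

def track_test_env_number(lock_file, test_env_number)
  File.open(lock_file, File::RDWR | File::CREAT, 0o644) do |f|
    f.flock(File::LOCK_EX)                 # held until the block closes the file
    test_num = [f.read.to_i, test_env_number.to_i].max

    f.rewind
    f.write("#{test_num}\n")
    f.flush
    f.truncate(f.pos)                      # drop any bytes past the new value
  end
end

recorded = Dir.mktmpdir do |dir|
  lock_file = File.join(dir, 'rspec_tracer.lock')
  # Workers register out of order; the first worker's '' counts as 0.
  ['3', '', '1', '2'].each { |n| track_test_env_number(lock_file, n) }
  File.read(lock_file)
end

recorded # => "3\n"
```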

.verbose? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Internal helper for the tracer pipeline.

Returns:

  • (Boolean)


# File 'lib/rspec_tracer/rspec/parallel_tests.rb', line 90

def self.verbose?
  ::ENV.fetch('RSPEC_TRACER_VERBOSE', nil) == 'true'
end

.wait_for_peer_done_markers! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Block until every peer that wrote `.boot` has also written `.done`, or the deadline elapses. Polled every 50ms: fine enough that the typical “barrier returned a tick early” case closes within one or two polls, coarse enough not to dominate CPU.

On timeout we log a warning and proceed. A peer that never wrote `.done` either crashed (its dir is then orphan content, which the subsequent `purge_worker_dirs!` cleans up) or is genuinely hung (the elected worker can’t fix that, so we proceed with the merge rather than wait indefinitely).



# File 'lib/rspec_tracer/rspec/parallel_tests.rb', line 218

def self.wait_for_peer_done_markers!
  base_dir = ::File.dirname(RSpecTracer.cache_path)
  deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + PEER_DONE_DEADLINE_SECONDS

  loop do
    missing = peer_dirs_missing_done(base_dir)
    return if missing.empty?

    if ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) >= deadline
      RSpecTracer.logger.warn(
        'RSpec tracer: peers booted without finishing within ' \
        "#{PEER_DONE_DEADLINE_SECONDS}s: #{missing.inspect}; " \
        'proceeding (peer dirs will be purged regardless of completion state)'
      )
      return
    end

    sleep 0.05
  end
end
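The bounded wait generalizes to a small helper. `poll_until_empty` is hypothetical: unlike `wait_for_peer_done_markers!`, which logs and returns `nil`, it returns the still-missing entries on timeout so the caller can decide what to log.

```ruby
# Re-evaluate the block until it returns an empty collection or the
# monotonic deadline passes. Returns [] on success, the last result on timeout.
def poll_until_empty(deadline_seconds:, interval: 0.05)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + deadline_seconds

  loop do
    missing = yield
    return [] if missing.empty?
    return missing if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

    sleep interval
  end
end

# A peer that finishes on the third poll:
polls = 0
ok = poll_until_empty(deadline_seconds: 1) { (polls += 1) < 3 ? ['parallel_tests_2'] : [] }

# A peer that never finishes: the loop gives up once the deadline passes.
stuck = poll_until_empty(deadline_seconds: 0.2) { ['parallel_tests_3'] }
```

Measuring against `CLOCK_MONOTONIC` rather than `Time.now` keeps the deadline immune to wall-clock adjustments (for example NTP corrections) during the wait.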