Class: ActiveSupport::Testing::Parallelization

Inherits:
Object
  • Object
show all
Defined in:
lib/active_support/testing/parallelization.rb

Overview

:nodoc:

Defined Under Namespace

Classes: Server

Constant Summary collapse

@@after_fork_hooks =
[]
@@run_cleanup_hooks =
[]

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_size) ⇒ Parallelization

Returns a new instance of Parallelization.



53
54
55
56
57
58
59
# File 'lib/active_support/testing/parallelization.rb', line 53

# Builds a parallelization coordinator for +queue_size+ workers.
#
# Creates the shared Server queue, starts with an empty worker pool, and
# starts a DRb service on "drbunix:" with the queue as its front object.
# The service's URI is kept in @url so forked workers can connect to it.
def initialize(queue_size)
  @queue_size = queue_size
  @pool       = []
  @queue      = Server.new

  drb_service = DRb.start_service("drbunix:", @queue)
  @url        = drb_service.uri
end

Class Method Details

.after_fork_hook(&blk) ⇒ Object



39
40
41
# File 'lib/active_support/testing/parallelization.rb', line 39

# Registers +blk+ as an after-fork hook by appending it to the class-wide
# @@after_fork_hooks list. Returns that list.
def self.after_fork_hook(&blk)
  @@after_fork_hooks.push(blk)
end

.run_cleanup_hook(&blk) ⇒ Object



47
48
49
# File 'lib/active_support/testing/parallelization.rb', line 47

# Registers +blk+ as a cleanup hook by appending it to the class-wide
# @@run_cleanup_hooks list. Returns that list.
def self.run_cleanup_hook(&blk)
  @@run_cleanup_hooks.push(blk)
end

Instance Method Details

#<<(work) ⇒ Object



115
116
117
# File 'lib/active_support/testing/parallelization.rb', line 115

# Enqueues +work+ by delegating to the shared queue's own << method.
# Returns whatever that call returns.
def <<(work)
  @queue << work
end

#after_fork(worker) ⇒ Object



61
62
63
64
65
# File 'lib/active_support/testing/parallelization.rb', line 61

# Invokes every registered after-fork hook, in registration order, passing
# +worker+ to each one. Returns the hook list it iterated over.
def after_fork(worker)
  self.class.after_fork_hooks.each { |hook| hook.call(worker) }
end

#run_cleanup(worker) ⇒ Object



67
68
69
70
71
# File 'lib/active_support/testing/parallelization.rb', line 67

# Invokes every registered cleanup hook, in registration order, passing
# +worker+ to each one. Returns the hook list it iterated over.
def run_cleanup(worker)
  self.class.run_cleanup_hooks.each { |hook| hook.call(worker) }
end

#shutdown ⇒ Object



119
120
121
122
123
124
125
126
# File 'lib/active_support/testing/parallelization.rb', line 119

# Tells the workers to stop and waits for them to exit.
#
# Pushes one nil onto the queue per worker, then waits on every pid in
# @pool. Raises a RuntimeError if the queue still reports a positive
# length afterwards; otherwise returns nil.
def shutdown
  @queue_size.times do
    @queue << nil
  end

  @pool.each do |child_pid|
    Process.waitpid(child_pid)
  end

  return unless @queue.length > 0

  raise "Queue not empty, but all workers have finished. This probably means that a worker crashed and #{@queue.length} tests were missed."
end

#start ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/active_support/testing/parallelization.rb', line 73

# Forks one worker process per slot in @queue_size and stores the values
# returned by +fork+ in @pool.
#
# Each child connects to the queue published at @url, pops jobs until it
# receives a falsy value, runs each job through Minitest, and records the
# result back on the queue. Cleanup hooks run in the child via +ensure+.
def start
  @pool = @queue_size.times.map do |worker|
    fork do
      # Stop DRb's service in this child process before connecting as a
      # client. NOTE(review): presumably so the child does not keep the
      # parent's server running -- confirm.
      DRb.stop_service

      # A failing after_fork hook does not abort the worker: the error is
      # captured in setup_exception and attached to each job result below.
      begin
        after_fork(worker)
      rescue => setup_exception; end

      # Client-side handle for the queue object served at @url.
      queue = DRbObject.new_with_uri(@url)

      # The loop ends when pop returns nil/false; #shutdown pushes one nil
      # per worker for this purpose.
      while job = queue.pop
        # A job is indexed as [test class, method name, reporter].
        klass    = job[0]
        method   = job[1]
        reporter = job[2]
        result = klass.with_info_handler reporter do
          Minitest.run_one_method(klass, method)
        end

        add_setup_exception(result, setup_exception) if setup_exception

        begin
          queue.record(reporter, result)
        rescue DRb::DRbConnError
          # Recording failed with a DRb connection error. Replace each
          # failure with a Minitest::UnexpectedError wrapping a
          # DRb::DRbRemoteError built from the original error, then try
          # once more. NOTE(review): presumably the original exception
          # could not be sent over DRb -- confirm.
          result.failures.map! do |failure|
            if failure.respond_to?(:error)
              # minitest >5.14.0
              error = DRb::DRbRemoteError.new(failure.error)
            else
              error = DRb::DRbRemoteError.new(failure.exception)
            end
            Minitest::UnexpectedError.new(error)
          end
          # This second attempt is not rescued; an error here propagates.
          queue.record(reporter, result)
        end
      end
    ensure
      # Runs in the child whether the job loop finished or raised.
      run_cleanup(worker)
    end
  end
end