Class: ActiveSupport::Testing::Parallelization

Inherits:
Object
  • Object
show all
Defined in:
lib/active_support/testing/parallelization.rb

Overview

:nodoc:

Defined Under Namespace

Classes: Server

Constant Summary collapse

@@after_fork_hooks =
[]
@@run_cleanup_hooks =
[]

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_size) ⇒ Parallelization

Returns a new instance of Parallelization.

[View source]

53
54
55
56
57
58
59
# File 'lib/active_support/testing/parallelization.rb', line 53

def initialize(queue_size)
  @queue_size = queue_size
  @queue      = Server.new
  @pool       = []

  @url = DRb.start_service("drbunix:", @queue).uri
end

Class Method Details

.after_fork_hook(&blk) ⇒ Object

[View source]

39
40
41
# File 'lib/active_support/testing/parallelization.rb', line 39

def self.after_fork_hook(&blk)
  @@after_fork_hooks << blk
end

.run_cleanup_hook(&blk) ⇒ Object

[View source]

47
48
49
# File 'lib/active_support/testing/parallelization.rb', line 47

def self.run_cleanup_hook(&blk)
  @@run_cleanup_hooks << blk
end

Instance Method Details

#<<(work) ⇒ Object

[View source]

115
116
117
# File 'lib/active_support/testing/parallelization.rb', line 115

def <<(work)
  @queue << work
end

#after_fork(worker) ⇒ Object

[View source]

61
62
63
64
65
# File 'lib/active_support/testing/parallelization.rb', line 61

def after_fork(worker)
  self.class.after_fork_hooks.each do |cb|
    cb.call(worker)
  end
end

#run_cleanup(worker) ⇒ Object

[View source]

67
68
69
70
71
# File 'lib/active_support/testing/parallelization.rb', line 67

def run_cleanup(worker)
  self.class.run_cleanup_hooks.each do |cb|
    cb.call(worker)
  end
end

#shutdownObject

[View source]

119
120
121
122
123
124
125
126
# File 'lib/active_support/testing/parallelization.rb', line 119

def shutdown
  @queue_size.times { @queue << nil }
  @pool.each { |pid| Process.waitpid pid }

  if @queue.length > 0
    raise "Queue not empty, but all workers have finished. This probably means that a worker crashed and #{@queue.length} tests were missed."
  end
end

#startObject

[View source]

73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/active_support/testing/parallelization.rb', line 73

def start
  @pool = @queue_size.times.map do |worker|
    fork do
      DRb.stop_service

      begin
        after_fork(worker)
      rescue => setup_exception; end

      queue = DRbObject.new_with_uri(@url)

      while job = queue.pop
        klass    = job[0]
        method   = job[1]
        reporter = job[2]
        result = klass.with_info_handler reporter do
          Minitest.run_one_method(klass, method)
        end

        add_setup_exception(result, setup_exception) if setup_exception

        begin
          queue.record(reporter, result)
        rescue DRb::DRbConnError
          result.failures.map! do |failure|
            if failure.respond_to?(:error)
              # minitest >5.14.0
              error = DRb::DRbRemoteError.new(failure.error)
            else
              error = DRb::DRbRemoteError.new(failure.exception)
            end
            Minitest::UnexpectedError.new(error)
          end
          queue.record(reporter, result)
        end
      end
    ensure
      run_cleanup(worker)
    end
  end
end