Class: Rake::ThreadPool

Inherits:
Object
Defined in:
lib/rake/thread_pool.rb

Overview

:nodoc: all


Constructor Details

#initialize(thread_count) ⇒ ThreadPool

Creates a ThreadPool object. The thread_count parameter is the size of the pool.



# File 'lib/rake/thread_pool.rb', line 11

def initialize(thread_count)
  require "set"
  @max_active_threads = [thread_count, 0].max
  @threads = Set.new
  @threads_mon = Monitor.new
  @queue = Queue.new
  @join_cond = @threads_mon.new_cond

  @history_start_time = nil
  @history = []
  @history_mon = Monitor.new
  @total_threads_in_play = 0
end

Instance Method Details

#future(*args, &block) ⇒ Object

Creates a future executed by the ThreadPool.

The args are passed to the block when it executes (similarly to Thread#new). The return value is an object representing a future that has been created and added to the pool's queue. Sending #value to that object blocks the current thread until the future has finished, then returns its result (or raises the exception that was raised inside the future).



# File 'lib/rake/thread_pool.rb', line 33

def future(*args, &block)
  promise = Promise.new(args, &block)
  promise.recorder = lambda { |*stats| stat(*stats) }

  @queue.enq promise
  stat :queued, item_id: promise.object_id
  start_thread
  promise
end
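
The behaviour described above can be sketched as follows. This is a minimal usage example rather than anything taken from Rake's own documentation: it assumes the rake gem can be loaded with require "rake", and because the class is marked :nodoc: it should be treated as internal API that may change. The variable names are illustrative.

```ruby
require "rake"

pool = Rake::ThreadPool.new(2)

# Arguments given to #future are passed on to the block.
futures = [1, 2, 3].map { |n| pool.future(n) { |x| x * x } }

# #value blocks until each result is available.
squares = futures.map(&:value)

# An exception raised inside the block is re-raised by #value.
failing = pool.future { raise ArgumentError, "boom" }
message =
  begin
    failing.value
  rescue ArgumentError => e
    e.message
  end

pool.join
puts squares.inspect  # [1, 4, 9]
puts message          # boom
```

Collecting the futures first and calling #value afterwards lets the blocks run concurrently; calling #value immediately after each #future would serialize the work.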

#gather_history ⇒ Object

Enables the gathering of history events.



# File 'lib/rake/thread_pool.rb', line 68

def gather_history          #:nodoc:
  @history_start_time = Time.now if @history_start_time.nil?
end

#history ⇒ Object

Returns an array of history events for the thread pool.

History gathering must be enabled for events to be recorded (see #gather_history). It is best to call this once the job is complete (i.e. after ThreadPool#join has returned).



# File 'lib/rake/thread_pool.rb', line 77

def history                 # :nodoc:
  @history_mon.synchronize { @history.dup }.
    sort_by { |i| i[:time] }.
    each { |i| i[:time] -= @history_start_time }
end
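
A short sketch of how #gather_history and #history might be used together, assuming rake is loadable via require "rake". The :time key appears in the source above; the :event key is an assumption about the event hashes, whose construction is not shown in this excerpt.

```ruby
require "rake"

pool = Rake::ThreadPool.new(2)
pool.gather_history  # enable recording before any work is queued

futures = 2.times.map { |n| pool.future(n) { |x| x + 1 } }
futures.each(&:value)
pool.join

# Call #history once, after the work is done.
events = pool.history

# Assumption: each event is a Hash with an :event key (not shown in the
# excerpt above). After #history, :time is an offset in seconds from the
# moment history gathering was enabled.
event_names = events.map { |e| e[:event] }
puts event_names.inspect
```

Note that the source shown above rewrites each :time in place on a shallow copy of the array, so the stored events are modified too; calling #history more than once is therefore likely to fail or to report wrong offsets.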

#join ⇒ Object

Waits until the queue of futures is empty and all threads have exited.



# File 'lib/rake/thread_pool.rb', line 44

def join
  @threads_mon.synchronize do
    begin
      stat :joining
      @join_cond.wait unless @threads.empty?
      stat :joined
    rescue Exception => e
      stat :joined
      $stderr.puts e
      $stderr.print "Queue contains #{@queue.size} items. " +
        "Thread pool contains #{@threads.count} threads\n"
      $stderr.print "Current Thread #{Thread.current} status = " +
        "#{Thread.current.status}\n"
      $stderr.puts e.backtrace.join("\n")
      @threads.each do |t|
        $stderr.print "Thread #{t} status = #{t.status}\n"
        $stderr.puts t.backtrace.join("\n")
      end
      raise e
    end
  end
end
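
As an illustration of waiting on work whose results are side effects rather than return values, here is a minimal sketch. It assumes rake is loadable via require "rake"; the done queue and the pool size (chosen larger than the number of futures) are choices made for this example only.

```ruby
require "rake"

pool = Rake::ThreadPool.new(4)
done = Queue.new  # thread-safe collector for the blocks' side effects

# The returned futures are ignored; only the side effect matters here.
3.times { |n| pool.future(n) { |x| done << x * 10 } }

# Blocks until the worker threads have drained the queue and exited.
pool.join

results = []
results << done.pop until done.empty?
puts results.sort.inspect  # [0, 10, 20]
```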

#statistics ⇒ Object

Returns a hash of statistics that are always collected for the thread pool, whether or not history gathering is enabled.



# File 'lib/rake/thread_pool.rb', line 84

def statistics              #  :nodoc:
  {
    total_threads_in_play: @total_threads_in_play,
    max_active_threads: @max_active_threads,
  }
end
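
A small sketch of reading the statistics, assuming rake is loadable via require "rake". The first pool shows the clamping performed in the constructor ([thread_count, 0].max). How total_threads_in_play is updated is not shown in this excerpt, so its value for the second pool is printed without an expected result.

```ruby
require "rake"

# A negative size is clamped to 0 by the constructor.
idle_stats = Rake::ThreadPool.new(-3).statistics

pool = Rake::ThreadPool.new(3)
futures = 3.times.map { |n| pool.future(n) { |x| x } }
futures.each(&:value)
pool.join
busy_stats = pool.statistics

puts idle_stats.inspect
puts busy_stats.inspect
```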