Class: Rake::ThreadPool
- Inherits:
-
Object
- Object
- Rake::ThreadPool
- Defined in:
- lib/rake/thread_pool.rb
Overview
:nodoc: all
Instance Method Summary collapse
-
#future(*args, &block) ⇒ Object
Creates a future executed by the
ThreadPool
. -
#gather_history ⇒ Object
Enable the gathering of history events.
-
#history ⇒ Object
Return a array of history events for the thread pool.
-
#initialize(thread_count) ⇒ ThreadPool
constructor
Creates a ThreadPool object.
-
#join ⇒ Object
Waits until the queue of futures is empty and all threads have exited.
-
#statistics ⇒ Object
Return a hash of always collected statistics for the thread pool.
Constructor Details
#initialize(thread_count) ⇒ ThreadPool
Creates a ThreadPool object. The thread_count
parameter is the size of the pool.
11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/rake/thread_pool.rb', line 11 def initialize(thread_count) require "set" @max_active_threads = [thread_count, 0].max @threads = Set.new @threads_mon = Monitor.new @queue = Queue.new @join_cond = @threads_mon.new_cond @history_start_time = nil @history = [] @history_mon = Monitor.new @total_threads_in_play = 0 end |
Instance Method Details
#future(*args, &block) ⇒ Object
Creates a future executed by the ThreadPool
.
The args are passed to the block when executing (similarly to Thread#new) The return value is an object representing a future which has been created and added to the queue in the pool. Sending #value to the object will sleep the current thread until the future is finished and will return the result (or raise an exception thrown from the future)
33 34 35 36 37 38 39 40 41 |
# File 'lib/rake/thread_pool.rb', line 33 def future(*args, &block) promise = Promise.new(args, &block) promise.recorder = lambda { |*stats| stat(*stats) } @queue.enq promise stat :queued, item_id: promise.object_id start_thread promise end |
#gather_history ⇒ Object
Enable the gathering of history events.
68 69 70 |
# File 'lib/rake/thread_pool.rb', line 68 def gather_history #:nodoc: @history_start_time = Time.now if @history_start_time.nil? end |
#history ⇒ Object
Return a array of history events for the thread pool.
History gathering must be enabled to be able to see the events (see #gather_history). Best to call this when the job is complete (i.e. after ThreadPool#join is called).
77 78 79 80 81 |
# File 'lib/rake/thread_pool.rb', line 77 def history # :nodoc: @history_mon.synchronize { @history.dup }. sort_by { |i| i[:time] }. each { |i| i[:time] -= @history_start_time } end |
#join ⇒ Object
Waits until the queue of futures is empty and all threads have exited.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/rake/thread_pool.rb', line 44 def join @threads_mon.synchronize do begin stat :joining @join_cond.wait unless @threads.empty? stat :joined rescue Exception => e stat :joined $stderr.puts e $stderr.print "Queue contains #{@queue.size} items. " + "Thread pool contains #{@threads.count} threads\n" $stderr.print "Current Thread #{Thread.current} status = " + "#{Thread.current.status}\n" $stderr.puts e.backtrace.join("\n") @threads.each do |t| $stderr.print "Thread #{t} status = #{t.status}\n" $stderr.puts t.backtrace.join("\n") end raise e end end end |
#statistics ⇒ Object
Return a hash of always collected statistics for the thread pool.
84 85 86 87 88 89 |
# File 'lib/rake/thread_pool.rb', line 84 def statistics # :nodoc: { total_threads_in_play: @total_threads_in_play, max_active_threads: @max_active_threads, } end |