Class: ConcurrencyHelper::ParallelWorkersQueue
- Inherits:
-
Object
- Object
- ConcurrencyHelper::ParallelWorkersQueue
- Defined in:
- lib/sapis/concurrency_helper.rb
Overview
Parallel working, constrained on the number of threads. Sets Thread.abort_on_exception to true as default.
Usage:
ConcurrencyHelper.with_parallel_queue( <slots> ) do | queue, semaphore |
queue.push { <operation_1> }
queue.push { <operation_2> }
end
or:
ConcurrencyHelper.with_parallel_queue( <slots>, :instances => <Enumerable> ) do | instance, semaphore |
-> { <operation>( <instance> ) }
end
or:
queue = ParallelWorkersQueue.new( <threads> )
queue.push { <operation_1> }
queue.push { <operation_2> }
queue.join
The semaphore is a generic semaphore; it can be used for example to lock when printing information to stdout.
Instance Method Summary collapse
-
#initialize(slots, options = {}) ⇒ ParallelWorkersQueue
constructor
A new instance of ParallelWorkersQueue.
- #join ⇒ Object
- #push(&task) ⇒ Object
Constructor Details
#initialize(slots, options = {}) ⇒ ParallelWorkersQueue
Returns a new instance of ParallelWorkersQueue.
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/sapis/concurrency_helper.rb', line 50 def initialize(slots, ={}) abort_on_exception = !.has_key?(:abort_on_exception) || [:abort_on_exception] @queue = SizedQueue.new(slots) @threads = slots.times.map do Thread.new do while (data = @queue.pop) != :stop data[] end end end Thread.abort_on_exception = abort_on_exception end |
Instance Method Details
#join ⇒ Object
70 71 72 73 74 75 76 |
# File 'lib/sapis/concurrency_helper.rb', line 70 def join @threads.each do @queue.push(:stop) end @threads.each(&:join) end |
#push(&task) ⇒ Object
66 67 68 |
# File 'lib/sapis/concurrency_helper.rb', line 66 def push(&task) @queue.push(task) end |