Class: ConcurrencyHelper::ParallelWorkersQueue

Inherits:
Object
Defined in:
lib/sapis/concurrency_helper.rb

Overview

Runs tasks in parallel on a fixed number of worker threads. By default it sets Thread.abort_on_exception to true; pass :abort_on_exception => false in the options to turn this off.

Usage:

ConcurrencyHelper.with_parallel_queue( <slots> ) do | queue, semaphore |
  queue.push { <operation_1> }
  queue.push { <operation_2> }
end

or:

ConcurrencyHelper.with_parallel_queue( <slots>, :instances => <Enumerable> ) do | instance, semaphore |
  -> { <operation>( <instance> ) }
end

or:

queue = ParallelWorkersQueue.new( <threads> )
queue.push { <operation_1> }
queue.push { <operation_2> }
queue.join

The semaphore is a general-purpose lock shared by all tasks; it can be used, for example, to serialize writes to stdout so that output from different threads does not interleave.
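As a minimal sketch of the stdout-locking idea above, the following uses a plain Mutex as a stand-in for the yielded semaphore. That the real semaphore responds to synchronize is an assumption, and log_in_parallel is a hypothetical helper, not part of ConcurrencyHelper.

```ruby
# Hypothetical helper: starts `count` threads that each record two lines.
# The Mutex stands in for the semaphore yielded by with_parallel_queue
# (assumption: the real object offers the same #synchronize interface).
def log_in_parallel(count)
  semaphore = Mutex.new
  lines = [] # collected instead of printed, so the ordering can be inspected

  threads = count.times.map do |i|
    Thread.new do
      # Holding the lock keeps each worker's two lines adjacent.
      semaphore.synchronize do
        lines << "worker #{i}: start"
        lines << "worker #{i}: done"
      end
    end
  end

  threads.each(&:join)
  lines
end

log_in_parallel(3).each { |line| puts line }
```

Without the lock, the "start" and "done" lines of different workers could interleave; the order in which workers acquire the lock is still not deterministic.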

Constructor Details

#initialize(slots, options = {}) ⇒ ParallelWorkersQueue

Returns a new instance of ParallelWorkersQueue.



# File 'lib/sapis/concurrency_helper.rb', line 50

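def initialize(slots, options={})
  # Defaults to true unless the caller passes :abort_on_exception explicitly.
  abort_on_exception = !options.has_key?(:abort_on_exception) || options[:abort_on_exception]

  # Bounded queue: at most `slots` tasks can wait before #push blocks.
  @queue = SizedQueue.new(slots)

  # One worker thread per slot; each runs tasks until it pops :stop.
  @threads = slots.times.map do
    Thread.new do
      while (data = @queue.pop) != :stop
        data[]
      end
    end
  end

  # Note: this is a process-wide setting, not limited to this queue's threads.
  Thread.abort_on_exception = abort_on_exception
end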
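The option handling in the constructor can be checked in isolation. The sketch below copies the expression from the listing into a hypothetical helper, resolve_abort_on_exception, and compares it with Hash#fetch, which appears to give the same result for every input. Keep in mind that Thread.abort_on_exception is a global setting, so the value chosen here affects all threads in the process.

```ruby
# Hypothetical helper that isolates the default-handling expression
# used in the constructor listing.
def resolve_abort_on_exception(options = {})
  !options.has_key?(:abort_on_exception) || options[:abort_on_exception]
end

# Equivalent formulation with Hash#fetch (a suggestion, not the library's code).
def resolve_with_fetch(options = {})
  options.fetch(:abort_on_exception, true)
end

puts resolve_abort_on_exception({})                             # true
puts resolve_abort_on_exception(:abort_on_exception => false)   # false
puts resolve_with_fetch(:abort_on_exception => false)           # false
```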

Instance Method Details

#join ⇒ Object



# File 'lib/sapis/concurrency_helper.rb', line 70

def join
  @threads.each do
    @queue.push(:stop)
  end

  @threads.each(&:join)
end
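The shutdown pattern in join (one :stop marker per worker, then joining every thread) can be sketched as a standalone method. drain_with_sentinels and the results queue are hypothetical names introduced for illustration; only the SizedQueue, the worker loop, and the :stop marker come from the listing.

```ruby
# Hypothetical standalone version of the worker loop plus the join logic.
def drain_with_sentinels(slots, jobs)
  queue = SizedQueue.new(slots)
  results = Queue.new # thread-safe collector for the return values

  threads = slots.times.map do
    Thread.new do
      # Each worker exits as soon as it pops a :stop marker.
      while (job = queue.pop) != :stop
        results << job.call
      end
    end
  end

  jobs.each { |job| queue.push(job) }

  # One marker per worker: each thread consumes exactly one and terminates.
  slots.times { queue.push(:stop) }
  threads.each(&:join)

  Array.new(results.size) { results.pop }
end

squares = drain_with_sentinels(2, (1..5).map { |n| -> { n * n } })
puts squares.sort.inspect # [1, 4, 9, 16, 25]
```

Because the markers are pushed after all jobs and the queue is first-in, first-out, every job is picked up before any worker stops. The completion order of the jobs is not guaranteed, hence the sort.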

#push(&task) ⇒ Object



# File 'lib/sapis/concurrency_helper.rb', line 66

def push(&task)
  @queue.push(task)
end
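Since push forwards to a SizedQueue created with slots as its capacity, a caller blocks once that many tasks are waiting to be picked up. The sketch below shows the capacity limit without blocking by using the non-blocking form of SizedQueue#push, which raises ThreadError on a full queue. full_after? is a hypothetical helper, and no worker threads are started, so nothing drains the queue.

```ruby
# Hypothetical helper: fills a SizedQueue to capacity, then attempts one
# more push in non-blocking mode to observe the limit.
def full_after?(capacity)
  queue = SizedQueue.new(capacity)
  capacity.times { |i| queue.push(i) }

  begin
    queue.push(:extra, true) # second argument: non_block
    false
  rescue ThreadError
    # Raised because the queue already holds `capacity` items.
    true
  end
end

puts full_after?(2) # true
```

With the default blocking push used in the listing, the same situation makes the producer wait until a worker pops a task, which acts as back-pressure on the code that enqueues work.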