Module: Contrek::Concurrent::Poolable

Included in:
Finder, Merger
Defined in:
lib/contrek/finder/concurrent/poolable.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#number_of_threads ⇒ Object (readonly)

Returns the value of attribute number_of_threads.



8
9
10
# File 'lib/contrek/finder/concurrent/poolable.rb', line 8

# Reader for the thread count stored on this object.
#
# @return [Object, nil] the current value of @number_of_threads
#   (nil if it has not been assigned yet)
def number_of_threads
  instance_variable_get(:@number_of_threads)
end

Instance Method Details

#enqueue!(**payload, &block) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/contrek/finder/concurrent/poolable.rb', line 22

# Runs +block+ with the given keyword payload, either on a new thread or inline.
#
# When @number_of_threads is positive, a new Thread is created and appended to
# @threads. That thread acquires @semaphore before calling the block and
# releases it in an +ensure+, so the permit is returned even when the block
# raises. Otherwise the block is called synchronously on the calling thread.
#
# @param payload [Hash] keyword arguments, handed to the block as a single Hash
# @yieldparam payload [Hash] the collected keyword arguments
# @return [Object] the result of appending the thread to @threads in threaded
#   mode, or the block's own return value in inline mode
# @raise [ArgumentError] if no block is given
def enqueue!(**payload, &block)
  # Fail fast in the caller. Without this guard a missing block only shows up
  # as a NoMethodError on nil, and in threaded mode it is raised inside the
  # worker thread and not seen until that thread is joined.
  raise ArgumentError, 'enqueue! requires a block' unless block

  if @number_of_threads > 0
    @threads << Thread.new do
      @semaphore.acquire
      begin
        block.call(payload)
      ensure
        # Always give the permit back, even if the block raised.
        @semaphore.release
      end
    end
  else
    block.call(payload)
  end
end

#initialize(number_of_threads: 0, **kwargs) ⇒ Object



9
10
11
12
13
14
15
16
# File 'lib/contrek/finder/concurrent/poolable.rb', line 9

# Stores the requested thread count and, when it is positive, prepares the
# thread list and the semaphore used to run enqueued work on threads.
#
# A falsy +number_of_threads+ (e.g. nil) is replaced by
# ::Concurrent.physical_processor_count. When the resulting count is not
# greater than zero, neither @threads nor @semaphore is assigned.
#
# @param number_of_threads [Integer, nil] requested thread count (default 0)
# @param kwargs [Hash] remaining keyword arguments, forwarded to +super+
def initialize(number_of_threads: 0, **kwargs)
  thread_count = number_of_threads
  thread_count ||= ::Concurrent.physical_processor_count
  @number_of_threads = thread_count

  threaded = @number_of_threads > 0
  @threads = ::Concurrent::Array.new if threaded
  @semaphore = ::Concurrent::Semaphore.new(@number_of_threads) if threaded

  super(**kwargs)
end

#wait! ⇒ Object



18
19
20
# File 'lib/contrek/finder/concurrent/poolable.rb', line 18

# Blocks until every thread recorded in @threads has finished.
#
# @threads is only assigned when the pool runs with a positive thread count,
# so in inline mode it is nil and there is nothing to wait for. The safe
# navigation makes this a no-op then, where it used to raise NoMethodError.
#
# @return [Object, nil] the @threads collection after joining, or nil when no
#   thread list exists
def wait!
  @threads&.each(&:join)
end