Class: Workhorse::Pool
- Inherits:
-
Object
- Object
- Workhorse::Pool
- Defined in:
- lib/workhorse/pool.rb
Overview
Abstraction layer of a simple thread pool implementation used by the worker.
Instance Attribute Summary collapse
-
#mutex ⇒ Object
readonly
Returns the value of attribute mutex.
Instance Method Summary collapse
-
#idle ⇒ Object
Returns the number of idle threads.
-
#initialize(size) ⇒ Pool
constructor
A new instance of Pool.
- #on_idle(&block) ⇒ Object
-
#post ⇒ Object
Posts a new work unit to the pool.
-
#shutdown ⇒ Object
Shuts down the pool and waits for termination.
-
#wait ⇒ Object
Waits until the pool is shut down.
Constructor Details
#initialize(size) ⇒ Pool
Returns a new instance of Pool.
6 7 8 9 10 11 12 13 14 15 16 17 18 |
# File 'lib/workhorse/pool.rb', line 6 def initialize(size) @size = size @executor = Concurrent::ThreadPoolExecutor.new( min_threads: 0, max_threads: @size, max_queue: 0, fallback_policy: :abort, auto_terminate: false ) @mutex = Mutex.new @active_threads = Concurrent::AtomicFixnum.new(0) @on_idle = nil end |
Instance Attribute Details
#mutex ⇒ Object (readonly)
Returns the value of attribute mutex.
4 5 6 |
# File 'lib/workhorse/pool.rb', line 4 def mutex @mutex end |
Instance Method Details
#idle ⇒ Object
Returns the number of idle threads.
47 48 49 |
# File 'lib/workhorse/pool.rb', line 47 def idle @size - @active_threads.value end |
#on_idle(&block) ⇒ Object
20 21 22 |
# File 'lib/workhorse/pool.rb', line 20 def on_idle(&block) @on_idle = block end |
#post ⇒ Object
Posts a new work unit to the pool.
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/workhorse/pool.rb', line 25 def post mutex.synchronize do if idle.zero? fail 'All threads are busy.' end active_threads = @active_threads active_threads.increment @executor.post do begin yield ensure active_threads.decrement @on_idle.try(:call) end end end end |
#shutdown ⇒ Object
Shuts down the pool and waits for termination.
65 66 67 68 |
# File 'lib/workhorse/pool.rb', line 65 def shutdown @executor.shutdown wait end |
#wait ⇒ Object
Waits until the pool is shut down. This will wait forever unless you eventually call shutdown (either before calling ‘wait` or after it in another thread).
54 55 56 57 58 59 60 61 62 |
# File 'lib/workhorse/pool.rb', line 54 def wait # Here we use a loop-sleep combination instead of using # ThreadPoolExecutor's `wait_for_termination`. See issue #21 for more # information. loop do break if @executor.shutdown? sleep 0.1 end end |