Class: Pgbus::ExecutionPools::AsyncPool
- Inherits:
-
Object
- Object
- Pgbus::ExecutionPools::AsyncPool
- Defined in:
- lib/pgbus/execution_pools/async_pool.rb
Constant Summary collapse
- IDLE_WAIT_INTERVAL =
0.01
Instance Attribute Summary collapse
-
#capacity ⇒ Object
readonly
Returns the value of attribute capacity.
Instance Method Summary collapse
- #available_capacity ⇒ Object
- #idle? ⇒ Boolean
-
#initialize(capacity:, on_state_change: nil) ⇒ AsyncPool
constructor
A new instance of AsyncPool.
- #kill ⇒ Object
- #metadata ⇒ Object
- #post(&block) ⇒ Object
- #shutdown ⇒ Object
- #shutdown? ⇒ Boolean
- #wait_for_termination(timeout) ⇒ Object
Constructor Details
#initialize(capacity:, on_state_change: nil) ⇒ AsyncPool
Returns a new instance of AsyncPool.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/pgbus/execution_pools/async_pool.rb', line 10 def initialize(capacity:, on_state_change: nil) @capacity = capacity @on_state_change = on_state_change @available_capacity = capacity @mutex = Mutex.new @state_mutex = Mutex.new @shutdown_flag = false @fatal_error = nil @boot_queue = Thread::Queue.new @pending = Thread::Queue.new validate_dependencies! @reactor_thread = start_reactor result = @boot_queue.pop raise result if result.is_a?(Exception) end |
Instance Attribute Details
#capacity ⇒ Object (readonly)
Returns the value of attribute capacity.
6 7 8 |
# File 'lib/pgbus/execution_pools/async_pool.rb', line 6 def capacity @capacity end |
Instance Method Details
#available_capacity ⇒ Object
40 41 42 43 |
# File 'lib/pgbus/execution_pools/async_pool.rb', line 40 def available_capacity raise_if_fatal! @mutex.synchronize { @available_capacity } end |
#idle? ⇒ Boolean
45 46 47 |
# File 'lib/pgbus/execution_pools/async_pool.rb', line 45 def idle? available_capacity.positive? end |
#kill ⇒ Object
65 66 67 68 |
# File 'lib/pgbus/execution_pools/async_pool.rb', line 65 def kill shutdown @reactor_thread&.kill end |
#metadata ⇒ Object
70 71 72 73 74 75 76 77 |
# File 'lib/pgbus/execution_pools/async_pool.rb', line 70 def inflight = @mutex.synchronize { @available_capacity } { mode: :async, capacity: @capacity, busy: @capacity - inflight } end |
#post(&block) ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/pgbus/execution_pools/async_pool.rb', line 27 def post(&block) raise_if_fatal! raise "Execution pool is shutting down" if shutdown? reserved = false reserve_capacity! reserved = true @pending << block rescue StandardError restore_capacity if reserved raise end |
#shutdown ⇒ Object
49 50 51 52 53 54 55 |
# File 'lib/pgbus/execution_pools/async_pool.rb', line 49 def shutdown @state_mutex.synchronize do return false if @shutdown_flag @shutdown_flag = true end end |
#shutdown? ⇒ Boolean
57 58 59 |
# File 'lib/pgbus/execution_pools/async_pool.rb', line 57 def shutdown? @state_mutex.synchronize { @shutdown_flag } end |
#wait_for_termination(timeout) ⇒ Object
61 62 63 |
# File 'lib/pgbus/execution_pools/async_pool.rb', line 61 def wait_for_termination(timeout) @reactor_thread&.join(timeout) end |