Class: Pgbus::ExecutionPools::AsyncPool

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/execution_pools/async_pool.rb

Constant Summary collapse

IDLE_WAIT_INTERVAL =
0.01

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(capacity:, on_state_change: nil) ⇒ AsyncPool

Returns a new instance of AsyncPool.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/pgbus/execution_pools/async_pool.rb', line 10

# Sets up the pool's bookkeeping state, starts the reactor, and waits for the
# boot result before returning.
#
# @param capacity stored as both the total and the initially available
#   capacity (presumably an Integer; confirm against callers)
# @param on_state_change optional value stored for later use; this method
#   only records it (presumably a callable; confirm)
# @raise [Exception] the boot result itself, when the value popped from the
#   boot queue is an Exception
def initialize(capacity:, on_state_change: nil)
  # Synchronisation primitives and queues.
  @mutex = Mutex.new
  @state_mutex = Mutex.new
  @boot_queue = Thread::Queue.new
  @pending = Thread::Queue.new

  # Capacity accounting and the optional callback.
  @capacity = capacity
  @available_capacity = capacity
  @on_state_change = on_state_change

  # Lifecycle flags.
  @shutdown_flag = false
  @fatal_error = nil

  # All state above must exist before the reactor is started.
  validate_dependencies!
  @reactor_thread = start_reactor

  # Blocks until a boot result is pushed onto the boot queue.
  boot_result = @boot_queue.pop
  raise boot_result if boot_result.is_a?(Exception)
end

Instance Attribute Details

#capacity ⇒ Object (readonly)

Returns the value of attribute capacity.



6
7
8
# File 'lib/pgbus/execution_pools/async_pool.rb', line 6

# Reader for the total capacity the pool was constructed with.
#
# @return the value held in @capacity
def capacity = @capacity

Instance Method Details

#available_capacity ⇒ Object



40
41
42
43
# File 'lib/pgbus/execution_pools/async_pool.rb', line 40

# Reads the currently available capacity while holding @mutex.
#
# raise_if_fatal! runs first, so any error it raises propagates to the caller
# before the counter is read.
#
# @return the value of @available_capacity
def available_capacity
  raise_if_fatal!

  @mutex.synchronize do
    @available_capacity
  end
end

#idle? ⇒ Boolean

Returns:

  • (Boolean)


45
46
47
# File 'lib/pgbus/execution_pools/async_pool.rb', line 45

# Whether the pool currently has any available capacity.
#
# @return [Boolean] true when available_capacity is greater than zero
def idle?
  free_slots = available_capacity
  free_slots.positive?
end

#kill ⇒ Object



65
66
67
68
# File 'lib/pgbus/execution_pools/async_pool.rb', line 65

# Requests shutdown, then forcibly kills the reactor thread if one exists.
#
# @return [Thread, nil] the result of Thread#kill, or nil when there is no
#   reactor thread
def kill
  shutdown

  reactor = @reactor_thread
  reactor.kill unless reactor.nil?
end

#metadata ⇒ Object



70
71
72
73
74
75
76
77
# File 'lib/pgbus/execution_pools/async_pool.rb', line 70

# Reports a snapshot of the pool's utilisation.
#
# The available capacity is read under @mutex; the busy count is derived from
# it as total capacity minus available capacity.
#
# @return [Hash] with keys :mode (always :async), :capacity (total capacity)
#   and :busy (capacity currently in use)
def metadata
  available = @mutex.synchronize { @available_capacity }
  {
    mode: :async,
    capacity: @capacity,
    busy: @capacity - available
  }
end

#post(&block) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/pgbus/execution_pools/async_pool.rb', line 27

# Reserves one unit of capacity and enqueues the block on the pending queue.
#
# @yield the work to enqueue; the block is stored, not called, by this method
# @return the result of pushing onto @pending
# @raise [RuntimeError] "Execution pool is shutting down" when shutdown? is true
# @raise [StandardError] anything raised by raise_if_fatal!, reserve_capacity!
#   or the enqueue itself is propagated unchanged
def post(&block)
  raise_if_fatal!
  raise "Execution pool is shutting down" if shutdown?

  reserve_capacity!
  begin
    @pending.push(block)
  rescue StandardError
    # The reservation succeeded but the enqueue did not: hand the slot back.
    restore_capacity
    raise
  end
end

#shutdown ⇒ Object



49
50
51
52
53
54
55
# File 'lib/pgbus/execution_pools/async_pool.rb', line 49

# Marks the pool as shutting down, under @state_mutex.
#
# @return [Boolean] true when this call set the flag, false when the flag was
#   already set
def shutdown
  @state_mutex.synchronize do
    if @shutdown_flag
      false
    else
      @shutdown_flag = true
    end
  end
end

#shutdown? ⇒ Boolean

Returns:

  • (Boolean)


57
58
59
# File 'lib/pgbus/execution_pools/async_pool.rb', line 57

# Whether shutdown has been requested; the flag is read under @state_mutex.
#
# @return [Boolean] the current value of @shutdown_flag
def shutdown?
  @state_mutex.synchronize do
    @shutdown_flag
  end
end

#wait_for_termination(timeout) ⇒ Object



61
62
63
# File 'lib/pgbus/execution_pools/async_pool.rb', line 61

# Joins the reactor thread, waiting at most +timeout+.
#
# @param timeout passed straight to Thread#join
# @return [Thread, nil] the result of Thread#join, or nil when there is no
#   reactor thread
def wait_for_termination(timeout)
  reactor = @reactor_thread
  return if reactor.nil?

  reactor.join(timeout)
end