Class: Pgbus::ExecutionPools::AsyncPool

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/execution_pools/async_pool.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(capacity:, on_state_change: nil) ⇒ AsyncPool

Returns a new instance of AsyncPool.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/pgbus/execution_pools/async_pool.rb', line 8

# Sets up the pool's bookkeeping state, starts the reactor via
# start_reactor, and blocks until something is pushed onto the boot queue.
#
# @param capacity [Object] stored as the pool capacity and used as the
#   initial available capacity (presumably an Integer — confirm with callers)
# @param on_state_change [Object, nil] stored as-is; not used in this method
# @raise [Exception] re-raises the boot result when it is an Exception
def initialize(capacity:, on_state_change: nil)
  # Capacity accounting.
  @capacity = capacity
  @available_capacity = capacity
  @on_state_change = on_state_change

  # Lifecycle flags.
  @shutdown_flag = false
  @fatal_error = nil

  # Synchronization primitives and hand-off channels.
  @mutex = Mutex.new
  @state_mutex = Mutex.new
  @boot_queue = Thread::Queue.new
  @pending = Thread::Queue.new
  @wake_rd, @wake_wr = IO.pipe

  validate_dependencies!
  @reactor_thread = start_reactor

  # Block until a boot result arrives; an Exception means boot failed.
  boot_outcome = @boot_queue.pop
  raise boot_outcome if boot_outcome.is_a?(Exception)
end

Instance Attribute Details

#capacity ⇒ Object (readonly)

Returns the value of attribute capacity.



6
7
8
# File 'lib/pgbus/execution_pools/async_pool.rb', line 6

# Reader for the capacity value stored by the constructor.
#
# @return [Object] the value passed as `capacity:` to #initialize
def capacity
  @capacity
end

Instance Method Details

#available_capacity ⇒ Object



40
41
42
43
# File 'lib/pgbus/execution_pools/async_pool.rb', line 40

# Reads the current available capacity under @mutex.
#
# Calls raise_if_fatal! first (defined elsewhere; presumably raises when a
# fatal error has been recorded — confirm).
#
# @return [Object] the current value of @available_capacity
def available_capacity
  raise_if_fatal!
  @mutex.synchronize do
    @available_capacity
  end
end

#idle? ⇒ Boolean

Returns:

  • (Boolean)


45
46
47
# File 'lib/pgbus/execution_pools/async_pool.rb', line 45

# Whether the pool currently has any available capacity.
#
# Note: this is true as soon as available capacity is above zero, not only
# when the full capacity is available.
#
# @return [Boolean]
def idle?
  available_capacity > 0
end

#kill ⇒ Object



66
67
68
69
# File 'lib/pgbus/execution_pools/async_pool.rb', line 66

# Requests shutdown, then kills the reactor thread if one is set.
#
# @return [Object, nil] the result of calling #kill on @reactor_thread, or
#   nil when @reactor_thread is nil
def kill
  shutdown
  reactor = @reactor_thread
  reactor.kill unless reactor.nil?
end

#metadata ⇒ Object



71
72
73
74
75
76
77
78
# File 'lib/pgbus/execution_pools/async_pool.rb', line 71

# Snapshot of the pool's load; the available capacity is read under @mutex.
#
# @return [Hash] `mode:` (always :async), `capacity:` (the stored capacity),
#   and `busy:` (stored capacity minus the currently available capacity)
def metadata
  available = @mutex.synchronize { @available_capacity }
  {
    mode: :async,
    capacity: @capacity,
    busy: @capacity - available
  }
end

#post(&block) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/pgbus/execution_pools/async_pool.rb', line 26

# Enqueues a block on the pending queue after reserving one capacity slot,
# then wakes the reactor.
#
# The block is stored, not invoked here. raise_if_fatal!, reserve_capacity!,
# restore_capacity and wake_reactor are defined elsewhere and may raise too.
#
# @yield the work to enqueue
# @return [Object] whatever wake_reactor returns
# @raise [RuntimeError] if the pool is shutting down
# @raise [ArgumentError] if no block is given
def post(&block)
  raise_if_fatal!
  raise "Execution pool is shutting down" if shutdown?
  # Reject a missing block before reserving: otherwise nil would be enqueued
  # and a capacity slot consumed for work that can never run.
  raise ArgumentError, "post requires a block" unless block

  reserved = false
  reserve_capacity!
  reserved = true
  @pending << block
  wake_reactor
rescue StandardError
  # Give the slot back only if it was actually taken before the failure.
  restore_capacity if reserved
  raise
end

#shutdown ⇒ Object



49
50
51
52
53
54
55
56
# File 'lib/pgbus/execution_pools/async_pool.rb', line 49

# Marks the pool as shutting down and wakes the reactor.
#
# Only the first call flips the flag and wakes the reactor; later calls
# return false without waking it.
#
# @return [false, Object] false when already shutting down, otherwise
#   whatever wake_reactor returns
def shutdown
  first_request = @state_mutex.synchronize do
    next false if @shutdown_flag

    @shutdown_flag = true
  end
  return false unless first_request

  wake_reactor
end

#shutdown? ⇒ Boolean

Returns:

  • (Boolean)


58
59
60
# File 'lib/pgbus/execution_pools/async_pool.rb', line 58

# Whether shutdown has been requested; the flag is read under @state_mutex.
#
# @return [Boolean] the current value of @shutdown_flag
def shutdown?
  @state_mutex.synchronize do
    @shutdown_flag
  end
end

#wait_for_termination(timeout) ⇒ Object



62
63
64
# File 'lib/pgbus/execution_pools/async_pool.rb', line 62

# Joins the reactor thread, waiting at most `timeout`.
#
# @param timeout [Numeric, nil] forwarded unchanged to #join on
#   @reactor_thread
# @return [Object, nil] the result of #join, or nil when @reactor_thread is
#   nil (for a Thread, #join returns the thread, or nil if the timeout
#   elapsed first)
def wait_for_termination(timeout)
  reactor = @reactor_thread
  return if reactor.nil?

  reactor.join(timeout)
end