Class: Philiprehberger::Pool::ResourcePool
- Inherits:
-
Object
- Object
- Philiprehberger::Pool::ResourcePool
- Defined in:
- lib/philiprehberger/pool.rb
Instance Method Summary collapse
- #checkin(resource) ⇒ Object
- #checkout(timeout: nil) ⇒ Object
-
#clear ⇒ Integer
Destroy every idle (available) resource without changing the pool’s configured capacity or shutting it down.
- #drain ⇒ Object
-
#initialize(size:, timeout: 5, idle_timeout: nil, health_check: nil, &factory) ⇒ ResourcePool
constructor
A new instance of ResourcePool.
- #prune_idle ⇒ Object
- #shutdown ⇒ Object
- #shutdown? ⇒ Boolean
- #size ⇒ Object
-
#stats ⇒ Hash
Snapshot of the pool’s internal counters.
-
#utilization ⇒ Float
Fraction of the pool currently in use.
-
#waiting ⇒ Integer
Number of threads currently waiting for a resource to become available.
- #with ⇒ Object
Constructor Details
#initialize(size:, timeout: 5, idle_timeout: nil, health_check: nil, &factory) ⇒ ResourcePool
Returns a new instance of ResourcePool.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/philiprehberger/pool.rb', line 16

# Build an empty pool; no resource is created here.
#
# @param size [Integer] upper bound compared against the created count
#   before `checkout` creates another resource
# @param timeout [Numeric] default number of seconds `checkout` waits when
#   the caller does not pass its own timeout
# @param idle_timeout [Numeric, nil] age in seconds after which `prune_idle`
#   discards an idle entry; `nil` disables pruning
# @param health_check [#call, nil] stored as given; presumably consulted by
#   the health-check helper used in `checkout` (TODO confirm)
# @yield factory block, stored as given; presumably invoked to build each
#   resource (TODO confirm)
# @raise [ArgumentError] when `size` is not positive or no block is given
def initialize(size:, timeout: 5, idle_timeout: nil, health_check: nil, &factory)
  size.positive? || raise(ArgumentError, 'size must be positive')
  raise ArgumentError, 'factory block is required' if factory.nil?

  # Caller-supplied configuration.
  @size = size
  @timeout = timeout
  @idle_timeout = idle_timeout
  @health_check = health_check
  @factory = factory

  # Synchronisation primitives guarding the bookkeeping below.
  @mutex = Mutex.new
  @condition = ConditionVariable.new

  # Mutable bookkeeping.
  @available = []
  @created = 0
  @in_use = Set.new
  @waiting = 0
  @shutdown = false
end
Instance Method Details
#checkin(resource) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/philiprehberger/pool.rb', line 95

# Hand a checked-out resource back to the pool.
#
# A resource that is not currently tracked as in use is ignored. No health
# check is performed here.
#
# @param resource [Object] a resource previously returned by `checkout`
# @return [Object, nil] nil when the resource was not in use
def checkin(resource)
  @mutex.synchronize do
    # Set#delete? yields nil when the element was absent.
    return unless @in_use.delete?(resource)
    return destroy_resource(resource) if @shutdown

    returned = PoolEntry.new(resource: resource, last_used: Time.now)
    @available << returned
    # Wake one thread blocked in checkout, if any.
    @condition.signal
  end
end
#checkout(timeout: nil) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/philiprehberger/pool.rb', line 46

# Obtain a resource, blocking until one is available or the timeout elapses.
#
# Idle entries are tried first (most recently returned first); entries that
# are idle-expired or fail the health check are destroyed and skipped. If no
# idle entry is usable and fewer than `@size` resources exist, a new one is
# created. Otherwise the caller waits on the condition variable.
#
# The deadline is tracked on the monotonic clock so that wall-clock
# adjustments can neither shorten nor extend the wait.
#
# @param timeout [Numeric, nil] seconds to wait; falls back to the pool's
#   default timeout when nil
# @return [Object] the checked-out resource
# @raise [ShutdownError] if the pool is (or becomes) shut down
# @raise [TimeoutError] if no resource could be obtained in time
def checkout(timeout: nil)
  raise ShutdownError, 'pool is shut down' if @shutdown

  effective_timeout = timeout || @timeout
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + effective_timeout

  @mutex.synchronize do
    loop do
      # Re-checked on every pass: shutdown broadcasts to wake waiters.
      raise ShutdownError, 'pool is shut down' if @shutdown

      # Try to get a valid resource from the available pool
      until @available.empty?
        entry = @available.pop

        # Short-circuit keeps the health check from running on expired entries.
        if idle_expired?(entry) || health_check_fails?(entry.resource)
          destroy_resource(entry.resource)
          next
        end

        @in_use.add(entry.resource)
        return entry.resource
      end

      # Create new resource if under capacity
      if @created < @size
        resource = create_resource
        @in_use.add(resource)
        return resource
      end

      # Wait for a resource to become available
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      raise TimeoutError, "could not obtain resource within #{effective_timeout}s" if remaining <= 0

      @waiting += 1
      begin
        @condition.wait(@mutex, remaining)
      ensure
        # Keep the waiter count accurate even if the wait is interrupted.
        @waiting -= 1
      end
    end
  end
end
#clear ⇒ Integer
Destroy every idle (available) resource without changing the pool’s configured capacity or shutting it down. Resources currently checked out are left untouched: `checkin` returns them to the idle set as usual, and they are health-checked on their next `checkout` (destroyed if the check fails, reused if healthy). Useful for credential-rotation flows where every cached connection should be discarded but the pool itself must keep accepting new checkouts.
207 208 209 210 211 212 213 214 215 216 |
# File 'lib/philiprehberger/pool.rb', line 207

# Destroy every idle entry while leaving capacity and checked-out
# resources alone.
#
# @return [Integer] number of idle entries that were destroyed
# @raise [ShutdownError] if the pool has been shut down
def clear
  @mutex.synchronize do
    raise ShutdownError, 'pool is shut down' if @shutdown

    # Swap in a fresh list first so the idle set is empty before teardown.
    idle, @available = @available, []
    idle.each { |entry| destroy_resource(entry.resource) }
    idle.length
  end
end
#drain ⇒ Object
181 182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/philiprehberger/pool.rb', line 181

# Remove every idle entry and shrink the pool's capacity by the number of
# entries removed. Checked-out resources are not touched.
#
# Teardown goes through `destroy_resource`, the same helper used by `clear`,
# `prune_idle` and `shutdown`, so drained resources are released the same
# way as on every other path instead of being closed by hand here.
# NOTE(review): assumes `destroy_resource` closes the resource and updates
# the created-resource count — confirm against its definition.
#
# @return [Integer] number of idle entries that were drained
# @raise [ShutdownError] if the pool has been shut down
def drain
  @mutex.synchronize do
    raise ShutdownError, 'pool is shut down' if @shutdown

    drained, @available = @available, []
    # Capacity is reduced so the drained slots are not refilled.
    @size -= drained.size
    drained.each { |entry| destroy_resource(entry.resource) }
    drained.size
  end
end
#prune_idle ⇒ Object
154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/philiprehberger/pool.rb', line 154

# Destroy idle entries whose time since last use exceeds the idle timeout.
#
# @return [Integer] number of entries destroyed; 0 when no idle timeout is
#   configured
# @raise [ShutdownError] if the pool has been shut down
def prune_idle
  @mutex.synchronize do
    raise ShutdownError, 'pool is shut down' if @shutdown
    return 0 unless @idle_timeout

    # One timestamp for the whole sweep so every entry is judged equally.
    checked_at = Time.now
    stale_entry = ->(entry) { (checked_at - entry.last_used) > @idle_timeout }

    stale, fresh = @available.partition(&stale_entry)
    @available = fresh
    stale.each { |entry| destroy_resource(entry.resource) }
    stale.size
  end
end
#shutdown ⇒ Object
167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/philiprehberger/pool.rb', line 167

# Mark the pool as shut down and destroy every resource it tracks, both
# idle and currently checked out, then wake all waiting threads.
#
# @return [Object] whatever the condition variable's `broadcast` returns
def shutdown
  @mutex.synchronize do
    @shutdown = true

    # Idle entries first, then resources still held by callers.
    @available.each do |entry|
      destroy_resource(entry.resource)
    end
    @available.clear

    @in_use.each(&method(:destroy_resource))
    @in_use.clear

    # Waiters re-check the shutdown flag once they wake up.
    @condition.broadcast
  end
end
#shutdown? ⇒ Boolean
195 196 197 |
# File 'lib/philiprehberger/pool.rb', line 195

# Whether `shutdown` has been called on this pool.
#
# Reads the flag directly, without taking the pool mutex.
#
# @return [Boolean]
def shutdown?
  @shutdown
end
#size ⇒ Object
135 136 137 |
# File 'lib/philiprehberger/pool.rb', line 135

# Current capacity of the pool (reported as `:max` by `stats`).
#
# @return [Integer]
def size
  @mutex.synchronize do
    @size
  end
end
#stats ⇒ Hash
Snapshot of the pool’s internal counters.
Includes the number of threads currently blocked in `checkout`/`with` waiting for a resource (`:waiting`). A non-zero `:waiting` while `in_use == max` indicates back-pressure and is a good signal that the pool may need to be sized up.
116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/philiprehberger/pool.rb', line 116

# Consistent snapshot of the pool's counters, taken under the pool mutex.
#
# @return [Hash{Symbol => Integer}] `:size` (resources created),
#   `:available` (idle entries), `:in_use` (checked out), `:max` (capacity)
#   and `:waiting` (threads blocked waiting for a resource)
def stats
  @mutex.synchronize do
    idle_count = @available.size
    busy_count = @in_use.size

    {
      size: @created,
      available: idle_count,
      in_use: busy_count,
      max: @size,
      waiting: @waiting
    }
  end
end
#utilization ⇒ Float
Fraction of the pool currently in use.
Returns `in_use / max` as a `Float` between `0.0` and `1.0`. Useful for dashboards/metrics without needing to unpack `#stats`. Returns `0.0` when the pool has been shut down or has zero capacity.
146 147 148 149 150 151 152 |
# File 'lib/philiprehberger/pool.rb', line 146

# Share of the pool's capacity that is currently checked out.
#
# @return [Float] checked-out count divided by capacity; 0.0 when capacity
#   is zero or negative
def utilization
  @mutex.synchronize do
    capacity = @size
    # Guard against dividing by a non-positive capacity.
    capacity <= 0 ? 0.0 : @in_use.size.to_f / capacity
  end
end
#waiting ⇒ Integer
Number of threads currently waiting for a resource to become available.
131 132 133 |
# File 'lib/philiprehberger/pool.rb', line 131

# Number of threads currently blocked in `checkout` waiting for a resource.
#
# @return [Integer]
def waiting
  @mutex.synchronize do
    @waiting
  end
end
#with ⇒ Object
35 36 37 38 39 40 41 42 43 44 |
# File 'lib/philiprehberger/pool.rb', line 35

# Check out a resource, yield it to the block, and always check it back in,
# even when the block raises.
#
# @param timeout [Numeric, nil] seconds to wait for a resource; forwarded to
#   `checkout`, which falls back to the pool's default when nil
# @yieldparam resource [Object] the checked-out resource
# @return [Object] the block's return value
# @raise [ShutdownError] if the pool has been shut down
def with(timeout: nil)
  raise ShutdownError, 'pool is shut down' if @shutdown

  resource = checkout(timeout: timeout)
  begin
    yield resource
  ensure
    # Runs on normal return and on exceptions raised by the block.
    checkin(resource)
  end
end