Class: WaterDrop::ConnectionPool
- Inherits:
-
Object
- Object
- WaterDrop::ConnectionPool
- Extended by:
- Forwardable
- Defined in:
- lib/waterdrop/connection_pool.rb
Overview
Connection pool wrapper for WaterDrop producers using the proven connection_pool gem.
This provides a clean WaterDrop-specific API while leveraging the battle-tested, connection_pool gem underneath. The wrapper hides the direct usage of the connection_pool gem and provides WaterDrop-specific configuration.
Class Attribute Summary collapse
-
.default_pool ⇒ ConnectionPool?
The global connection pool instance.
Instance Attribute Summary collapse
-
#pool ⇒ ::ConnectionPool
readonly
Returns the underlying connection_pool instance This allows access to advanced connection_pool features if needed.
Class Method Summary collapse
-
.active? ⇒ Boolean
Check if the global connection pool is active (configured).
-
.close ⇒ Object
Shutdown the global connection pool Alias for shutdown to align with producer API WaterDrop producers use #close, so we alias connection pool #shutdown to #close for API consistency across both individual producers and connection pools.
-
.reload ⇒ Object
Reload the global connection pool.
-
.setup(size: 5, timeout: 5000, &producer_config) {|config, index| ... } ⇒ ConnectionPool
Sets up a global connection pool.
-
.shutdown(force: false) ⇒ Object
Shutdown the global connection pool.
-
.stats ⇒ Hash?
Get statistics about the global pool.
-
.transaction {|producer| ... } ⇒ Object
Execute a transaction with a producer from the global connection pool Only available when connection pool is configured.
-
.with {|producer| ... } ⇒ Object
Executes a block with a producer from the global pool.
Instance Method Summary collapse
-
#initialize(size: 5, timeout: 5000, &producer_config) {|config, index| ... } ⇒ ConnectionPool
constructor
Creates a new WaterDrop connection pool.
-
#reload ⇒ Object
Reload all connections in the pool.
-
#shutdown(force: false) ⇒ Object
(also: #close)
Shutdown the connection pool.
-
#stats ⇒ Hash
Get pool statistics.
-
#transaction {|producer| ... } ⇒ Object
Execute a transaction with a producer from this connection pool.
Constructor Details
#initialize(size: 5, timeout: 5000, &producer_config) {|config, index| ... } ⇒ ConnectionPool
Creates a new WaterDrop connection pool
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
# File 'lib/waterdrop/connection_pool.rb', line 204 def initialize(size: 5, timeout: 5000, &producer_config) self.class.send(:ensure_connection_pool_gem!) @producer_config = producer_config @pool_index = 0 @pool_mutex = Mutex.new @pool = ::ConnectionPool.new(size: size, timeout: timeout / 1000.0) do producer_index = @pool_mutex.synchronize { @pool_index += 1 } WaterDrop::Producer.new do |config| if @producer_config.arity == 2 @producer_config.call(config, producer_index) else @producer_config.call(config) end end end # Emit event when a connection pool is created WaterDrop.instrumentation.instrument( "connection_pool.created", pool: self, size: size, timeout: timeout ) end |
Class Attribute Details
.default_pool ⇒ ConnectionPool?
Returns the global connection pool instance.
47 48 49 |
# File 'lib/waterdrop/connection_pool.rb', line 47 def default_pool @default_pool end |
Instance Attribute Details
#pool ⇒ ::ConnectionPool (readonly)
Returns the underlying connection_pool instance This allows access to advanced connection_pool features if needed
305 306 307 |
# File 'lib/waterdrop/connection_pool.rb', line 305 def pool @pool end |
Class Method Details
.active? ⇒ Boolean
Check if the global connection pool is active (configured)
154 155 156 |
# File 'lib/waterdrop/connection_pool.rb', line 154 def active? !@default_pool.nil? end |
.close ⇒ Object
Shutdown the global connection pool Alias for shutdown to align with producer API WaterDrop producers use #close, so we alias connection pool #shutdown to #close for API consistency across both individual producers and connection pools
136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/waterdrop/connection_pool.rb', line 136 def shutdown(force: false) return unless @default_pool pool = @default_pool @default_pool.shutdown(force: force) @default_pool = nil # Emit global event for pool shutdown WaterDrop.instrumentation.instrument( "connection_pool.shutdown", pool: pool ) end |
.reload ⇒ Object
Reload the global connection pool
139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/waterdrop/connection_pool.rb', line 139 def reload return unless @default_pool @default_pool.reload # Emit global event for pool reload WaterDrop.instrumentation.instrument( "connection_pool.reload", pool: @default_pool ) end |
.setup(size: 5, timeout: 5000, &producer_config) {|config, index| ... } ⇒ ConnectionPool
Sets up a global connection pool
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/waterdrop/connection_pool.rb', line 71 def setup(size: 5, timeout: 5000, &producer_config) ensure_connection_pool_gem! @default_pool = new(size: size, timeout: timeout, &producer_config) # Emit global event for pool setup WaterDrop.instrumentation.instrument( "connection_pool.setup", pool: @default_pool, size: size, timeout: timeout ) @default_pool end |
.shutdown(force: false) ⇒ Object
Shutdown the global connection pool
119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/waterdrop/connection_pool.rb', line 119 def shutdown(force: false) return unless @default_pool pool = @default_pool @default_pool.shutdown(force: force) @default_pool = nil # Emit global event for pool shutdown WaterDrop.instrumentation.instrument( "connection_pool.shutdown", pool: pool ) end |
.stats ⇒ Hash?
Get statistics about the global pool
106 107 108 109 110 111 112 113 |
# File 'lib/waterdrop/connection_pool.rb', line 106 def stats return nil unless @default_pool { size: @default_pool.size, available: @default_pool.available } end |
.transaction {|producer| ... } ⇒ Object
Execute a transaction with a producer from the global connection pool Only available when connection pool is configured
170 171 172 173 174 |
# File 'lib/waterdrop/connection_pool.rb', line 170 def transaction(...) raise "No global connection pool configured. Call setup first." unless @default_pool @default_pool.transaction(...) end |
.with {|producer| ... } ⇒ Object
Executes a block with a producer from the global pool
97 98 99 100 101 |
# File 'lib/waterdrop/connection_pool.rb', line 97 def with(...) raise "No global connection pool configured. Call setup first." unless @default_pool @default_pool.with(...) end |
Instance Method Details
#reload ⇒ Object
Producers are always closed gracefully (never force-closed): a reload must not drop in-flight messages, so it waits for them to flush rather than purging the queue.
Reload all connections in the pool. Useful for configuration changes or error recovery
271 272 273 274 275 276 277 278 279 280 281 |
# File 'lib/waterdrop/connection_pool.rb', line 271 def reload @pool.reload do |producer| producer.close if producer&.status&.active? end # Emit event after pool is reloaded WaterDrop.instrumentation.instrument( "connection_pool.reloaded", pool: self ) end |
#shutdown(force: false) ⇒ Object Also known as: close
Shutdown the connection pool
248 249 250 251 252 253 254 255 256 257 258 259 260 |
# File 'lib/waterdrop/connection_pool.rb', line 248 def shutdown(force: false) @pool.shutdown do |producer| next unless producer&.status&.active? force ? producer.close! : producer.close end # Emit event after pool is shut down WaterDrop.instrumentation.instrument( "connection_pool.shutdown", pool: self ) end |
#stats ⇒ Hash
Get pool statistics
235 236 237 238 239 240 |
# File 'lib/waterdrop/connection_pool.rb', line 235 def stats { size: @pool.size, available: @pool.available } end |
#transaction {|producer| ... } ⇒ Object
Execute a transaction with a producer from this connection pool
293 294 295 296 297 298 299 |
# File 'lib/waterdrop/connection_pool.rb', line 293 def transaction with do |producer| producer.transaction do yield(producer) end end end |