Class: WaterDrop::ConnectionPool

Inherits:
Object
Extended by:
Forwardable
Defined in:
lib/waterdrop/connection_pool.rb

Overview

Connection pool wrapper for WaterDrop producers using the proven connection_pool gem.

This provides a clean, WaterDrop-specific API while leveraging the battle-tested connection_pool gem underneath. The wrapper hides direct usage of the connection_pool gem and provides WaterDrop-specific configuration.

Examples:

Basic usage

pool = WaterDrop::ConnectionPool.new(size: 10) do |config|
  config.kafka = { 'bootstrap.servers': 'localhost:9092' }
  config.deliver = true
end

pool.with do |producer|
  producer.produce_sync(topic: 'events', payload: 'data')
end

Transactional producers with unique IDs

pool = WaterDrop::ConnectionPool.new(size: 5) do |config, index|
  config.kafka = {
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': "my-app-#{index}"
  }
end

Global connection pool

WaterDrop::ConnectionPool.setup(size: 20) do |config|
  config.kafka = { 'bootstrap.servers': ENV['KAFKA_BROKERS'] }
end

WaterDrop::ConnectionPool.with do |producer|
  producer.produce_async(topic: 'events', payload: 'data')
end

Constructor Details

#initialize(size: 5, timeout: 5000, &producer_config) {|config, index| ... } ⇒ ConnectionPool

Creates a new WaterDrop connection pool

Parameters:

  • size (Integer) (defaults to: 5)

    Pool size (default: 5)

  • timeout (Numeric) (defaults to: 5000)

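    Time to wait for an available producer, in milliseconds (default: 5000). Converted to seconds before being passed to the underlying connection_pool gem.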

  • producer_config (Proc)

    Block to configure each producer in the pool

Yields:

  • (config, index)

    Block to configure each producer in the pool, receives config and pool index



# File 'lib/waterdrop/connection_pool.rb', line 204

def initialize(size: 5, timeout: 5000, &producer_config)
  self.class.send(:ensure_connection_pool_gem!)

  @producer_config = producer_config
  @pool_index = 0
  @pool_mutex = Mutex.new

  @pool = ::ConnectionPool.new(size: size, timeout: timeout / 1000.0) do
    producer_index = @pool_mutex.synchronize { @pool_index += 1 }

    WaterDrop::Producer.new do |config|
      if @producer_config.arity == 2
        @producer_config.call(config, producer_index)
      else
        @producer_config.call(config)
      end
    end
  end

  # Emit event when a connection pool is created
  WaterDrop.instrumentation.instrument(
    "connection_pool.created",
    pool: self,
    size: size,
    timeout: timeout
  )
end

Class Attribute Details

.default_pool ⇒ ConnectionPool?

Returns the global connection pool instance.

Returns:

  • (ConnectionPool, nil)

    The global pool, or nil if none has been set up


# File 'lib/waterdrop/connection_pool.rb', line 47

def default_pool
  @default_pool
end

Instance Attribute Details

#pool ⇒ ::ConnectionPool (readonly)

Returns the underlying connection_pool instance. This allows access to advanced connection_pool features if needed.

Returns:

  • (::ConnectionPool)

    The underlying connection pool



# File 'lib/waterdrop/connection_pool.rb', line 305

def pool
  @pool
end

Class Method Details

.active? ⇒ Boolean

Check if the global connection pool is active (configured)

Returns:

  • (Boolean)

    true if global pool is configured, false otherwise



# File 'lib/waterdrop/connection_pool.rb', line 154

def active?
  !@default_pool.nil?
end

.close ⇒ Object

Shuts down the global connection pool. Alias for .shutdown: WaterDrop producers use #close, so the connection pool's #shutdown is aliased to #close for API consistency between individual producers and connection pools.

Parameters:

  • force (Boolean)

    when true, force-close each producer, purging unflushed messages. Defaults to false (graceful close) so in-flight messages are not silently dropped.



# File 'lib/waterdrop/connection_pool.rb', line 136

def shutdown(force: false)
  return unless @default_pool

  pool = @default_pool
  @default_pool.shutdown(force: force)
  @default_pool = nil

  # Emit global event for pool shutdown
  WaterDrop.instrumentation.instrument(
    "connection_pool.shutdown",
    pool: pool
  )
end

.reload ⇒ Object

Reload the global connection pool



# File 'lib/waterdrop/connection_pool.rb', line 139

def reload
  return unless @default_pool

  @default_pool.reload

  # Emit global event for pool reload
  WaterDrop.instrumentation.instrument(
    "connection_pool.reload",
    pool: @default_pool
  )
end

.setup(size: 5, timeout: 5000, &producer_config) {|config, index| ... } ⇒ ConnectionPool

Sets up a global connection pool

Examples:

Basic setup

WaterDrop::ConnectionPool.setup(size: 15) do |config|
  config.kafka = { 'bootstrap.servers': ENV['KAFKA_BROKERS'] }
  config.deliver = true
end

Transactional setup with unique IDs

WaterDrop::ConnectionPool.setup(size: 5) do |config, index|
  config.kafka = {
    'bootstrap.servers': ENV['KAFKA_BROKERS'],
    'transactional.id': "my-app-#{index}"
  }
end

Parameters:

  • size (Integer) (defaults to: 5)

    Pool size (default: 5)

  • timeout (Numeric) (defaults to: 5000)

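    Time to wait for an available producer, in milliseconds (default: 5000). Converted to seconds before being passed to the underlying connection_pool gem.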

  • producer_config (Proc)

    Block to configure each producer in the pool

Yields:

  • (config, index)

    Block to configure each producer in the pool, receives config and pool index

Returns:

  • (ConnectionPool)

    The newly created global pool


# File 'lib/waterdrop/connection_pool.rb', line 71

def setup(size: 5, timeout: 5000, &producer_config)
  ensure_connection_pool_gem!

  @default_pool = new(size: size, timeout: timeout, &producer_config)

  # Emit global event for pool setup
  WaterDrop.instrumentation.instrument(
    "connection_pool.setup",
    pool: @default_pool,
    size: size,
    timeout: timeout
  )

  @default_pool
end

.shutdown(force: false) ⇒ Object

Shutdown the global connection pool

Parameters:

  • force (Boolean) (defaults to: false)

    when true, force-close each producer, purging unflushed messages. Defaults to false (graceful close) so in-flight messages are not silently dropped.



# File 'lib/waterdrop/connection_pool.rb', line 119

def shutdown(force: false)
  return unless @default_pool

  pool = @default_pool
  @default_pool.shutdown(force: force)
  @default_pool = nil

  # Emit global event for pool shutdown
  WaterDrop.instrumentation.instrument(
    "connection_pool.shutdown",
    pool: pool
  )
end

.stats ⇒ Hash?

Get statistics about the global pool

Returns:

  • (Hash, nil)

    Pool statistics or nil if no global pool



# File 'lib/waterdrop/connection_pool.rb', line 106

def stats
  return nil unless @default_pool

  {
    size: @default_pool.size,
    available: @default_pool.available
  }
end
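
The stats hash exposes only `size` and `available`, so the number of producers currently checked out has to be derived. A minimal sketch, assuming `available` counts producers that can still be checked out (as in the connection_pool gem); `pool_usage` is a hypothetical helper, not part of WaterDrop:

```ruby
# Hypothetical helper: derives checked-out count and utilization from a stats hash.
# Returns nil when no stats are available (for example, no global pool configured).
def pool_usage(stats)
  return nil if stats.nil?

  in_use = stats[:size] - stats[:available]

  { in_use: in_use, utilization: in_use.fdiv(stats[:size]) }
end

# The literal hash stands in for the value returned by .stats.
usage = pool_usage({ size: 20, available: 15 })
no_pool_usage = pool_usage(nil)
```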

.transaction {|producer| ... } ⇒ Object

Executes a transaction with a producer from the global connection pool. Only available when a global connection pool is configured.

Examples:

WaterDrop::ConnectionPool.transaction do |producer|
  producer.produce(topic: 'events', payload: 'data1')
  producer.produce(topic: 'events', payload: 'data2')
end

Yields:

  • (producer)

    Producer from the global pool with an active transaction

Returns:

  • (Object)

    Result of the block

Raises:

  • (RuntimeError)

    If no global pool is configured



# File 'lib/waterdrop/connection_pool.rb', line 170

def transaction(...)
  raise "No global connection pool configured. Call setup first." unless @default_pool

  @default_pool.transaction(...)
end

.with {|producer| ... } ⇒ Object

Executes a block with a producer from the global pool

Examples:

WaterDrop::ConnectionPool.with do |producer|
  producer.produce_sync(topic: 'events', payload: 'data')
end

Yields:

  • (producer)

    Producer from the global pool

Returns:

  • (Object)

    Result of the block

Raises:

  • (RuntimeError)

    If no global pool is configured



# File 'lib/waterdrop/connection_pool.rb', line 97

def with(...)
  raise "No global connection pool configured. Call setup first." unless @default_pool

  @default_pool.with(...)
end
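
Since .with raises a RuntimeError when no global pool exists, callers that may run before setup can either rescue the error or check .active? first. The sketch below mirrors that guard with stand-ins so it runs without Kafka; `GlobalPoolSketch` and `OneItemPool` are hypothetical and only imitate the behavior shown in the source above.

```ruby
# Hypothetical stand-in that mirrors the guard in .with; not the real class.
module GlobalPoolSketch
  class << self
    attr_writer :default_pool

    def active?
      !@default_pool.nil?
    end

    def with(&block)
      raise "No global connection pool configured. Call setup first." unless @default_pool

      @default_pool.with(&block)
    end
  end
end

# Minimal pool double that yields a fixed object instead of a producer.
class OneItemPool
  def with
    yield :producer
  end
end

# Before setup: the call raises, so rescue it (or check active? beforehand).
error_message =
  begin
    GlobalPoolSketch.with { |producer| producer }
  rescue RuntimeError => e
    e.message
  end

# After setup: the block runs and its result is returned.
GlobalPoolSketch.default_pool = OneItemPool.new
result = GlobalPoolSketch.with { |producer| producer } if GlobalPoolSketch.active?
```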

Instance Method Details

#reload ⇒ Object

Note:

Producers are always closed gracefully (never force-closed): a reload must not drop in-flight messages, so it waits for them to flush rather than purging the queue.

Reloads all connections in the pool. Useful after configuration changes or for error recovery.



# File 'lib/waterdrop/connection_pool.rb', line 271

def reload
  @pool.reload do |producer|
    producer.close if producer&.status&.active?
  end

  # Emit event after pool is reloaded
  WaterDrop.instrumentation.instrument(
    "connection_pool.reloaded",
    pool: self
  )
end

#shutdown(force: false) ⇒ Object Also known as: close

Shutdown the connection pool

Parameters:

  • force (Boolean) (defaults to: false)

    when true, force-close each producer, purging any messages that do not flush within the producer’s max wait timeout. Defaults to false: producers are closed gracefully so in-flight messages are flushed instead of being silently dropped when the broker is slow or unreachable.



# File 'lib/waterdrop/connection_pool.rb', line 248

def shutdown(force: false)
  @pool.shutdown do |producer|
    next unless producer&.status&.active?

    force ? producer.close! : producer.close
  end

  # Emit event after pool is shut down
  WaterDrop.instrumentation.instrument(
    "connection_pool.shutdown",
    pool: self
  )
end
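
The per-producer branch in #shutdown can be illustrated in isolation. This is a sketch with a stand-in: `RecordingProducer` and `close_producer` are hypothetical, and the stand-in exposes `active?` directly rather than through `status` as the real producer does.

```ruby
# Stand-in producer that records how it was closed (hypothetical).
class RecordingProducer
  attr_reader :closed_with

  def initialize(active: true)
    @active = active
  end

  def active?
    @active
  end

  # Graceful close: the real producer flushes in-flight messages here.
  def close
    @closed_with = :close
  end

  # Forced close: the real producer may purge unflushed messages here.
  def close!
    @closed_with = :close!
  end
end

# Mirrors the branch in #shutdown: skip inactive producers, then pick the close variant.
def close_producer(producer, force: false)
  return unless producer&.active?

  force ? producer.close! : producer.close
end

graceful = RecordingProducer.new
forced = RecordingProducer.new
inactive = RecordingProducer.new(active: false)

close_producer(graceful)
close_producer(forced, force: true)
close_producer(inactive, force: true)
```

Inactive producers are skipped in both modes, so `force: true` affects only producers that are still active.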

#stats ⇒ Hash

Get pool statistics

Returns:

  • (Hash)

    Pool statistics



# File 'lib/waterdrop/connection_pool.rb', line 235

def stats
  {
    size: @pool.size,
    available: @pool.available
  }
end

#transaction {|producer| ... } ⇒ Object

Execute a transaction with a producer from this connection pool

Examples:

pool.transaction do |producer|
  producer.produce(topic: 'events', payload: 'data1')
  producer.produce(topic: 'events', payload: 'data2')
end

Yields:

  • (producer)

    Producer from the pool with an active transaction

Returns:

  • (Object)

    Result of the block



# File 'lib/waterdrop/connection_pool.rb', line 293

def transaction
  with do |producer|
    producer.transaction do
      yield(producer)
    end
  end
end
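
The nesting above means the producer is checked out for the whole transaction and the block's value is passed back to the caller. The sketch below traces that flow with stand-ins; `SketchProducer` and `SketchPool` are hypothetical, and the commit/abort log is an assumption that models a producer committing on success and aborting when the block raises.

```ruby
# Stand-in producer that logs transaction boundaries (hypothetical).
class SketchProducer
  attr_reader :log

  def initialize
    @log = []
  end

  # Assumed behavior: commit when the block returns, abort and re-raise on error.
  def transaction
    @log << :begin
    result = yield
    @log << :commit
    result
  rescue StandardError
    @log << :abort
    raise
  end
end

# Stand-in pool with the same #transaction body as the source above.
class SketchPool
  def initialize(producer)
    @producer = producer
  end

  def with
    yield @producer
  end

  def transaction
    with do |producer|
      producer.transaction do
        yield(producer)
      end
    end
  end
end

producer = SketchProducer.new
pool = SketchPool.new(producer)

# Successful block: its value is returned from pool.transaction.
value = pool.transaction do |checked_out|
  checked_out.log << :produce
  42
end

# Failing block: the error propagates to the caller after the abort is logged.
begin
  pool.transaction { raise ArgumentError, "boom" }
rescue ArgumentError
  # expected in this sketch
end

final_log = producer.log
```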