Class: Lepus::ConnectionPool
- Inherits:
-
Object
- Object
- Lepus::ConnectionPool
- Defined in:
- lib/lepus/connection_pool.rb
Overview
Connection pool for managing Bunny connections efficiently. Similar to the connection_pool gem, but built on concurrent-ruby primitives.
Constant Summary collapse
- DEFAULT_SIZE = 5
- DEFAULT_TIMEOUT = 5.0
Instance Attribute Summary collapse
-
#conn_suffix ⇒ Object
readonly
Returns the value of attribute conn_suffix.
-
#pool_size ⇒ Object
readonly
Returns the value of attribute pool_size.
-
#timeout ⇒ Object
readonly
Returns the value of attribute timeout.
Instance Method Summary collapse
- #available? ⇒ Boolean
- #available_count ⇒ Object
- #checkin(connection) ⇒ Object
- #checkout ⇒ Object
- #in_use_count ⇒ Object
-
#initialize(size: DEFAULT_SIZE, timeout: DEFAULT_TIMEOUT, suffix: nil) ⇒ ConnectionPool
constructor
A new instance of ConnectionPool.
- #shutdown ⇒ Object
- #size ⇒ Object
- #with_connection ⇒ Object
Constructor Details
#initialize(size: DEFAULT_SIZE, timeout: DEFAULT_TIMEOUT, suffix: nil) ⇒ ConnectionPool
Returns a new instance of ConnectionPool.
14 15 16 17 18 19 20 21 22 23 |
# File 'lib/lepus/connection_pool.rb', line 14

# Sets up the pool's configuration and empty bookkeeping structures.
# Only the containers and synchronisation primitives are created here.
#
# @param size [Integer] number of permits handed to the semaphore
# @param timeout [Numeric] value stored for the +timeout+ reader
# @param suffix [Object, nil] value stored for the +conn_suffix+ reader
def initialize(size: DEFAULT_SIZE, timeout: DEFAULT_TIMEOUT, suffix: nil)
  # Configuration exposed through the read-only attributes.
  @pool_size, @timeout, @conn_suffix = size, timeout, suffix

  # Idle connections and connections currently handed out.
  @available = Concurrent::Array.new
  @in_use = Concurrent::Array.new

  # One permit per pool slot.
  @semaphore = Concurrent::Semaphore.new(pool_size)
  @mutex = Concurrent::ReadWriteLock.new
  @shutdown = Concurrent::AtomicBoolean.new(false)
end
Instance Attribute Details
#conn_suffix ⇒ Object (readonly)
Returns the value of attribute conn_suffix.
12 13 14 |
# File 'lib/lepus/connection_pool.rb', line 12

# Read-only accessor for the connection suffix.
#
# @return [Object] the current value of +@conn_suffix+
def conn_suffix
  @conn_suffix
end
#pool_size ⇒ Object (readonly)
Returns the value of attribute pool_size.
12 13 14 |
# File 'lib/lepus/connection_pool.rb', line 12

# Read-only accessor for the configured pool size.
#
# @return [Object] the current value of +@pool_size+
def pool_size
  @pool_size
end
#timeout ⇒ Object (readonly)
Returns the value of attribute timeout.
12 13 14 |
# File 'lib/lepus/connection_pool.rb', line 12

# Read-only accessor for the checkout timeout.
#
# @return [Object] the current value of +@timeout+
def timeout
  @timeout
end
Instance Method Details
#available? ⇒ Boolean
107 108 109 |
# File 'lib/lepus/connection_pool.rb', line 107

# Reports whether the pool is still open. This only reflects the
# shutdown flag; it does not look at how many connections are idle.
#
# @return [Boolean] +true+ until the shutdown flag has been set
def available?
  @shutdown.false?
end
#available_count ⇒ Object
117 118 119 120 121 |
# File 'lib/lepus/connection_pool.rb', line 117

# Counts the idle connections currently held in the pool.
#
# @return [Integer] number of entries in +@available+, read under the read lock
def available_count
  @mutex.with_read_lock { @available.size }
end
#checkin(connection) ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/lepus/connection_pool.rb', line 75

# Returns a connection to the pool.
#
# A connection that is still connected goes back to the idle list, provided
# the pool has not been shut down; otherwise it is closed on a best-effort
# basis. One semaphore permit is released for every connection that was
# actually checked out.
#
# While the pool is open, a connection that is not in the in-use list (a
# double check-in, or an object this pool never handed out) is ignored.
# Releasing a permit for it would let more callers in than the pool size
# allows and would put the same connection into the idle list twice.
#
# @param connection [Object, nil] the connection to return; +nil+ is a no-op
# @return [nil]
def checkin(connection)
  return unless connection

  release_permit = false
  begin
    @mutex.with_write_lock do
      was_checked_out = !@in_use.delete(connection).nil?
      shut_down = @shutdown.value

      # After shutdown the in-use list has been cleared, so a legitimate
      # check-in can no longer be told apart; keep closing and releasing.
      next unless was_checked_out || shut_down

      # Decide on the permit before touching the connection so that an
      # error raised by +connected?+ cannot leak it.
      release_permit = true

      if !shut_down && connection.connected?
        @available << connection
      else
        begin
          connection.close
        rescue StandardError
          nil # best-effort close; the connection is being discarded anyway
        end
      end
    end
  ensure
    @semaphore.release if release_permit
  end

  nil
end
#checkout ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/lepus/connection_pool.rb', line 36

# Hands out a connection, reusing an idle one when possible.
#
# Waits up to +timeout+ seconds for a semaphore permit. With a permit in
# hand, the first idle connection that is still connected is reused and
# stale idle entries are dropped along the way. If none is usable, a new
# connection is created via +Lepus.config.create_connection+.
#
# The permit is handed back only when a failure happens after it was
# acquired. The shutdown and timeout errors are raised while no permit is
# held and therefore must not release one.
#
# @return [Object] a connection that is now recorded as in use
# @raise [Lepus::ConnectionPoolError] if the pool has been shut down
# @raise [Concurrent::TimeoutError] if no permit was obtained within +timeout+
def checkout
  raise Lepus::ConnectionPoolError, "Connection pool is shut down" if @shutdown.value

  # Blocking, timed acquire instead of a sleep/poll loop.
  unless @semaphore.try_acquire(1, timeout)
    raise Concurrent::TimeoutError, "Connection pool timeout"
  end

  begin
    # Both lists are mutated here, so take the write lock.
    reused = @mutex.with_write_lock do
      live = nil
      while (candidate = @available.shift)
        next unless candidate.connected? # stale idle connection: drop it

        @in_use << candidate
        live = candidate
        break
      end
      live
    end
    return reused if reused

    # Nothing reusable: open a fresh connection outside the lock.
    fresh = Lepus.config.create_connection(suffix: @conn_suffix)
    @mutex.with_write_lock { @in_use << fresh }
    fresh
  rescue StandardError
    # We hold a permit at this point; give it back before propagating.
    @semaphore.release
    raise
  end
end
#in_use_count ⇒ Object
123 124 125 126 127 |
# File 'lib/lepus/connection_pool.rb', line 123

# Counts the connections currently checked out.
#
# @return [Integer] number of entries in +@in_use+, read under the read lock
def in_use_count
  @mutex.with_read_lock { @in_use.size }
end
#shutdown ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/lepus/connection_pool.rb', line 93

# Marks the pool as shut down and closes every tracked connection.
#
# The flag is set first, then all idle and in-use connections are closed
# under the write lock. Close errors are swallowed, and both lists are
# emptied afterwards.
#
# @return [Object] whatever the write-lock block yields (the cleared in-use list)
def shutdown
  @shutdown.make_true

  @mutex.with_write_lock do
    tracked = @available + @in_use
    tracked.each do |conn|
      begin
        conn.close
      rescue StandardError
        nil # best-effort: a connection that fails to close is still dropped
      end
    end

    @available.clear
    @in_use.clear
  end
end
#size ⇒ Object
111 112 113 114 115 |
# File 'lib/lepus/connection_pool.rb', line 111

# Total number of connections the pool is tracking right now.
#
# @return [Integer] idle plus in-use connections, read under the read lock
def size
  @mutex.with_read_lock do
    [@available, @in_use].sum(&:length)
  end
end
#with_connection ⇒ Object
25 26 27 28 29 30 31 32 33 34 |
# File 'lib/lepus/connection_pool.rb', line 25

# Checks out a connection, yields it, and always checks it back in.
#
# Only a timeout raised while acquiring the connection is translated into
# Lepus::ConnectionPoolTimeoutError. A Concurrent::TimeoutError raised by
# the caller's block is left untouched so it is not misreported as a pool
# timeout.
#
# @yieldparam connection [Object] the checked-out connection
# @return [Object] the value of the block
# @raise [Lepus::ConnectionPoolTimeoutError] if +checkout+ timed out
def with_connection
  connection =
    begin
      checkout
    rescue Concurrent::TimeoutError
      raise Lepus::ConnectionPoolTimeoutError, "Connection pool timeout after #{timeout} seconds"
    end

  begin
    yield connection
  ensure
    checkin(connection)
  end
end