Class: Legion::Transport::Helpers::ChannelPool
- Inherits: Object
  - Object
  - Legion::Transport::Helpers::ChannelPool
- Includes:
- Logging::Helper
- Defined in:
- lib/legion/transport/helpers/channel_pool.rb
Instance Method Summary
- #borrow ⇒ Object
- #close_all ⇒ Object
- #initialize(connection:, size: 10, prefetch: 2) ⇒ ChannelPool (constructor): A new instance of ChannelPool.
- #purge_closed ⇒ Object
- #return(channel) ⇒ Object
- #size ⇒ Object
Constructor Details
#initialize(connection:, size: 10, prefetch: 2) ⇒ ChannelPool
Returns a new instance of ChannelPool.
11 12 13 14 15 16 17 18 |
# File 'lib/legion/transport/helpers/channel_pool.rb', line 11

# Sets up an empty pool. No channel is created here; channels are opened
# on demand by #borrow via +connection.create_channel+.
#
# @param connection [Object] stored as-is and later asked for new channels
# @param size [Integer] upper bound on tracked channels (idle plus borrowed)
# @param prefetch [Integer] value handed to +prefetch+ on each new channel
def initialize(connection:, size: 10, prefetch: 2)
  # Guards every read and write of the two bookkeeping lists below.
  @mutex = Mutex.new
  @available, @in_use = [], []
  @connection, @size, @prefetch = connection, size, prefetch
end
Instance Method Details
#borrow ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/legion/transport/helpers/channel_pool.rb', line 20

# Checks a channel out of the pool.
#
# Runs +purge_closed_unsafe+ first, then reuses an idle channel when one is
# available. Otherwise a new channel is created on the connection, as long
# as the number of tracked channels is still below the pool size.
#
# @return [Object, nil] the borrowed channel, or nil when the pool is at capacity
# @raise [StandardError] whatever +create_channel+ or +prefetch+ raises
def borrow
  @mutex.synchronize do
    purge_closed_unsafe

    if (ch = @available.pop)
      @in_use << ch
      log.debug "ChannelPool borrow reused (available=#{@available.size} in_use=#{@in_use.size})"
      return ch
    end

    total = @available.size + @in_use.size
    return nil if total >= @size

    ch = @connection.create_channel
    begin
      ch.prefetch(@prefetch) if ch.respond_to?(:prefetch)
    rescue StandardError => e
      # The channel is not tracked yet, so nothing else would ever close it.
      # Close it here so a failed setup does not orphan an open channel.
      begin
        ch.close if ch.respond_to?(:close)
      rescue StandardError => close_error
        # Best effort only; reported the same way close_all reports close failures.
        handle_exception(close_error, level: :warn, handled: true,
                                      operation: 'transport.channel_pool.borrow', size: @size)
      end
      raise e
    end

    @in_use << ch
    log.debug "ChannelPool borrow new channel (available=#{@available.size} in_use=#{@in_use.size})"
    ch
  end
end
#close_all ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/legion/transport/helpers/channel_pool.rb', line 56

# Closes every tracked channel, idle and borrowed, then empties the pool.
#
# A failure to close one channel is passed to +handle_exception+ at warn
# level and does not stop the remaining channels from being closed.
def close_all
  @mutex.synchronize do
    # Snapshot both lists so the count survives the clears below.
    channels = @available + @in_use

    channels.each do |channel|
      begin
        channel.close
      rescue StandardError => e
        handle_exception(e, level: :warn, handled: true, operation: 'transport.channel_pool.close_all', size: @size)
      end
    end

    [@available, @in_use].each(&:clear)
    log.info "ChannelPool closed #{channels.size} channel(s)"
  end
end
#purge_closed ⇒ Object
52 53 54 |
# File 'lib/legion/transport/helpers/channel_pool.rb', line 52

# Runs +purge_closed_unsafe+ while holding the pool mutex.
# NOTE(review): the helper is not visible here; presumably it drops channels
# that are no longer open from the pool's lists. Confirm against its definition.
def purge_closed
  @mutex.synchronize do
    purge_closed_unsafe
  end
end
#return(channel) ⇒ Object
41 42 43 44 45 46 47 48 49 50 |
# File 'lib/legion/transport/helpers/channel_pool.rb', line 41

# Hands a channel back to the pool.
#
# The channel is always removed from the borrowed list. It rejoins the idle
# list only when it responds to +open?+ and reports open, is not already
# idle, and the pool has room. A channel that fails any of these checks is
# dropped from tracking; it is not closed here.
#
# @param channel [Object, nil] a channel previously obtained from #borrow
def return(channel)
  @mutex.synchronize do
    @in_use.delete(channel)
    return unless channel.respond_to?(:open?) && channel.open?
    # A second return of the same channel must not create a duplicate idle
    # entry, or two later borrowers would be handed the same channel.
    return if @available.include?(channel)
    return if (@available.size + @in_use.size) >= @size

    @available << channel
    log.debug "ChannelPool return (available=#{@available.size} in_use=#{@in_use.size})"
  end
end
#size ⇒ Object
70 71 72 |
# File 'lib/legion/transport/helpers/channel_pool.rb', line 70

# Number of channels the pool currently tracks, idle plus borrowed,
# read while holding the pool mutex.
#
# @return [Integer]
def size
  @mutex.synchronize { [@available, @in_use].sum(&:size) }
end