Class: Legion::Transport::Helpers::ChannelPool
- Inherits: Object
- Inheritance hierarchy:
  - Object
  - Legion::Transport::Helpers::ChannelPool
- Defined in:
- lib/legion/transport/helpers/channel_pool.rb
Instance Method Summary
- #borrow ⇒ Object
- #close_all ⇒ Object
- #initialize(connection:, size: 10, prefetch: 2) ⇒ ChannelPool (constructor): a new instance of ChannelPool.
- #purge_closed ⇒ Object
- #return(channel) ⇒ Object
- #size ⇒ Object
Constructor Details
#initialize(connection:, size: 10, prefetch: 2) ⇒ ChannelPool
Returns a new instance of ChannelPool.
7 8 9 10 11 12 13 14 |
# File 'lib/legion/transport/helpers/channel_pool.rb', line 7

# Sets up an empty pool. No channel is created here; the constructor only
# stores its arguments and allocates the bookkeeping structures.
#
# @param connection [Object] stored as-is; #borrow calls +create_channel+ on it
# @param size [Integer] compared against the number of tracked channels
#   (idle + borrowed) before a new channel is created or an idle one is kept
# @param prefetch [Integer] value passed to +prefetch+ on newly created channels
def initialize(connection:, size: 10, prefetch: 2)
  # Bookkeeping lists: idle channels and channels currently handed out.
  @available = []
  @in_use = []

  # Caller-supplied configuration.
  @connection = connection
  @size = size
  @prefetch = prefetch

  # Lock used by the other methods around @available / @in_use.
  @mutex = Mutex.new
end
Instance Method Details
#borrow ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/legion/transport/helpers/channel_pool.rb', line 16

# Hands out a channel, reusing an idle one when possible and otherwise
# creating a new one through +@connection.create_channel+.
#
# The whole operation runs while holding the pool mutex.
#
# @return [Object, nil] a channel now tracked in +@in_use+, or +nil+ when
#   the number of tracked channels has already reached +@size+
# @raise [StandardError] whatever +create_channel+ or +prefetch+ raises; if
#   +prefetch+ fails, the freshly created channel is closed (best effort)
#   before the error is re-raised, so it does not leak untracked
def borrow
  @mutex.synchronize do
    # NOTE(review): purge_closed_unsafe is defined outside this listing;
    # presumably it drops channels that are no longer open — confirm.
    purge_closed_unsafe

    if (ch = @available.pop)
      @in_use << ch
      Legion::Logging.debug "ChannelPool borrow reused (available=#{@available.size} in_use=#{@in_use.size})" if defined?(Legion::Logging)
      return ch
    end

    total = @available.size + @in_use.size
    return nil if total >= @size

    ch = @connection.create_channel
    begin
      ch.prefetch(@prefetch) if ch.respond_to?(:prefetch)
    rescue StandardError => e
      # The channel is not tracked yet, so nobody else could ever close it.
      begin
        ch.close if ch.respond_to?(:close)
      rescue StandardError
        nil # closing is best effort; the prefetch error is the one to report
      end
      raise e
    end

    @in_use << ch
    Legion::Logging.debug "ChannelPool borrow new channel (available=#{@available.size} in_use=#{@in_use.size})" if defined?(Legion::Logging)
    ch
  end
end
#close_all ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/legion/transport/helpers/channel_pool.rb', line 52

# Closes every tracked channel (idle and borrowed) and empties the pool.
#
# Runs while holding the pool mutex. A failure to close one channel is
# logged at debug level (when Legion::Logging is defined) and does not stop
# the remaining channels from being closed.
#
# @return [Object, nil] result of the final log call, or +nil+ when
#   Legion::Logging is not defined
def close_all
  @mutex.synchronize do
    total = @available.size + @in_use.size

    (@available + @in_use).each do |ch|
      ch.close
    rescue StandardError => e
      # Best effort: keep going so one bad channel cannot block the rest.
      Legion::Logging.debug("ChannelPool#close_all channel close failed: #{e.message}") if defined?(Legion::Logging)
    end

    @available.clear
    @in_use.clear
    Legion::Logging.info "ChannelPool closed #{total} channel(s)" if defined?(Legion::Logging)
  end
end
#purge_closed ⇒ Object
48 49 50 |
# File 'lib/legion/transport/helpers/channel_pool.rb', line 48

# Runs +purge_closed_unsafe+ while holding the pool mutex and returns its
# result.
#
# NOTE(review): +purge_closed_unsafe+ is defined outside this listing;
# presumably it removes channels that are no longer open — confirm there.
def purge_closed
  @mutex.synchronize do
    purge_closed_unsafe
  end
end
#return(channel) ⇒ Object
37 38 39 40 41 42 43 44 45 46 |
# File 'lib/legion/transport/helpers/channel_pool.rb', line 37

# Gives a channel back to the pool.
#
# The channel is always removed from +@in_use+. It is kept as an idle
# channel only when it responds to +open?+ and reports open, is not already
# idle, and the pool is below +@size+. Otherwise it is simply dropped from
# tracking.
#
# NOTE(review): a channel dropped because the pool is full is not closed
# here — confirm whether callers are expected to close it.
#
# @param channel [Object, nil] the channel being handed back
# @return [Object, nil] +nil+ when the channel is not kept; otherwise the
#   result of the debug log call (+nil+ when Legion::Logging is not defined)
def return(channel)
  @mutex.synchronize do
    @in_use.delete(channel)
    return unless channel.respond_to?(:open?) && channel.open?
    # A double return must not list the channel twice, or two later
    # borrowers would be handed the same channel.
    return if @available.include?(channel)
    return if (@available.size + @in_use.size) >= @size

    @available << channel
    Legion::Logging.debug "ChannelPool return (available=#{@available.size} in_use=#{@in_use.size})" if defined?(Legion::Logging)
  end
end
#size ⇒ Object
66 67 68 |
# File 'lib/legion/transport/helpers/channel_pool.rb', line 66

# Counts the channels currently tracked by the pool, idle plus borrowed,
# read while holding the pool mutex.
#
# @return [Integer] +@available.size + @in_use.size+
def size
  @mutex.synchronize do
    idle = @available.size
    borrowed = @in_use.size
    idle + borrowed
  end
end