Class: Legion::Transport::Helpers::ChannelPool

Inherits:
Object
  • Object
show all
Defined in:
lib/legion/transport/helpers/channel_pool.rb

Instance Method Summary collapse

Constructor Details

#initialize(connection:, size: 10, prefetch: 2) ⇒ ChannelPool

Returns a new instance of ChannelPool.



7
8
9
10
11
12
13
14
# File 'lib/legion/transport/helpers/channel_pool.rb', line 7

def initialize(connection:, size: 10, prefetch: 2)
  @connection = connection
  @size       = size
  @prefetch   = prefetch
  @available  = []
  @in_use     = []
  @mutex      = Mutex.new
end

Instance Method Details

#borrowObject



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/legion/transport/helpers/channel_pool.rb', line 16

def borrow
  @mutex.synchronize do
    purge_closed_unsafe

    if (ch = @available.pop)
      @in_use << ch
      Legion::Logging.debug "ChannelPool borrow reused (available=#{@available.size} in_use=#{@in_use.size})" if defined?(Legion::Logging)
      return ch
    end

    total = @available.size + @in_use.size
    return nil if total >= @size

    ch = @connection.create_channel
    ch.prefetch(@prefetch) if ch.respond_to?(:prefetch)
    @in_use << ch
    Legion::Logging.debug "ChannelPool borrow new channel (available=#{@available.size} in_use=#{@in_use.size})" if defined?(Legion::Logging)
    ch
  end
end

#close_allObject



52
53
54
55
56
57
58
59
60
61
62
# File 'lib/legion/transport/helpers/channel_pool.rb', line 52

def close_all
  @mutex.synchronize do
    total = @available.size + @in_use.size
    (@available + @in_use).each do |ch|
      ch.close rescue nil # rubocop:disable Style/RescueModifier
    end
    @available.clear
    @in_use.clear
    Legion::Logging.info "ChannelPool closed #{total} channel(s)" if defined?(Legion::Logging)
  end
end

#purge_closedObject



48
49
50
# File 'lib/legion/transport/helpers/channel_pool.rb', line 48

def purge_closed
  @mutex.synchronize { purge_closed_unsafe }
end

#return(channel) ⇒ Object



37
38
39
40
41
42
43
44
45
46
# File 'lib/legion/transport/helpers/channel_pool.rb', line 37

def return(channel)
  @mutex.synchronize do
    @in_use.delete(channel)
    return unless channel.respond_to?(:open?) && channel.open?
    return if (@available.size + @in_use.size) >= @size

    @available << channel
    Legion::Logging.debug "ChannelPool return (available=#{@available.size} in_use=#{@in_use.size})" if defined?(Legion::Logging)
  end
end

#sizeObject



64
65
66
# File 'lib/legion/transport/helpers/channel_pool.rb', line 64

def size
  @mutex.synchronize { @available.size + @in_use.size }
end