Class: Legion::Transport::Helpers::ChannelPool

Inherits:
Object
  • Object
show all
Includes:
Logging::Helper
Defined in:
lib/legion/transport/helpers/channel_pool.rb

Instance Method Summary collapse

Constructor Details

#initialize(connection:, size: 10, prefetch: 2) ⇒ ChannelPool

Returns a new instance of ChannelPool.



11
12
13
14
15
16
17
18
# File 'lib/legion/transport/helpers/channel_pool.rb', line 11

# Sets up an empty pool; no channel is created at construction time.
#
# @param connection [Object] stored as-is; it is asked for new channels via
#   #create_channel when the pool has no idle channel to hand out
# @param size [Integer] upper bound on idle plus checked-out channels
# @param prefetch [Integer] value passed to #prefetch on each new channel
def initialize(connection:, size: 10, prefetch: 2)
  # Caller-supplied configuration.
  @prefetch   = prefetch
  @size       = size
  @connection = connection

  # Bookkeeping: idle channels and channels currently checked out.
  @in_use    = []
  @available = []

  # Guards every read and write of the two lists above.
  @mutex = Mutex.new
end

Instance Method Details

#borrow ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/legion/transport/helpers/channel_pool.rb', line 20

# Checks a channel out of the pool.
#
# purge_closed_unsafe runs first. An idle channel is reused when one exists;
# otherwise a new channel is created on the connection, as long as the pool
# tracks fewer than @size channels in total.
#
# @return [Object, nil] a channel now tracked as in use, or nil when the pool
#   is already at capacity
# @raise [StandardError] anything raised by create_channel or prefetch; a new
#   channel whose prefetch call fails is closed before the error propagates
def borrow
  @mutex.synchronize do
    purge_closed_unsafe

    if (ch = @available.pop)
      @in_use << ch
      log.debug "ChannelPool borrow reused (available=#{@available.size} in_use=#{@in_use.size})"
      return ch
    end

    total = @available.size + @in_use.size
    return nil if total >= @size

    ch = @connection.create_channel
    begin
      ch.prefetch(@prefetch) if ch.respond_to?(:prefetch)
    rescue StandardError => e
      # The channel is not in @available or @in_use yet, so close_all could
      # never reach it. Close it here instead of leaking it on the connection.
      begin
        ch.close
      rescue StandardError => close_error
        handle_exception(close_error, level: :warn, handled: true,
                                      operation: 'transport.channel_pool.borrow', size: @size)
      end
      # Surface the original failure, not a secondary close error.
      raise e
    end

    @in_use << ch
    log.debug "ChannelPool borrow new channel (available=#{@available.size} in_use=#{@in_use.size})"
    ch
  end
end

#close_all ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/legion/transport/helpers/channel_pool.rb', line 56

# Closes every channel the pool tracks, idle or checked out, then empties
# both lists.
#
# A StandardError raised while closing one channel is handed to
# handle_exception and does not stop the remaining channels from being closed.
#
# @return [Object] whatever log.info returns
def close_all
  @mutex.synchronize do
    channels = @available + @in_use

    channels.each do |channel|
      begin
        channel.close
      rescue StandardError => e
        handle_exception(e, level: :warn, handled: true, operation: 'transport.channel_pool.close_all', size: @size)
      end
    end

    [@available, @in_use].each(&:clear)
    log.info "ChannelPool closed #{channels.size} channel(s)"
  end
end

#purge_closed ⇒ Object



52
53
54
# File 'lib/legion/transport/helpers/channel_pool.rb', line 52

# Runs purge_closed_unsafe while holding the pool mutex.
#
# NOTE(review): purge_closed_unsafe is defined outside this excerpt; going by
# its name it removes closed channels from the pool -- confirm there.
#
# @return [Object] whatever purge_closed_unsafe returns
def purge_closed
  @mutex.synchronize do
    purge_closed_unsafe
  end
end

#return(channel) ⇒ Object



41
42
43
44
45
46
47
48
49
50
# File 'lib/legion/transport/helpers/channel_pool.rb', line 41

# Hands a channel back to the pool.
#
# The channel is removed from the in-use list. It is kept as idle only when
# it responds to #open? and reports open, is not already idle, and the pool
# has room for it; otherwise it is simply no longer tracked.
#
# @param channel [Object, nil] the channel being given back
# @return [Object, nil] nil when the channel is not kept, otherwise whatever
#   log.debug returns
def return(channel)
  @mutex.synchronize do
    @in_use.delete(channel)
    return unless channel.respond_to?(:open?) && channel.open?
    # Returning the same channel twice must not create a second idle entry:
    # two later borrows would hand it to two callers and size would over-count.
    return if @available.include?(channel)
    return if (@available.size + @in_use.size) >= @size

    @available << channel
    log.debug "ChannelPool return (available=#{@available.size} in_use=#{@in_use.size})"
  end
end

#size ⇒ Object



70
71
72
# File 'lib/legion/transport/helpers/channel_pool.rb', line 70

# Number of channels the pool currently tracks, counting idle and
# checked-out channels together. Read under the pool mutex.
#
# @return [Integer]
def size
  @mutex.synchronize do
    idle = @available.size
    busy = @in_use.size
    idle + busy
  end
end