Class: ActivePublisher::Async::RedisAdapter::RedisMultiPopQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(redis_connection_pool, new_list_key) ⇒ RedisMultiPopQueue

Returns a new instance of RedisMultiPopQueue.



7
8
9
10
# File 'lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb', line 7

# Stores the collaborators this queue works with.
#
# @param redis_connection_pool [Object] pool object; other methods call
#   +with+ on it and expect a Redis connection to be yielded
# @param new_list_key [Object] key of the Redis list backing this queue
def initialize(redis_connection_pool, new_list_key)
  instance_variable_set(:@redis_pool, redis_connection_pool)
  instance_variable_set(:@list_key, new_list_key)
end

Instance Attribute Details

#list_key ⇒ Object (readonly)

Returns the value of attribute list_key.



5
6
7
# File 'lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb', line 5

# Reader for the Redis list key this queue operates on.
#
# @return [Object] the value held in @list_key (nil when never assigned)
def list_key
  instance_variable_get(:@list_key)
end

#redis_pool ⇒ Object (readonly)

Returns the value of attribute redis_pool.



5
6
7
# File 'lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb', line 5

# Reader for the connection pool used to reach Redis.
#
# @return [Object] the value held in @redis_pool (nil when never assigned)
def redis_pool
  instance_variable_get(:@redis_pool)
end

Instance Method Details

#<<(message) ⇒ Object



12
13
14
15
16
17
18
# File 'lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb', line 12

# Appends a single message to the tail of the Redis list.
#
# The message is serialized with Marshal before a connection is taken
# from the pool, so a serialization failure never holds a connection.
#
# @param message [Object] any Marshal-dumpable object
# @return [Object] whatever the pool's +with+ call returns for the RPUSH
def <<(message)
  payload = ::Marshal.dump(message)
  redis_pool.with { |connection| connection.rpush(list_key, payload) }
end

#concat(*messages) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb', line 20

# Appends several messages to the tail of the Redis list in one RPUSH.
#
# Arguments are flattened (nested arrays included) and nil entries are
# dropped before serialization; nothing is sent when no messages remain.
#
# @param messages [Array<Object>] messages or (nested) arrays of messages
# @return [Object, nil] nil when there is nothing to push, otherwise
#   whatever the pool's +with+ call returns for the RPUSH
def concat(*messages)
  payloads = messages.flatten.compact.map { |message| ::Marshal.dump(message) }
  return if payloads.empty?

  redis_pool.with { |connection| connection.rpush(list_key, payloads) }
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


35
36
37
# File 'lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb', line 35

# Tells whether the queue currently holds no messages.
#
# @return [Boolean] true when +size+ is zero or negative
def empty?
  !size.positive?
end

#pop_up_to(num_to_pop, opts = {}) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb', line 39

# Removes and returns up to +num_to_pop+ messages from the head of the queue.
#
# When the queue is empty the call either raises immediately (non-blocking
# mode) or polls +size+ every 0.1 seconds until messages appear or the
# optional timeout has been exceeded.
#
# @param num_to_pop [Integer] maximum number of messages to return
# @param opts [Hash, Boolean] either an options hash
#   (+:timeout+ in seconds, default nil = wait indefinitely;
#   +:non_block+, default false) or a bare boolean meaning +non_block+
# @return [Array, nil] the result of +shift+, or nil when the timeout elapsed
#   while the queue was still empty
# @raise [ThreadError] when the queue is empty and non-blocking mode is on
def pop_up_to(num_to_pop, opts = {})
  timeout = nil
  non_block = false

  case opts
  when TrueClass, FalseClass
    # A bare boolean is the non_block flag (previously lost to a typo,
    # which made `pop_up_to(n, true)` block forever on an empty queue).
    non_block = opts
  when Hash
    timeout = opts.fetch(:timeout, nil)
    non_block = opts.fetch(:non_block, false)
  end

  return shift(num_to_pop) if size > 0
  raise ThreadError, "queue empty" if non_block

  # Only time spent sleeping is counted towards the timeout.
  total_waited_time = 0

  loop do
    total_waited_time += 0.1
    sleep 0.1
    queue_size = size

    # make sure we don't pop more than size
    return shift([num_to_pop, queue_size].min) if queue_size > 0
    return if timeout && total_waited_time > timeout
  end
end

#shift(number) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb', line 73

# Removes up to +number+ messages from the head of the Redis list and
# returns them deserialized.
#
# The read (LRANGE) and the removal (LTRIM) are issued inside one MULTI
# block. An empty array is returned when nothing is requested/available,
# when the MULTI reply does not have exactly two entries, or when the
# LTRIM status does not match /ok/i.
#
# NOTE(review): entries are decoded with Marshal.load, so anything able to
# write to this list controls what gets deserialized — confirm all writers
# are trusted.
#
# @param number [Integer] maximum number of messages to remove
# @return [Array] the decoded messages (nil entries are skipped)
def shift(number)
  count = [number, size].min
  return [] unless count > 0

  reply = []
  redis_pool.with do |connection|
    reply = connection.multi do |transaction|
      transaction.lrange(list_key, 0, count - 1)
      transaction.ltrim(list_key, count, -1)
    end
  end

  raw_messages, trim_status = reply.first, reply.last
  return [] unless reply.size == 2 && !trim_status.nil?
  return [] unless trim_status =~ /ok/i

  # Normalize the LRANGE reply into something iterable.
  raw_messages = [] if raw_messages.nil?
  raw_messages = [raw_messages] unless raw_messages.respond_to?(:each)

  decoded = []
  raw_messages.each do |raw|
    decoded << ::Marshal.load(raw) unless raw.nil?
  end
  decoded
end

#size ⇒ Object



104
105
106
107
108
# File 'lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb', line 104

# Current length of the Redis list backing this queue.
#
# @return [Object] the LLEN reply for +list_key+, or 0 when the reply is
#   nil/false (as returned through the pool's +with+ call)
def size
  redis_pool.with { |connection| connection.llen(list_key) || 0 }
end