Class: ActivePublisher::Async::RedisAdapter::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/active_publisher/async/redis_adapter/consumer.rb

Constant Summary collapse

# Options handed to every ::Concurrent::TimerTask created by
# create_and_supervise_consumers!. Frozen because the same Hash instance is
# shared by all of those tasks and must not be modified at runtime.
# NOTE(review): newer concurrent-ruby releases reportedly ignore
# :timeout_interval — confirm against the version this project pins.
SUPERVISOR_INTERVAL =
{
  :execution_interval => 10, # seconds
  :timeout_interval => 5, # seconds
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(redis_pool) ⇒ Consumer

Returns a new instance of Consumer.



14
15
16
17
18
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 14

# Prepares an empty consumer registry and the Redis-backed queue, then calls
# create_and_supervise_consumers!.
#
# @param redis_pool [Object] passed unchanged as the first argument to
#   RedisMultiPopQueue.new, together with REDIS_LIST_KEY
def initialize(redis_pool)
  redis_adapter = ::ActivePublisher::Async::RedisAdapter

  @consumers = {}
  @queue = redis_adapter::RedisMultiPopQueue.new(redis_pool, redis_adapter::REDIS_LIST_KEY)

  create_and_supervise_consumers!
end

Instance Attribute Details

#consumers ⇒ Object (readonly)

Returns the value of attribute consumers.



12
13
14
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 12

# Reader for the @consumers instance variable.
#
# initialize assigns an empty Hash to it, and create_and_supervise_consumers!
# stores one ConsumerThread per generated SecureRandom.uuid key.
#
# @return [Object] the current value of @consumers
def consumers
  @consumers
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



12
13
14
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 12

# Reader for the @queue instance variable, which initialize sets to a
# RedisMultiPopQueue built from the given redis pool and REDIS_LIST_KEY.
#
# @return [Object] the current value of @queue
def queue
  @queue
end

#supervisor ⇒ Object (readonly)

Returns the value of attribute supervisor.



12
13
14
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 12

# Reader for the @supervisor instance variable.
#
# NOTE(review): none of the methods documented for this class assign
# @supervisor, so this presumably returns nil — confirm whether the
# attribute is still needed.
#
# @return [Object, nil] the current value of @supervisor
def supervisor
  @supervisor
end

Instance Method Details

#create_and_supervise_consumers! ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 20

# Creates one ConsumerThread per configured publisher thread and, for each of
# them, starts a ::Concurrent::TimerTask (configured by SUPERVISOR_INTERVAL)
# that watches that single consumer.
#
# On every run a task:
#   1. looks up its consumer by id and, when it is no longer alive, attempts to
#      kill it, stores a fresh ConsumerThread under the same id and instruments
#      "async_queue.thread_restart";
#   2. instruments "redis_async_queue_size.active_publisher" with queue.size.
#
# NOTE(review): step 2 runs inside every per-consumer task, so the queue size
# is instrumented once per consumer on each interval, and the tasks themselves
# are not stored anywhere after #execute — confirm both are intended.
#
# @return [Integer] the receiver of #times, i.e. the configured
#   publisher_threads value
def create_and_supervise_consumers!
  ::ActivePublisher.configuration.publisher_threads.times do
    slot = ::SecureRandom.uuid
    thread_class = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread
    consumers[slot] = thread_class.new(queue)

    watchdog = ::Concurrent::TimerTask.new(SUPERVISOR_INTERVAL) do
      worker = consumers[slot]

      unless worker.alive?
        begin
          worker.kill
        rescue ::StandardError
          # Best effort only: the consumer already reported that it is not
          # alive, so a failing kill must not prevent the replacement below.
          nil
        end

        consumers[slot] = thread_class.new(queue)
        ::ActiveSupport::Notifications.instrument("async_queue.thread_restart")
      end

      # Publish the current backlog of the shared queue.
      backlog = queue.size
      ::ActiveSupport::Notifications.instrument("redis_async_queue_size.active_publisher", backlog)
    end

    watchdog.execute
  end
end

#size ⇒ Object



41
42
43
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 41

# Delegates to the underlying queue.
#
# @return [Object] whatever queue.size returns (presumably the number of
#   entries currently waiting in the queue — verify against RedisMultiPopQueue)
def size
  queue.size
end