Class: ActivePublisher::Async::RedisAdapter::Consumer
- Inherits:
-
Object
- Object
- ActivePublisher::Async::RedisAdapter::Consumer
- Defined in:
- lib/active_publisher/async/redis_adapter/consumer.rb
Constant Summary collapse
- SUPERVISOR_INTERVAL =
{ :execution_interval => 10, # seconds :timeout_interval => 5, # seconds }
Instance Attribute Summary collapse
-
#consumers ⇒ Object
readonly
Returns the value of attribute consumers.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#supervisor ⇒ Object
readonly
Returns the value of attribute supervisor.
Instance Method Summary collapse
- #create_and_supervise_consumers! ⇒ Object
-
#initialize(redis_pool) ⇒ Consumer
constructor
A new instance of Consumer.
- #size ⇒ Object
Constructor Details
#initialize(redis_pool) ⇒ Consumer
Returns a new instance of Consumer.
14 15 16 17 18 |
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 14 def initialize(redis_pool) @queue = ::ActivePublisher::Async::RedisAdapter::RedisMultiPopQueue.new(redis_pool, ::ActivePublisher::Async::RedisAdapter::REDIS_LIST_KEY) @consumers = {} create_and_supervise_consumers! end |
Instance Attribute Details
#consumers ⇒ Object (readonly)
Returns the value of attribute consumers.
12 13 14 |
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 12 def consumers @consumers end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
12 13 14 |
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 12 def queue @queue end |
#supervisor ⇒ Object (readonly)
Returns the value of attribute supervisor.
12 13 14 |
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 12 def supervisor @supervisor end |
Instance Method Details
#create_and_supervise_consumers! ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 20 def create_and_supervise_consumers! ::ActivePublisher.configuration.publisher_threads.times do consumer_id = ::SecureRandom.uuid consumers[consumer_id] = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue) supervisor_task = ::Concurrent::TimerTask.new(SUPERVISOR_INTERVAL) do consumer = consumers[consumer_id] unless consumer.alive? consumer.kill rescue nil consumers[consumer_id] = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue) ::ActiveSupport::Notifications.instrument "async_queue.thread_restart" end # Notify the current queue size. ::ActiveSupport::Notifications.instrument "redis_async_queue_size.active_publisher", queue.size end supervisor_task.execute end end |
#size ⇒ Object
41 42 43 |
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 41 def size queue.size end |