Class: ActivePublisher::Async::RedisAdapter::Adapter
- Inherits:
-
Object
- Object
- ActivePublisher::Async::RedisAdapter::Adapter
- Includes:
- Logging
- Defined in:
- lib/active_publisher/async/redis_adapter.rb
Constant Summary collapse
- SUPERVISOR_INTERVAL =
{
  :execution_interval => 1.5, # seconds
  :timeout_interval => 1, # seconds
}
Instance Attribute Summary collapse
-
#async_queue ⇒ Object
readonly
Returns the value of attribute async_queue.
-
#flush_max ⇒ Object
readonly
Returns the value of attribute flush_max.
-
#flush_min ⇒ Object
readonly
Returns the value of attribute flush_min.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#redis_pool ⇒ Object
readonly
Returns the value of attribute redis_pool.
Instance Method Summary collapse
-
#initialize(new_redis_pool) ⇒ Adapter
constructor
A new instance of Adapter.
- #publish(route, payload, exchange_name, options = {}) ⇒ Object
- #shutdown! ⇒ Object
Methods included from Logging
initialize_logger, logger, #logger, logger=
Constructor Details
#initialize(new_redis_pool) ⇒ Adapter
Returns a new instance of Adapter.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/active_publisher/async/redis_adapter.rb', line 24
#
# Builds the adapter around an existing redis pool, creates the redis
# consumer and the in-memory queue, and starts a recurring supervisor task
# that flushes the queue between one and five times per run.
#
# @param new_redis_pool [Object] pool stored in @redis_pool and handed to the Consumer
def initialize(new_redis_pool)
  logger.info "Starting redis publisher adapter"
  # do something with supervision ?
  @redis_pool = new_redis_pool
  @async_queue = ::ActivePublisher::Async::RedisAdapter::Consumer.new(redis_pool)
  @queue = ::MultiOpQueue::Queue.new
  # NOTE(review): the attribute name after `configuration.` was missing from
  # this listing; `messages_per_batch` is presumed -- confirm against the gem source.
  @flush_max = ::ActivePublisher.configuration.messages_per_batch
  @flush_min = @flush_max / 2

  supervisor_task = ::Concurrent::TimerTask.new(SUPERVISOR_INTERVAL) do
    queue_size = queue.size
    number_of_times = [queue_size / 50, 1].max # get the max number of times to flush
    number_of_times = [number_of_times, 5].min # don't allow it to be more than 5 per run
    number_of_times.times { flush_queue! }
  end

  supervisor_task.execute
end
Instance Attribute Details
#async_queue ⇒ Object (readonly)
Returns the value of attribute async_queue.
22 23 24 |
# File 'lib/active_publisher/async/redis_adapter.rb', line 22
#
# Read-only accessor.
#
# @return [Object] the value held in @async_queue
def async_queue
  @async_queue
end
#flush_max ⇒ Object (readonly)
Returns the value of attribute flush_max.
22 23 24 |
# File 'lib/active_publisher/async/redis_adapter.rb', line 22
#
# Read-only accessor.
#
# @return [Object] the value held in @flush_max
def flush_max
  @flush_max
end
#flush_min ⇒ Object (readonly)
Returns the value of attribute flush_min.
22 23 24 |
# File 'lib/active_publisher/async/redis_adapter.rb', line 22
#
# Read-only accessor.
#
# @return [Object] the value held in @flush_min
def flush_min
  @flush_min
end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
22 23 24 |
# File 'lib/active_publisher/async/redis_adapter.rb', line 22
#
# Read-only accessor.
#
# @return [Object] the value held in @queue
def queue
  @queue
end
#redis_pool ⇒ Object (readonly)
Returns the value of attribute redis_pool.
22 23 24 |
# File 'lib/active_publisher/async/redis_adapter.rb', line 22
#
# Read-only accessor.
#
# @return [Object] the value held in @redis_pool
def redis_pool
  @redis_pool
end
Instance Method Details
#publish(route, payload, exchange_name, options = {}) ⇒ Object
43 44 45 46 47 48 49 |
# File 'lib/active_publisher/async/redis_adapter.rb', line 43
#
# Wraps the arguments in an ::ActivePublisher::Message, appends its
# Marshal dump to the in-memory queue, and calls flush_queue! when the queue
# has reached flush_min entries or the caller passed a truthy :flush_queue option.
#
# @param route [Object] passed through to Message.new
# @param payload [Object] passed through to Message.new
# @param exchange_name [Object] passed through to Message.new
# @param options [Hash] passed through to Message.new; :flush_queue forces a flush
# @return [nil]
def publish(route, payload, exchange_name, options = {})
  message = ::ActivePublisher::Message.new(route, payload, exchange_name, options)
  queue << ::Marshal.dump(message)
  flush_queue! if queue.size >= flush_min || options[:flush_queue]

  nil
end
#shutdown! ⇒ Object
51 52 53 54 55 56 57 |
# File 'lib/active_publisher/async/redis_adapter.rb', line 51
#
# Logs that draining has started, calls flush_queue! repeatedly until the
# in-memory queue reports empty, then pauses before returning.
def shutdown!
  logger.info "Draining async publisher redis adapter before shutdown."
  flush_queue! until queue.empty?
  # Sleeping 2.1 seconds because the most common redis `fsync` command in AOF mode is run every 1 second
  # this will give at least 1 full `fsync` to run before the process dies
  sleep 2.1
end