Class: ActivePublisher::Async::RedisAdapter::Adapter

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/active_publisher/async/redis_adapter.rb

Constant Summary collapse

# Options passed to ::Concurrent::TimerTask.new for the supervisor task that
# #initialize starts. Frozen so the shared constant cannot be mutated at runtime.
SUPERVISOR_INTERVAL = {
  :execution_interval => 1.5, # seconds
  :timeout_interval => 1, # seconds
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

initialize_logger, logger, #logger, logger=

Constructor Details

#initialize(new_redis_pool) ⇒ Adapter

Returns a new instance of Adapter.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/active_publisher/async/redis_adapter.rb', line 24

# Wires the adapter to the given Redis pool, builds the consumer and the
# in-memory queue, derives the flush thresholds from the ActivePublisher
# configuration, and starts a timer task that calls flush_queue! on an interval.
#
# @param new_redis_pool [Object] pool stored as +redis_pool+ and handed to the Consumer
def initialize(new_redis_pool)
  logger.info "Starting redis publisher adapter"
  # TODO: do something with supervision ?
  @redis_pool = new_redis_pool
  @async_queue = ::ActivePublisher::Async::RedisAdapter::Consumer.new(new_redis_pool)
  @queue = ::MultiOpQueue::Queue.new
  @flush_max = ::ActivePublisher.configuration.messages_per_batch
  @flush_min = @flush_max / 2

  periodic_flusher = ::Concurrent::TimerTask.new(SUPERVISOR_INTERVAL) do
    # One flush per 50 queued entries, but always at least 1 and never more than 5 per run.
    flush_count = [[queue.size / 50, 1].max, 5].min
    flush_count.times { flush_queue! }
  end
  periodic_flusher.execute
end

Instance Attribute Details

#async_queue ⇒ Object (readonly)

Returns the value of attribute async_queue.



22
23
24
# File 'lib/active_publisher/async/redis_adapter.rb', line 22

# Reader for @async_queue, which #initialize sets to a
# ::ActivePublisher::Async::RedisAdapter::Consumer built from the redis pool.
#
# @return [Object] the current value of @async_queue
def async_queue
  @async_queue
end

#flush_max ⇒ Object (readonly)

Returns the value of attribute flush_max.



22
23
24
# File 'lib/active_publisher/async/redis_adapter.rb', line 22

# Reader for @flush_max, which #initialize copies from
# ::ActivePublisher.configuration.messages_per_batch.
#
# @return [Object] the current value of @flush_max
def flush_max
  @flush_max
end

#flush_min ⇒ Object (readonly)

Returns the value of attribute flush_min.



22
23
24
# File 'lib/active_publisher/async/redis_adapter.rb', line 22

# Reader for @flush_min, which #initialize computes as @flush_max / 2.
# #publish compares the queue size against this value to decide whether to flush.
#
# @return [Object] the current value of @flush_min
def flush_min
  @flush_min
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



22
23
24
# File 'lib/active_publisher/async/redis_adapter.rb', line 22

# Reader for @queue, the ::MultiOpQueue::Queue created in #initialize.
# #publish appends Marshal-dumped messages to it.
#
# @return [Object] the current value of @queue
def queue
  @queue
end

#redis_pool ⇒ Object (readonly)

Returns the value of attribute redis_pool.



22
23
24
# File 'lib/active_publisher/async/redis_adapter.rb', line 22

# Reader for @redis_pool, the pool object passed to #initialize.
#
# @return [Object] the current value of @redis_pool
def redis_pool
  @redis_pool
end

Instance Method Details

#publish(route, payload, exchange_name, options = {}) ⇒ Object



43
44
45
46
47
48
49
# File 'lib/active_publisher/async/redis_adapter.rb', line 43

# Wraps the arguments in an ::ActivePublisher::Message, appends its Marshal
# dump to the queue, and calls flush_queue! once the queue holds at least
# flush_min entries or the caller passes a truthy options[:flush_queue].
#
# @param route [Object] forwarded to ::ActivePublisher::Message.new
# @param payload [Object] forwarded to ::ActivePublisher::Message.new
# @param exchange_name [Object] forwarded to ::ActivePublisher::Message.new
# @param options [Hash] forwarded to the message; :flush_queue forces a flush
# @return [nil]
def publish(route, payload, exchange_name, options = {})
  serialized = ::Marshal.dump(::ActivePublisher::Message.new(route, payload, exchange_name, options))
  queue << serialized

  # The size check runs first; the option is only consulted when the size check fails.
  flush_needed = queue.size >= flush_min || options[:flush_queue]
  flush_queue! if flush_needed

  nil
end

#shutdown! ⇒ Object



51
52
53
54
55
56
57
# File 'lib/active_publisher/async/redis_adapter.rb', line 51

# Logs that draining has started, calls flush_queue! repeatedly until the
# queue reports empty, then pauses before returning.
#
# @return [Object] whatever +sleep+ returns
def shutdown!
  logger.info "Draining async publisher redis adapter before shutdown."

  until queue.empty?
    flush_queue!
  end

  # Redis in AOF mode most commonly runs `fsync` once per second; waiting a bit
  # over two seconds leaves room for at least one full `fsync` before the
  # process dies.
  sleep 2.1
end