Class: ActivePublisher::Async::InMemoryAdapter::Adapter

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/active_publisher/async/in_memory_adapter.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

initialize_logger, logger, #logger, logger=

Constructor Details

#initialize(back_pressure_strategy = :raise, max_queue_size = 100_000, supervisor_interval = 0.2) ⇒ Adapter

Returns a new instance of Adapter.



23
24
25
26
27
28
29
30
31
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 23

def initialize(back_pressure_strategy = :raise, max_queue_size = 100_000, supervisor_interval = 0.2)
  logger.info "Starting in-memory publisher adapter"

  @async_queue = ::ActivePublisher::Async::InMemoryAdapter::AsyncQueue.new(
    back_pressure_strategy,
    max_queue_size,
    supervisor_interval
  )
end

Instance Attribute Details

#async_queueObject (readonly)

Returns the value of attribute async_queue.



21
22
23
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 21

def async_queue
  @async_queue
end

Instance Method Details

#publish(route, payload, exchange_name, options = {}) ⇒ Object



33
34
35
36
37
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 33

def publish(route, payload, exchange_name, options = {})
  message = ::ActivePublisher::Message.new(route, payload, exchange_name, options)
  async_queue.push(message)
  nil
end

#shutdown!Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 39

def shutdown!
  max_wait_time = ::ActivePublisher.configuration.seconds_to_wait_for_graceful_shutdown
  started_shutting_down_at = ::Time.now

  logger.info "Draining async publisher in-memory adapter queue before shutdown. current queue size: #{async_queue.size}."
  while async_queue.size > 0
    if (::Time.now - started_shutting_down_at) > max_wait_time
      logger.info "Forcing async publisher adapter shutdown because graceful shutdown period of #{max_wait_time} seconds was exceeded. Current queue size: #{async_queue.size}."
      break
    end

    sleep 0.1
  end
end