Class: ActivePublisher::Async::InMemoryAdapter::Adapter
- Inherits:
-
Object
- Object
- ActivePublisher::Async::InMemoryAdapter::Adapter
- Includes:
- Logging
- Defined in:
- lib/active_publisher/async/in_memory_adapter.rb
Instance Attribute Summary collapse
-
#async_queue ⇒ Object
readonly
Returns the value of attribute async_queue.
Instance Method Summary collapse
-
#initialize(back_pressure_strategy = :raise, max_queue_size = 100_000, supervisor_interval = 0.2) ⇒ Adapter
constructor
A new instance of Adapter.
- #publish(route, payload, exchange_name, options = {}) ⇒ Object
- #shutdown! ⇒ Object
Methods included from Logging
initialize_logger, logger, #logger, logger=
Constructor Details
#initialize(back_pressure_strategy = :raise, max_queue_size = 100_000, supervisor_interval = 0.2) ⇒ Adapter
Returns a new instance of Adapter.
24 25 26 27 28 29 30 31 32 |
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 24 def initialize(back_pressure_strategy = :raise, max_queue_size = 100_000, supervisor_interval = 0.2) logger.info "Starting in-memory publisher adapter" @async_queue = ::ActivePublisher::Async::InMemoryAdapter::AsyncQueue.new( back_pressure_strategy, max_queue_size, supervisor_interval ) end |
Instance Attribute Details
#async_queue ⇒ Object (readonly)
Returns the value of attribute async_queue.
22 23 24 |
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 22 def async_queue @async_queue end |
Instance Method Details
#publish(route, payload, exchange_name, options = {}) ⇒ Object
34 35 36 37 38 |
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 34 def publish(route, payload, exchange_name, = {}) = ::ActivePublisher::Message.new(route, payload, exchange_name, ) async_queue.push() nil end |
#shutdown! ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/active_publisher/async/in_memory_adapter.rb', line 40 def shutdown! max_wait_time = ::ActivePublisher.configuration.seconds_to_wait_for_graceful_shutdown started_shutting_down_at = ::Time.now logger.info "Draining async publisher in-memory adapter queue before shutdown. current queue size: #{async_queue.size}." while async_queue.size > 0 if (::Time.now - started_shutting_down_at) > max_wait_time logger.info "Forcing async publisher adapter shutdown because graceful shutdown period of #{max_wait_time} seconds was exceeded. Current queue size: #{async_queue.size}." break end sleep 0.1 end end |