Class: ActivePublisher::Async::InMemoryAdapter::AsyncQueue
- Inherits:
- Object
- Includes:
- Logging
- Defined in:
- lib/active_publisher/async/in_memory_adapter/async_queue.rb
Constant Summary
- BACK_PRESSURE_STRATEGIES =
These strategies determine what happens to a message when the queue is full:
  :raise - Raise an error and drop the message.
  :drop - Silently drop the message.
  :wait - Wait for space in the queue to become available.
[:raise, :drop, :wait].freeze
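One way to guard against a mistyped strategy is to check the value against the constant before using it. The following is a minimal sketch and not the library's code: `StrategyCheck` and `validate_strategy` are hypothetical names, and rejecting unknown values with `ArgumentError` is an assumption made for illustration.

```ruby
# Hypothetical helper, not part of ActivePublisher.
module StrategyCheck
  BACK_PRESSURE_STRATEGIES = [:raise, :drop, :wait].freeze

  # Returns the strategy as a symbol, or raises ArgumentError when it is
  # not one of the known strategies. Accepts a Symbol or a String.
  def self.validate_strategy(strategy)
    sym = strategy.to_sym
    return sym if BACK_PRESSURE_STRATEGIES.include?(sym)

    raise ArgumentError, "unknown back-pressure strategy: #{strategy.inspect}"
  end
end

StrategyCheck.validate_strategy(:drop)   # => :drop
StrategyCheck.validate_strategy("wait")  # => :wait
```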
Instance Attribute Summary
- #back_pressure_strategy ⇒ Object
  Returns the value of attribute back_pressure_strategy.
- #consumers ⇒ Object (readonly)
  Returns the value of attribute consumers.
- #max_queue_size ⇒ Object
  Returns the value of attribute max_queue_size.
- #queue ⇒ Object (readonly)
  Returns the value of attribute queue.
- #supervisor ⇒ Object (readonly)
  Returns the value of attribute supervisor.
- #supervisor_interval ⇒ Object
  Returns the value of attribute supervisor_interval.
Instance Method Summary
- #initialize(back_pressure_strategy, max_queue_size, supervisor_interval) ⇒ AsyncQueue (constructor)
  A new instance of AsyncQueue.
- #push(message) ⇒ Object
- #size ⇒ Object
Methods included from Logging
initialize_logger, logger, #logger, logger=
Constructor Details
#initialize(back_pressure_strategy, max_queue_size, supervisor_interval) ⇒ AsyncQueue
Returns a new instance of AsyncQueue.
# File 'lib/active_publisher/async/in_memory_adapter/async_queue.rb', line 19
def initialize(back_pressure_strategy, max_queue_size, supervisor_interval)
  self.back_pressure_strategy = back_pressure_strategy
  @max_queue_size = max_queue_size
  @supervisor_interval = supervisor_interval
  @queue = ::MultiOpQueue::Queue.new
  @consumers = {}
  create_and_supervise_consumers!
end
Instance Attribute Details
#back_pressure_strategy ⇒ Object
Returns the value of attribute back_pressure_strategy.
# File 'lib/active_publisher/async/in_memory_adapter/async_queue.rb', line 13
def back_pressure_strategy
  @back_pressure_strategy
end
#consumers ⇒ Object (readonly)
Returns the value of attribute consumers.
# File 'lib/active_publisher/async/in_memory_adapter/async_queue.rb', line 17
def consumers
  @consumers
end
#max_queue_size ⇒ Object
Returns the value of attribute max_queue_size.
# File 'lib/active_publisher/async/in_memory_adapter/async_queue.rb', line 13
def max_queue_size
  @max_queue_size
end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
# File 'lib/active_publisher/async/in_memory_adapter/async_queue.rb', line 17
def queue
  @queue
end
#supervisor ⇒ Object (readonly)
Returns the value of attribute supervisor.
# File 'lib/active_publisher/async/in_memory_adapter/async_queue.rb', line 17
def supervisor
  @supervisor
end
#supervisor_interval ⇒ Object
Returns the value of attribute supervisor_interval.
# File 'lib/active_publisher/async/in_memory_adapter/async_queue.rb', line 13
def supervisor_interval
  @supervisor_interval
end
Instance Method Details
#push(message) ⇒ Object
# File 'lib/active_publisher/async/in_memory_adapter/async_queue.rb', line 33
def push(message)
  max_payload_bytes_check(message) if ::ActivePublisher.configuration.max_payload_bytes

  if queue.size >= max_queue_size
    case back_pressure_strategy
    when :drop
      ::ActiveSupport::Notifications.instrument "message_dropped.active_publisher"
      return
    when :raise
      ::ActiveSupport::Notifications.instrument "message_dropped.active_publisher"
      fail ::ActivePublisher::Async::InMemoryAdapter::UnableToPersistMessageError, "Queue is full, messages will be dropped."
    when :wait
      ::ActiveSupport::Notifications.instrument "wait_for_async_queue.active_publisher" do
        # This is a really crappy way to wait
        sleep 0.01 until queue.size < max_queue_size
      end
    end
  end

  queue.push(message)
end
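The back-pressure branching in #push can be tried in isolation with a simplified stand-in. This is only a sketch under stated assumptions: `SketchQueue` and `QueueFullError` are hypothetical names, Ruby's built-in `Queue` replaces `MultiOpQueue::Queue`, and a plain counter replaces the `ActiveSupport::Notifications` instrumentation.

```ruby
# Hypothetical stand-in for AsyncQueue#push; not the library's implementation.
class QueueFullError < StandardError; end

class SketchQueue
  attr_reader :queue, :dropped

  def initialize(strategy, max_size)
    @strategy = strategy
    @max_size = max_size
    @queue = Queue.new # Ruby's thread-safe queue (assumption: stands in for MultiOpQueue)
    @dropped = 0       # counts rejected messages in place of instrumentation
  end

  def push(message)
    if @queue.size >= @max_size
      case @strategy
      when :drop
        @dropped += 1
        return
      when :raise
        @dropped += 1
        raise QueueFullError, "Queue is full, messages will be dropped."
      when :wait
        # Poll until a consumer frees a slot, mirroring the sleep loop in the source.
        sleep 0.01 until @queue.size < @max_size
      end
    end

    @queue.push(message)
  end
end

dropper = SketchQueue.new(:drop, 1)
dropper.push("a")
dropper.push("b") # queue is full, so "b" is discarded
puts dropper.queue.size # prints 1

waiter = SketchQueue.new(:wait, 1)
waiter.push("a")
consumer = Thread.new { sleep 0.05; waiter.queue.pop }
waiter.push("b") # blocks until the consumer thread pops "a"
consumer.join
puts waiter.queue.size # prints 1
```

With :wait the producer thread is blocked for as long as the queue stays full, so that strategy only makes sense when something is actively draining the queue.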
#size ⇒ Object
# File 'lib/active_publisher/async/in_memory_adapter/async_queue.rb', line 55
def size
  # Requests might be in flight (out of the queue, but not yet published), so taking the max should be
  # good enough to make sure we're honest about the actual queue size.
  return queue.size if consumers.empty?
  [queue.size, consumer_sampled_queue_size].max
end
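The "take the max" reasoning in #size can be illustrated with plain numbers. The helper below is a hypothetical sketch: `reported_size` is not part of the library, and it assumes each consumer exposes a sampled count of messages it has pulled but not yet published. How `consumer_sampled_queue_size` is actually computed is not shown on this page.

```ruby
# Hypothetical sketch: report the larger of the live queue size and the
# largest consumer-side sample, so in-flight messages are not undercounted.
def reported_size(queue_size, consumer_samples)
  return queue_size if consumer_samples.empty?

  [queue_size, consumer_samples.max].max
end

reported_size(0, [])      # => 0 (no consumers, use the queue size)
reported_size(0, [3, 1])  # => 3 (queue drained, but messages are still in flight)
reported_size(5, [3, 1])  # => 5 (queue backlog dominates)
```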