Class: ActivePublisher::Async::InMemoryAdapter::AsyncQueue

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/active_publisher/async/in_memory_adapter/async_queue.rb

Constant Summary collapse

BACK_PRESSURE_STRATEGIES =

These strategies determine what happens to a message when the queue is full. `:raise` - raise an error and drop the message. `:drop` - silently drop the message. `:wait` - wait for space in the queue to become available.

[:raise, :drop, :wait].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

initialize_logger, logger, #logger, logger=

Constructor Details

#initialize(back_pressure_strategy, max_queue_size, supervisor_interval) ⇒ AsyncQueue

Returns a new instance of AsyncQueue.



19
20
21
22
23
24
25
26
# File 'lib/active_publisher/async/in_memory_adapter/async_queue.rb', line 19

# Sets up the queue state and then starts the consumers.
#
# @param back_pressure_strategy what #push should do when the queue is full;
#   assigned through the attribute writer rather than directly to the ivar
#   (presumably one of BACK_PRESSURE_STRATEGIES -- any validation lives in the
#   writer, which is not shown here)
# @param max_queue_size queue size at which #push starts applying back pressure
# @param supervisor_interval stored as-is in @supervisor_interval
#   (NOTE(review): presumably the delay between supervision passes -- confirm
#   against create_and_supervise_consumers!)
def initialize(back_pressure_strategy, max_queue_size, supervisor_interval)
  self.back_pressure_strategy = back_pressure_strategy
  @max_queue_size = max_queue_size
  @supervisor_interval = supervisor_interval
  @queue = ::MultiOpQueue::Queue.new
  # Starts out empty; presumably filled in by create_and_supervise_consumers!.
  @consumers = {}
  # Must run last: it is called only after all of the state above is in place.
  create_and_supervise_consumers!
end

Instance Attribute Details

#back_pressure_strategy ⇒ Object

Returns the value of attribute back_pressure_strategy.



13
14
15
# File 'lib/active_publisher/async/in_memory_adapter/async_queue.rb', line 13

# Reader for the configured back-pressure strategy, i.e. the value #push
# switches on (:drop, :raise or :wait) when the queue is full.
def back_pressure_strategy
  @back_pressure_strategy
end

#consumers ⇒ Object (readonly)

Returns the value of attribute consumers.



17
18
19
# File 'lib/active_publisher/async/in_memory_adapter/async_queue.rb', line 17

# Reader for the consumers collection, which the constructor initializes to an
# empty Hash. #size falls back to the raw queue size while this is empty.
def consumers
  @consumers
end

#max_queue_size ⇒ Object

Returns the value of attribute max_queue_size.



13
14
15
# File 'lib/active_publisher/async/in_memory_adapter/async_queue.rb', line 13

# Reader for the size threshold: #push applies the back-pressure strategy once
# queue.size is greater than or equal to this value.
def max_queue_size
  @max_queue_size
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



17
18
19
# File 'lib/active_publisher/async/in_memory_adapter/async_queue.rb', line 17

# Reader for the underlying queue, a ::MultiOpQueue::Queue created by the
# constructor; #push appends messages to it.
def queue
  @queue
end

#supervisor ⇒ Object (readonly)

Returns the value of attribute supervisor.



17
18
19
# File 'lib/active_publisher/async/in_memory_adapter/async_queue.rb', line 17

# Reader for @supervisor.
# NOTE(review): the assignment is not visible here -- presumably set by
# create_and_supervise_consumers!; returns nil until something assigns it.
def supervisor
  @supervisor
end

#supervisor_interval ⇒ Object

Returns the value of attribute supervisor_interval.



13
14
15
# File 'lib/active_publisher/async/in_memory_adapter/async_queue.rb', line 13

# Reader for the supervisor interval passed to the constructor.
# NOTE(review): units and exact use are not visible here -- presumably the
# delay between supervisor passes; confirm where the supervisor is created.
def supervisor_interval
  @supervisor_interval
end

Instance Method Details

#push(message) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/active_publisher/async/in_memory_adapter/async_queue.rb', line 33

# Enqueues +message+, applying the configured back-pressure strategy when the
# queue has already reached +max_queue_size+.
#
# Behavior when the queue is full, by +back_pressure_strategy+:
# * +:drop+  - instruments "message_dropped.active_publisher" and returns nil
#   without enqueueing.
# * +:raise+ - instruments "message_dropped.active_publisher" and raises.
# * +:wait+  - polls inside a "wait_for_async_queue.active_publisher"
#   instrumentation block until there is room, then enqueues.
# * anything else - falls through and enqueues anyway.
#
# @param message the message to enqueue
# @return the result of +queue.push+, or nil when the message was dropped
# @raise [ActivePublisher::Async::InMemoryAdapter::UnableToPersistMessageError]
#   when the queue is full and the strategy is +:raise+
def push(message)
  # Fast path: there is still room, so enqueue straight away.
  return queue.push(message) unless queue.size >= max_queue_size

  case back_pressure_strategy
  when :drop
    ::ActiveSupport::Notifications.instrument("message_dropped.active_publisher")
    return
  when :raise
    ::ActiveSupport::Notifications.instrument("message_dropped.active_publisher")
    raise ::ActivePublisher::Async::InMemoryAdapter::UnableToPersistMessageError,
          "Queue is full, messages will be dropped."
  when :wait
    ::ActiveSupport::Notifications.instrument("wait_for_async_queue.active_publisher") do
      # Crude polling wait: re-check the size every 10ms until a slot opens up.
      until queue.size < max_queue_size
        sleep 0.01
      end
    end
  end

  queue.push(message)
end

#size ⇒ Object



53
54
55
56
57
58
# File 'lib/active_publisher/async/in_memory_adapter/async_queue.rb', line 53

# Reports how many messages are outstanding.
#
# With no consumers this is simply the queue's own size. Otherwise messages may
# already have left the queue without being published yet (in flight), so the
# larger of the queue size and the consumer-sampled size is reported to avoid
# under-counting.
#
# @return the queue size, or the max of it and +consumer_sampled_queue_size+
def size
  if consumers.empty?
    queue.size
  else
    queued = queue.size
    sampled = consumer_sampled_queue_size
    [queued, sampled].max
  end
end