Class: Shoryuken::Polling::StrictPriority
- Inherits:
-
BaseStrategy
- Object
- BaseStrategy
- Shoryuken::Polling::StrictPriority
- Defined in:
- lib/shoryuken/polling/strict_priority.rb
Overview
A polling strategy that processes queues in strict priority order. Higher priority queues are always processed before lower priority queues. Queues are temporarily paused when no messages are found.
Instance Method Summary collapse
-
#active_queues ⇒ Array<Array>
Returns the list of active (non-paused) queues with their priorities.
-
#initialize(queues, delay = nil) ⇒ StrictPriority
constructor
Initializes a new StrictPriority polling strategy.
-
#message_processed(queue) ⇒ void
Called when a message from a queue has been processed.
-
#messages_found(queue, messages_found) ⇒ void
Handles the result of polling a queue.
-
#next_queue ⇒ QueueConfiguration?
Returns the next queue to poll based on strict priority.
Methods inherited from BaseStrategy
Methods included from Util
#elapsed, #fire_event, #logger, #unparse_queues, #worker_name
Constructor Details
#initialize(queues, delay = nil) ⇒ StrictPriority
Initializes a new StrictPriority polling strategy
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/shoryuken/polling/strict_priority.rb', line 13 def initialize(queues, delay = nil) # Priority ordering of the queues, highest priority first @queues = queues .group_by { |q| q } .sort_by { |_, qs| -qs.count } .map(&:first) # Pause status of the queues, default to past time (unpaused) @paused_until = queues .each_with_object({}) { |queue, h| h[queue] = Time.at(0) } @delay = delay # Start queues at 0 reset_next_queue end |
Instance Method Details
#active_queues ⇒ Array<Array>
Returns the list of active (non-paused) queues with their priorities
53 54 55 56 57 58 59 |
# File 'lib/shoryuken/polling/strict_priority.rb', line 53 def active_queues @queues .reverse .map.with_index(1) .reject { |q, _| queue_paused?(q) } .reverse end |
#message_processed(queue) ⇒ void
This method returns an undefined value.
Called when a message from a queue has been processed
65 66 67 68 69 70 |
# File 'lib/shoryuken/polling/strict_priority.rb', line 65 def (queue) if queue_paused?(queue) logger.debug "Unpausing #{queue}" @paused_until[queue] = Time.at 0 end end |
#messages_found(queue, messages_found) ⇒ void
This method returns an undefined value.
Handles the result of polling a queue
42 43 44 45 46 47 48 |
# File 'lib/shoryuken/polling/strict_priority.rb', line 42 def (queue, ) if == 0 pause(queue) else reset_next_queue end end |
#next_queue ⇒ QueueConfiguration?
Returns the next queue to poll based on strict priority
32 33 34 35 |
# File 'lib/shoryuken/polling/strict_priority.rb', line 32 def next_queue next_queue = next_active_queue next_queue.nil? ? nil : QueueConfiguration.new(next_queue, {}) end |