Class: Shoryuken::Polling::WeightedRoundRobin
- Inherits: BaseStrategy (hierarchy: Object, BaseStrategy, Shoryuken::Polling::WeightedRoundRobin)
- Defined in: lib/shoryuken/polling/weighted_round_robin.rb
Overview
A polling strategy that processes queues in weighted round-robin order. Queue weights determine how often each queue is polled relative to others. Queues are temporarily paused when no messages are found.
Instance Method Summary
- #active_queues ⇒ Array<Array>
  Returns the list of active queues with their current weights.
- #initialize(queues, delay = nil) ⇒ WeightedRoundRobin (constructor)
  Initializes a new WeightedRoundRobin polling strategy.
- #message_processed(queue) ⇒ void
  Called when a message from a queue has been processed.
- #messages_found(queue, messages_found) ⇒ void
  Handles the result of polling a queue: pauses the queue when no messages were found; otherwise raises its weight by one, up to its maximum.
- #next_queue ⇒ QueueConfiguration?
  Returns the next queue to poll in round-robin order, or nil when no queue is active.
Methods inherited from BaseStrategy
Methods included from Util
#elapsed, #fire_event, #logger, #unparse_queues, #worker_name
Constructor Details
#initialize(queues, delay = nil) ⇒ WeightedRoundRobin
Initializes a new WeightedRoundRobin polling strategy
    # File 'lib/shoryuken/polling/weighted_round_robin.rb', line 13
    def initialize(queues, delay = nil)
      @initial_queues = queues
      @queues = queues.dup.uniq
      @paused_queues = []
      @delay = delay
    end
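The effect of the constructor's `queues.dup.uniq` can be tried in isolation. The sketch below is standalone Ruby, not Shoryuken code, and it assumes that queues are passed as a flat array of names in which repetition expresses weight; this page does not state the input format, so treat the sample data as hypothetical.

```ruby
# Hypothetical input: repetition is ASSUMED to express weight.
initial_queues = %w[high high high default low]

# Same expression as in #initialize: copy the list, then drop duplicates.
queues = initial_queues.dup.uniq

puts queues.inspect          # each name appears once in the working list
puts initial_queues.inspect  # the original list is left untouched
```

Under that assumption, every queue starts in the working list exactly once, while the original list is kept as the reference for how far each queue may grow.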
Instance Method Details
#active_queues ⇒ Array<Array>
Returns the list of active queues with their current weights
    # File 'lib/shoryuken/polling/weighted_round_robin.rb', line 54
    def active_queues
      unparse_queues(@queues)
    end
#message_processed(queue) ⇒ void
This method returns an undefined value.
Called when a message from a queue has been processed
    # File 'lib/shoryuken/polling/weighted_round_robin.rb', line 62
    def message_processed(queue)
      paused_queue = @paused_queues.find { |_time, name| name == queue }
      return unless paused_queue

      paused_queue[0] = Time.at 0
    end
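A minimal standalone sketch of the timestamp reset performed above. It assumes that paused entries are `[resume_time, queue_name]` pairs, which the block arguments `|_time, name|` suggest but this page does not confirm; the sample queue names and the 60-second delay are hypothetical.

```ruby
# ASSUMPTION: each paused entry is a [resume_time, queue_name] pair.
paused_queues = [[Time.now + 60, 'default'], [Time.now + 60, 'low']]

# Same lookup and reset as in #message_processed, applied to 'low'.
paused_queue = paused_queues.find { |_time, name| name == 'low' }
paused_queue[0] = Time.at(0) if paused_queue

# Report, per queue, whether its resume time is already in the past.
puts paused_queues.map { |time, name| [name, time <= Time.now] }.inspect
```

Moving the resume time to the epoch puts it in the past, so any check that compares it against the current time would treat the queue as due. How the strategy's private unpause logic consumes that timestamp is not shown on this page.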
#messages_found(queue, messages_found) ⇒ void
This method returns an undefined value.
Handles the result of polling a queue. If no messages were found, the queue is paused. Otherwise the queue's weight is raised by one, unless it has already reached its maximum.
    # File 'lib/shoryuken/polling/weighted_round_robin.rb', line 37
    def messages_found(queue, messages_found)
      if messages_found == 0
        pause(queue)
        return
      end

      maximum_weight = maximum_queue_weight(queue)
      current_weight = current_queue_weight(queue)
      if maximum_weight > current_weight
        logger.info { "Increasing #{queue} weight to #{current_weight + 1}, max: #{maximum_weight}" }
        @queues << queue
      end
    end
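The weight adjustment can be sketched without Shoryuken. The example assumes that a queue's weight is the number of times its name occurs in a list, so that the maximum comes from the initial list and the current weight from the working list. The helpers `maximum_queue_weight` and `current_queue_weight` are not shown on this page, so this is an assumption about their behavior, and the `grow` lambda is a hypothetical stand-in.

```ruby
# ASSUMPTION: weight == number of occurrences of the queue name in a list.
initial_queues = %w[high high high default]  # maximum weights: high=3, default=1
queues = initial_queues.uniq                 # current weights all start at 1

# Hypothetical stand-in for the branch taken when messages were found.
grow = lambda do |queue|
  maximum_weight = initial_queues.count(queue)
  current_weight = queues.count(queue)
  queues << queue if maximum_weight > current_weight
end

3.times { grow.call('high') }  # only the first two calls add an entry
grow.call('default')           # already at its maximum of 1, no change

puts queues.inspect  # → ["high", "default", "high", "high"]
```

Appending the name to the working list is what makes the queue come up more often in the rotation, and the comparison against the maximum keeps it from growing past its configured share.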
#next_queue ⇒ QueueConfiguration?
Returns the next queue to poll in round-robin order
    # File 'lib/shoryuken/polling/weighted_round_robin.rb', line 23
    def next_queue
      unpause_queues
      queue = @queues.shift
      return nil if queue.nil?

      @queues << queue
      QueueConfiguration.new(queue, {})
    end
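The shift-and-append rotation at the core of `#next_queue` can be illustrated on a plain array. This is a standalone sketch: `rotate_once` is a hypothetical helper, it returns the bare queue name instead of a `QueueConfiguration`, and it leaves out the unpause step.

```ruby
# Hypothetical helper: take the head of the list, put it back at the tail,
# and return it. Returns nil when the list is empty.
def rotate_once(queues)
  queue = queues.shift
  return nil if queue.nil?

  queues << queue
  queue
end

# A working list in which 'high' currently has weight 2 and 'default' weight 1.
queues = %w[high default high]
polled = Array.new(6) { rotate_once(queues) }

puts polled.inspect          # → ["high", "default", "high", "high", "default", "high"]
puts rotate_once([]).inspect # → nil
```

Over six polls, `high` is returned four times and `default` twice, matching the 2:1 ratio of their entries in the working list.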