Class: Shoryuken::Polling::WeightedRoundRobin

Inherits:
BaseStrategy
  • Object
Defined in:
lib/shoryuken/polling/weighted_round_robin.rb

Overview

A polling strategy that processes queues in weighted round-robin order. Queue weights determine how often each queue is polled relative to others. Queues are temporarily paused when no messages are found.

Instance Method Summary

Methods inherited from BaseStrategy

#==, #delay

Methods included from Util

#elapsed, #fire_event, #logger, #unparse_queues, #worker_name

Constructor Details

#initialize(queues, delay = nil) ⇒ WeightedRoundRobin

Initializes a new WeightedRoundRobin polling strategy

Parameters:

  • queues (Array<String>)

    array of queue names, with weights indicated by repetition

  • delay (Float, nil) (defaults to: nil)

    delay in seconds before unpausing empty queues



# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 13

def initialize(queues, delay = nil)
  @initial_queues = queues
  @queues = queues.dup.uniq
  @paused_queues = []
  @delay = delay
end
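
The constructor keeps two views of the same list: `@initial_queues` retains every repetition, while `@queues` starts from the de-duplicated names. A minimal standalone sketch of that split, using plain arrays rather than the class itself (the queue names and the `max_weights` variable are illustrative assumptions):

```ruby
# Standalone illustration; does not load Shoryuken.
# Queue names are hypothetical.
initial_queues = %w[high high high low]

# Mirrors `queues.dup.uniq`: each queue appears once in the starting rotation.
active_queues = initial_queues.dup.uniq

# Repetition count per name in the original list (illustrative variable).
max_weights = initial_queues.tally

p active_queues    # ["high", "low"]
p max_weights.to_a # [["high", 3], ["low", 1]]
```

Read this way, a queue listed three times starts in the rotation with a single entry, and the original list records how many entries it may eventually hold.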

Instance Method Details

#active_queues ⇒ Array<Array>

Returns the list of active queues with their current weights

Returns:

  • (Array<Array>)

    array of [queue_name, weight] pairs



# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 54

def active_queues
  unparse_queues(@queues)
end
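
`unparse_queues` comes from `Util`, and its body is not shown on this page. Assuming it collapses repeated names into `[queue_name, weight]` pairs, as the documented return type suggests, the transformation can be sketched with a hypothetical stand-in named `count_weights`:

```ruby
# Hypothetical stand-in for Util#unparse_queues (an assumption, not the
# library's code): count how often each name occurs, keeping first-seen order.
def count_weights(queues)
  queues.each_with_object(Hash.new(0)) { |name, counts| counts[name] += 1 }.to_a
end

p count_weights(%w[high low high]) # [["high", 2], ["low", 1]]
p count_weights([])                # []
```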

#message_processed(queue) ⇒ void

This method returns an undefined value.

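Called when a message from a queue has been processed. If the queue is currently paused, its pause timestamp is reset to the epoch (Time.at 0), which marks the pause as already expired.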

Parameters:

  • queue (String)

    the queue name



# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 62

def message_processed(queue)
  paused_queue = @paused_queues.find { |_time, name| name == queue }
  return unless paused_queue

  paused_queue[0] = Time.at 0
end
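
Each entry in `@paused_queues` is destructured as a `[time, name]` pair by the `find` block above. The sketch below shows the effect of resetting the timestamp on plain arrays. The `due` selection is an assumption about how expired pauses would be detected, since `unpause_queues` is not shown on this page.

```ruby
# Standalone illustration with hypothetical queue names.
paused_queues = [
  [Time.now + 60, 'default'],
  [Time.now + 60, 'low']
]

# Same lookup as message_processed: match on the name half of the pair.
entry = paused_queues.find { |_time, name| name == 'low' }
entry[0] = Time.at(0) if entry # epoch timestamp, i.e. already in the past

# Assumed expiry check: a pause is due once its timestamp is not in the future.
due = paused_queues.select { |time, _name| time <= Time.now }.map(&:last)
p due # ["low"]
```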

#messages_found(queue, messages_found) ⇒ void

This method returns an undefined value.

Handles the result of polling a queue. If no messages were found, the queue is paused. Otherwise, its current weight is increased by one, up to its maximum weight.

Parameters:

  • queue (String)

    the queue name

  • messages_found (Integer)

    number of messages found



# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 37

def messages_found(queue, messages_found)
  if messages_found == 0
    pause(queue)
    return
  end

  maximum_weight = maximum_queue_weight(queue)
  current_weight = current_queue_weight(queue)
  if maximum_weight > current_weight
    logger.info { "Increasing #{queue} weight to #{current_weight + 1}, max: #{maximum_weight}" }
    @queues << queue
  end
end
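
The weight adjustment can be traced on plain arrays. This sketch assumes that a queue's maximum weight is its number of occurrences in the initial list and that its current weight is its number of occurrences in the active list. `maximum_queue_weight` and `current_queue_weight` are not shown on this page, so the `weight` and `record_messages_found` helpers are hypothetical stand-ins, and pausing is reduced to removing the queue from the active list.

```ruby
# Hypothetical helper: weight = number of occurrences of the name.
def weight(queues, queue)
  queues.count(queue)
end

# Simplified stand-in for messages_found.
def record_messages_found(initial, active, queue, found)
  if found.zero?
    active.delete(queue) # stand-in for pause(queue)
    return
  end
  active << queue if weight(initial, queue) > weight(active, queue)
end

initial = %w[high high high low]
active  = initial.uniq

# The third call adds nothing because the maximum weight is already reached.
3.times { record_messages_found(initial, active, 'high', 10) }
p active # ["high", "low", "high", "high"]

record_messages_found(initial, active, 'low', 0)
p active # ["high", "high", "high"]
```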

#next_queue ⇒ QueueConfiguration?

Returns the next queue to poll in round-robin order

Returns:

  • (QueueConfiguration, nil)

    the next queue to poll, or nil if there are no active queues


# File 'lib/shoryuken/polling/weighted_round_robin.rb', line 23

def next_queue
  unpause_queues
  queue = @queues.shift
  return nil if queue.nil?

  @queues << queue
  QueueConfiguration.new(queue, {})
end
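
The `shift` followed by `<<` rotates the active list by one position per call, so a queue that appears more often in the list is returned proportionally more often. A minimal sketch on a plain array (the queue names and the `rotate_once` helper are hypothetical, and the `QueueConfiguration` wrapper is omitted):

```ruby
# Hypothetical helper mirroring the shift/append step of next_queue.
def rotate_once(queues)
  queue = queues.shift
  return nil if queue.nil?

  queues << queue
  queue
end

queues = %w[high low high]
order = Array.new(6) { rotate_once(queues) }
p order           # ["high", "low", "high", "high", "low", "high"]
p rotate_once([]) # nil
```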