Class: Shoryuken::Polling::StrictPriority

Inherits:
BaseStrategy
Defined in:
lib/shoryuken/polling/strict_priority.rb

Overview

A polling strategy that processes queues in strict priority order. Higher priority queues are always processed before lower priority queues. Queues are temporarily paused when no messages are found.

Instance Method Summary

Methods inherited from BaseStrategy

#==, #delay

Methods included from Util

#elapsed, #fire_event, #logger, #unparse_queues, #worker_name

Constructor Details

#initialize(queues, delay = nil) ⇒ StrictPriority

Initializes a new StrictPriority polling strategy

Parameters:

  • queues (Array<String>)

    array of queue names, with higher priority queues appearing more frequently

  • delay (Float, nil) (defaults to: nil)

    delay in seconds before unpausing empty queues



# File 'lib/shoryuken/polling/strict_priority.rb', line 13

def initialize(queues, delay = nil)
  # Priority ordering of the queues, highest priority first
  @queues = queues
            .group_by { |q| q }
            .sort_by { |_, qs| -qs.count }
            .map(&:first)

  # Pause status of the queues, default to past time (unpaused)
  @paused_until = queues
                  .each_with_object({}) { |queue, h| h[queue] = Time.at(0) }

  @delay = delay
  # Start queues at 0
  reset_next_queue
end
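
The priority derivation in the constructor can be tried in isolation. The following is a minimal sketch, not part of the library: the queue names are hypothetical, and the pipeline is copied from the constructor above. Queues with equal counts may come out in either order, since Ruby does not guarantee that sort_by is stable.

```ruby
# Hypothetical queue list: repetition encodes priority.
queues = %w[high high high medium medium low]

# Same pipeline as in #initialize: group identical names,
# sort groups by descending size, keep one name per group.
ordered = queues
          .group_by { |q| q }
          .sort_by { |_, qs| -qs.count }
          .map(&:first)

p ordered
```

Each queue name appears once in the result, most frequent first.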

Instance Method Details

#active_queues ⇒ Array<Array>

Returns the list of active (non-paused) queues with their priorities

Returns:

  • (Array<Array>)

    array of [queue_name, priority] pairs



# File 'lib/shoryuken/polling/strict_priority.rb', line 53

def active_queues
  @queues
    .reverse
    .map.with_index(1)
    .reject { |q, _| queue_paused?(q) }
    .reverse
end
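
To see how the priority numbers come out, here is a small sketch of the same chain on plain data. The queue names are hypothetical, and the paused list is a stand-in for the private queue_paused? check, which is not shown on this page.

```ruby
ordered = %w[high medium low] # highest priority first, as stored in @queues
paused  = ['medium']          # assumption: stand-in for queue_paused?

active = ordered
         .reverse                              # lowest priority first
         .map.with_index(1)                    # number upward from 1
         .reject { |q, _| paused.include?(q) } # drop paused queues
         .reverse                              # highest priority first again

p active
```

Because the numbering happens on the reversed list, the highest-priority queue gets the largest number, and a paused queue leaves a gap in the numbering rather than shifting the others.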

#message_processed(queue) ⇒ void

This method returns an undefined value.

Called when a message from a queue has been processed

Parameters:

  • queue (String)

    the queue name



# File 'lib/shoryuken/polling/strict_priority.rb', line 65

def message_processed(queue)
  if queue_paused?(queue)
    logger.debug "Unpausing #{queue}"
    @paused_until[queue] = Time.at 0
  end
end

#messages_found(queue, messages_found) ⇒ void

This method returns an undefined value.

Handles the result of polling a queue

Parameters:

  • queue (String)

    the queue name

  • messages_found (Integer)

    number of messages found



# File 'lib/shoryuken/polling/strict_priority.rb', line 42

def messages_found(queue, messages_found)
  if messages_found == 0
    pause(queue)
  else
    reset_next_queue
  end
end
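
The pause bookkeeping behind #messages_found and #message_processed can be illustrated with a small stand-alone class. This is a sketch under assumptions: the private pause and queue_paused? methods are not shown on this page, so the sketch assumes that pausing stores "now + delay" and that a queue counts as paused while that timestamp lies in the future. PauseTracker and its method names are hypothetical.

```ruby
# Hypothetical stand-in for the strategy's pause bookkeeping.
class PauseTracker
  def initialize(queues, delay)
    @delay = delay
    # Time.at(0) is in the past, so every queue starts unpaused.
    @paused_until = queues.each_with_object({}) { |q, h| h[q] = Time.at(0) }
  end

  # Assumption: an empty poll pauses the queue for @delay seconds.
  def pause(queue, now = Time.now)
    @paused_until[queue] = now + @delay
  end

  # Mirrors #message_processed: reset the timestamp to the epoch.
  def unpause(queue)
    @paused_until[queue] = Time.at(0)
  end

  # Assumption: paused while the stored timestamp is in the future.
  def paused?(queue, now = Time.now)
    @paused_until[queue] > now
  end
end

tracker = PauseTracker.new(%w[high low], 5.0)
now = Time.now
tracker.pause('low', now)

puts tracker.paused?('low', now + 1)  # inside the 5-second window
puts tracker.paused?('low', now + 6)  # window has elapsed
puts tracker.paused?('high', now)     # never paused
```

Passing the clock in as an argument keeps the sketch deterministic; the real strategy presumably reads the current time directly.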

#next_queue ⇒ QueueConfiguration?

Returns the next queue to poll based on strict priority

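Returns:

  • (QueueConfiguration, nil)

    the next active queue wrapped in a QueueConfiguration, or nil if there is no next active queue
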


# File 'lib/shoryuken/polling/strict_priority.rb', line 32

def next_queue
  next_queue = next_active_queue
  next_queue.nil? ? nil : QueueConfiguration.new(next_queue, {})
end