Class: Shoryuken::Queue

Inherits:
Object
  • Object
show all
Includes:
Util
Defined in:
lib/shoryuken/queue.rb

Overview

Represents an SQS queue and provides methods for sending and receiving messages. Handles both standard and FIFO queues, automatically adding required FIFO attributes.

Constant Summary collapse

FIFO_ATTR =

SQS attribute name for FIFO queue identification

'FifoQueue'
MESSAGE_GROUP_ID =

Default message group ID used for FIFO queues

'ShoryukenMessage'
VISIBILITY_TIMEOUT_ATTR =

SQS attribute name for visibility timeout

'VisibilityTimeout'

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util

#elapsed, #fire_event, #logger, #unparse_queues, #worker_name

Constructor Details

#initialize(client, name_or_url_or_arn) ⇒ Queue

Initializes a new Queue instance

Parameters:

  • client (Aws::SQS::Client)

    the SQS client

  • name_or_url_or_arn (String)

    the queue name, URL, or ARN



31
32
33
34
# File 'lib/shoryuken/queue.rb', line 31

def initialize(client, name_or_url_or_arn)
  self.client = client
  set_name_and_url(name_or_url_or_arn)
end

Instance Attribute Details

#clientAws::SQS::Client

Returns the SQS client.

Returns:

  • (Aws::SQS::Client)

    the SQS client



22
23
24
# File 'lib/shoryuken/queue.rb', line 22

def client
  @client
end

#nameString

Returns the queue name.

Returns:

  • (String)

    the queue name



19
20
21
# File 'lib/shoryuken/queue.rb', line 19

def name
  @name
end

#urlString

Returns the queue URL.

Returns:

  • (String)

    the queue URL



25
26
27
# File 'lib/shoryuken/queue.rb', line 25

def url
  @url
end

Instance Method Details

#delete_messages(options) ⇒ Boolean

Deletes messages from the queue in batch

Parameters:

  • options (Hash)

    options for delete_message_batch

Options Hash (options):

  • :entries (Array<Hash>)

    entries to delete with id and receipt_handle

Returns:

  • (Boolean)

    true if any messages failed to delete



51
52
53
54
55
56
57
58
59
60
# File 'lib/shoryuken/queue.rb', line 51

def delete_messages(options)
  failed_messages = client.delete_message_batch(
    options.merge(queue_url: url)
  ).failed || []
  failed_messages.any? do |failure|
    logger.error do
      "Could not delete #{failure.id}, code: '#{failure.code}', message: '#{failure.message}', sender_fault: #{failure.sender_fault}"
    end
  end
end

#fifo?Boolean

Checks if the queue is a FIFO queue

Returns:

  • (Boolean)

    true if the queue is a FIFO queue



105
106
107
108
109
110
111
112
# File 'lib/shoryuken/queue.rb', line 105

def fifo?
  # Make sure the memoization work with boolean to avoid multiple calls to SQS
  # see https://github.com/ruby-shoryuken/shoryuken/pull/529
  return @_fifo if defined?(@_fifo)

  @_fifo = queue_attributes.attributes[FIFO_ATTR] == 'true'
  @_fifo
end

#receive_messages(options) ⇒ Array<Shoryuken::Message>

Receives messages from the queue

Parameters:

  • options (Hash)

    options for receive_message

Options Hash (options):

  • :max_number_of_messages (Integer)

    maximum messages to receive

  • :visibility_timeout (Integer)

    visibility timeout for received messages

  • :wait_time_seconds (Integer)

    long polling wait time

  • :attribute_names (Array<String>)

    SQS attributes to retrieve

  • :message_attribute_names (Array<String>)

    message attributes to retrieve

Returns:



97
98
99
100
# File 'lib/shoryuken/queue.rb', line 97

def receive_messages(options)
  messages = client.receive_message(options.merge(queue_url: url)).messages || []
  messages.map { |m| Message.new(client, self, m) }
end

#send_message(options) ⇒ Aws::SQS::Types::SendMessageResult

Sends a single message to the queue

Parameters:

  • options (Hash, String)

    message options or message body string

Options Hash (options):

  • :message_body (String)

    the message body

  • :delay_seconds (Integer)

    delay before message becomes visible

  • :message_attributes (Hash)

    custom message attributes

  • :message_group_id (String)

    FIFO queue message group ID

  • :message_deduplication_id (String)

    FIFO queue deduplication ID

Returns:

  • (Aws::SQS::Types::SendMessageResult)

    the send result



71
72
73
74
75
76
77
# File 'lib/shoryuken/queue.rb', line 71

def send_message(options)
  options = sanitize_message!(options).merge(queue_url: url)

  Shoryuken.client_middleware.invoke(options) do
    client.send_message(options)
  end
end

#send_messages(options) ⇒ Aws::SQS::Types::SendMessageBatchResult

Sends multiple messages to the queue in batch

Parameters:

  • options (Hash, Array)

    batch options or array of message bodies/hashes

Options Hash (options):

  • :entries (Array<Hash>)

    message entries to send

Returns:

  • (Aws::SQS::Types::SendMessageBatchResult)

    the batch send result



84
85
86
# File 'lib/shoryuken/queue.rb', line 84

def send_messages(options)
  client.send_message_batch(sanitize_messages!(options).merge(queue_url: url))
end

#visibility_timeoutInteger

Returns the visibility timeout for the queue

Returns:

  • (Integer)

    the visibility timeout in seconds



39
40
41
42
43
44
# File 'lib/shoryuken/queue.rb', line 39

def visibility_timeout
  # Always lookup for the latest visibility when cache is disabled
  # setting it to nil, forces re-lookup
  @_visibility_timeout = nil unless Shoryuken.cache_visibility_timeout?
  @_visibility_timeout ||= queue_attributes.attributes[VISIBILITY_TIMEOUT_ATTR].to_i
end