Class: Shoryuken::Processor

Inherits:
Object
Includes:
Util
Defined in:
lib/shoryuken/processor.rb

Overview

Processes SQS messages by invoking the appropriate worker. Handles middleware chain execution and exception handling.
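As a rough illustration of what "middleware chain execution" means here, the sketch below wraps a worker's perform call in two middlewares. `SketchChain`, `TraceMiddleware`, and `EchoWorker` are hypothetical stand-ins written for this example, not Shoryuken classes. The only thing taken from the page is the calling shape `invoke(worker, queue, sqs_msg, body) { ... }`, and the middleware signature `call(worker, queue, sqs_msg, body)` is an assumption.

```ruby
# Simplified stand-in for a server middleware chain (not Shoryuken's own class).
class SketchChain
  def initialize(*middlewares)
    @middlewares = middlewares
  end

  # Wraps the given block in each middleware, first middleware outermost,
  # then runs the resulting call stack.
  def invoke(worker, queue, sqs_msg, body, &perform)
    runner = @middlewares.reverse.inject(perform) do |inner, middleware|
      proc { middleware.call(worker, queue, sqs_msg, body, &inner) }
    end
    runner.call
  end
end

# Hypothetical middleware that records when it runs and passes the result through.
class TraceMiddleware
  def initialize(name, log)
    @name = name
    @log = log
  end

  def call(_worker, queue, _sqs_msg, _body)
    @log << "#{@name} before #{queue}"
    result = yield
    @log << "#{@name} after #{queue}"
    result
  end
end

# Hypothetical worker with the perform(sqs_msg, body) shape used by the processor.
class EchoWorker
  def perform(_sqs_msg, body)
    body.upcase
  end
end

CHAIN_LOG = []
chain = SketchChain.new(
  TraceMiddleware.new('outer', CHAIN_LOG),
  TraceMiddleware.new('inner', CHAIN_LOG)
)
worker = EchoWorker.new
CHAIN_RESULT = chain.invoke(worker, 'default', nil, 'hello') do
  worker.perform(nil, 'hello')
end
```

In this sketch each middleware decides whether to `yield`, so any one of them could skip the worker entirely or alter the value that flows back to the caller.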

Methods included from Util

#elapsed, #fire_event, #logger, #unparse_queues, #worker_name

Constructor Details

#initialize(queue, sqs_msg) ⇒ Processor

Initializes a new Processor

Parameters:

  • queue (String)

    the queue name

  • sqs_msg (Aws::SQS::Types::Message, Array<Aws::SQS::Types::Message>)

    the message or batch



# File 'lib/shoryuken/processor.rb', line 28

def initialize(queue, sqs_msg)
  @queue   = queue
  @sqs_msg = sqs_msg
end

Instance Attribute Details

#queue ⇒ String (readonly)

Returns the queue name.

Returns:

  • (String)

    the queue name



# File 'lib/shoryuken/processor.rb', line 10

def queue
  @queue
end

#sqs_msg ⇒ Aws::SQS::Types::Message, Array<Aws::SQS::Types::Message> (readonly)

Returns the message or batch.

Returns:

  • (Aws::SQS::Types::Message, Array<Aws::SQS::Types::Message>)

    the message or batch



# File 'lib/shoryuken/processor.rb', line 13

def sqs_msg
  @sqs_msg
end

Class Method Details

.process(queue, sqs_msg) ⇒ Object

Processes a message (or batch) from a queue by building a new Processor and calling #process on it.

Parameters:

  • queue (String)

    the queue name

  • sqs_msg (Aws::SQS::Types::Message, Array<Aws::SQS::Types::Message>)

    the message or batch

Returns:

  • (Object)

    the result of the worker’s perform method



# File 'lib/shoryuken/processor.rb', line 20

def self.process(queue, sqs_msg)
  new(queue, sqs_msg).process
end

Instance Method Details

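#process ⇒ Object

Runs the message through the worker's server middleware chain and then the worker's perform method. When Shoryuken.enable_reloading is set, the call is wrapped in Shoryuken.reloader. Any exception is passed to each handler in Shoryuken.exception_handlers and then re-raised.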

Returns:

  • (Object)

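    the result of the worker’s perform method; if no worker is found for the queue, an error is logged and the method returns without invoking one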



# File 'lib/shoryuken/processor.rb', line 36

def process
  worker_perform = proc do
    return logger.error { "No worker found for #{queue}" } unless worker

    Shoryuken::Logging.with_context("#{worker_name(worker.class, sqs_msg, body)}/#{queue}/#{sqs_msg.message_id}") do
      worker.class.server_middleware.invoke(worker, queue, sqs_msg, body) do
        worker.perform(sqs_msg, body)
      end
    end
  end

  if Shoryuken.enable_reloading
    Shoryuken.reloader.call do
      worker_perform.call
    end
  else
    worker_perform.call
  end
rescue Exception => e
  Array(Shoryuken.exception_handlers).each { |handler| handler.call(e, queue, sqs_msg) }

  raise
end
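
Two details of this method are easy to miss: `return` inside the `proc` exits #process itself, and the rescue clause notifies every handler before re-raising. The sketch below models both with a hypothetical `SketchProcessor`. The worker is a plain callable, the handlers are injected rather than read from `Shoryuken.exception_handlers`, and the `:no_worker` return value is an illustrative placeholder for the logger call's result.

```ruby
# Hypothetical stand-in for the control flow of Shoryuken::Processor#process.
class SketchProcessor
  attr_reader :queue, :sqs_msg

  def initialize(queue, sqs_msg, worker:, handlers:)
    @queue = queue
    @sqs_msg = sqs_msg
    @worker = worker     # any object responding to #call, or nil
    @handlers = handlers # one callable or an array of callables
  end

  def process
    worker_perform = proc do
      # `return` in a proc returns from the enclosing method (#process),
      # so a missing worker ends the whole call early.
      return :no_worker unless @worker

      @worker.call(sqs_msg)
    end

    worker_perform.call
  rescue Exception => e # mirrors the original: not limited to StandardError
    # Each handler receives the error, the queue name, and the message.
    Array(@handlers).each { |handler| handler.call(e, queue, sqs_msg) }

    raise # re-raise so the caller still sees the failure
  end
end

SEEN_ERRORS = []
record = ->(error, queue, msg) { SEEN_ERRORS << [error.message, queue, msg] }

# Normal case: the worker's return value is the result of #process.
OK_RESULT = SketchProcessor.new(
  'default', 'm1', worker: ->(msg) { "done #{msg}" }, handlers: record
).process

# No worker: the proc's `return` short-circuits #process.
MISSING_RESULT = SketchProcessor.new(
  'default', 'm2', worker: nil, handlers: record
).process

# Failing worker: handlers run first, then the error propagates.
failing = SketchProcessor.new(
  'default', 'm3', worker: ->(_msg) { raise ArgumentError, 'boom' }, handlers: [record]
)
RAISED_ERROR = begin
  failing.process
  nil
rescue ArgumentError => e
  e
end
```

Because the error is re-raised after the handlers run, handlers in this pattern are suited to reporting (logging, error trackers) rather than recovery; the caller remains responsible for deciding what a failed message means.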