Class: Shoryuken::Processor
- Inherits: Object
- Hierarchy: Object, Shoryuken::Processor
- Includes: Util
- Defined in: lib/shoryuken/processor.rb
Overview
Processes SQS messages by invoking the appropriate worker. Handles middleware chain execution and exception handling.
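To illustrate what "middleware chain execution" means here, the following is a minimal sketch of a chain that wraps a final action in nested middleware calls. `SketchChain`, `Recorder`, and `run_chain_sketch` are hypothetical names invented for this example; they are not Shoryuken classes, and the real chain may differ in detail. The only thing taken from this page is the call shape `invoke(worker, queue, sqs_msg, body) { ... }`.

```ruby
# Hypothetical stand-in for a middleware chain; not Shoryuken's own class.
class SketchChain
  def initialize(*middlewares)
    @middlewares = middlewares
  end

  # Runs each middleware around the final block, outermost first.
  def invoke(*args, &final_action)
    remaining = @middlewares.dup
    traverse = lambda do
      if remaining.empty?
        final_action.call
      else
        # Each middleware receives the same arguments and yields to the next link.
        remaining.shift.call(*args, &traverse)
      end
    end
    traverse.call
  end
end

# Hypothetical middleware that records when it runs.
class Recorder
  def initialize(name, log)
    @name = name
    @log = log
  end

  def call(_worker, queue, _sqs_msg, _body)
    @log << "#{@name} before #{queue}"
    yield
    @log << "#{@name} after #{queue}"
  end
end

# Builds a two-link chain and returns the recorded order of events.
def run_chain_sketch
  log = []
  chain = SketchChain.new(Recorder.new('outer', log), Recorder.new('inner', log))
  chain.invoke(:worker, 'default', :sqs_msg, 'payload') { log << 'perform' }
  log
end
```

In this sketch the worker's action runs innermost, so every middleware can act both before and after it.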
Instance Attribute Summary
- #queue ⇒ String (readonly)
  The queue name.
- #sqs_msg ⇒ Aws::SQS::Types::Message+ (readonly)
  The message or batch.
Class Method Summary
- .process(queue, sqs_msg) ⇒ Object
  Processes a message from a queue.
Instance Method Summary
- #initialize(queue, sqs_msg) ⇒ Processor (constructor)
  Initializes a new Processor.
- #process ⇒ Object
  Processes the message through the middleware chain and worker.
Methods included from Util
#elapsed, #fire_event, #logger, #unparse_queues, #worker_name
Constructor Details
#initialize(queue, sqs_msg) ⇒ Processor
Initializes a new Processor.

    # File 'lib/shoryuken/processor.rb', line 28
    def initialize(queue, sqs_msg)
      @queue = queue
      @sqs_msg = sqs_msg
    end
Instance Attribute Details
#queue ⇒ String (readonly)
Returns the queue name.

    # File 'lib/shoryuken/processor.rb', line 10
    def queue
      @queue
    end
#sqs_msg ⇒ Aws::SQS::Types::Message+ (readonly)
Returns the message or batch.

    # File 'lib/shoryuken/processor.rb', line 13
    def sqs_msg
      @sqs_msg
    end
Class Method Details
.process(queue, sqs_msg) ⇒ Object
Processes a message from a queue.

    # File 'lib/shoryuken/processor.rb', line 20
    def self.process(queue, sqs_msg)
      new(queue, sqs_msg).process
    end
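The class method is a convenience wrapper: it builds an instance and immediately calls the instance-level `#process`. A minimal sketch of that pattern, using a hypothetical `SketchProcessor` class whose `#process` body is a placeholder rather than the real worker dispatch:

```ruby
# Hypothetical class mirroring the shape of the call, not the real Processor.
class SketchProcessor
  attr_reader :queue, :sqs_msg

  # Class-level entry point: construct, then delegate to the instance method.
  def self.process(queue, sqs_msg)
    new(queue, sqs_msg).process
  end

  def initialize(queue, sqs_msg)
    @queue = queue
    @sqs_msg = sqs_msg
  end

  # Placeholder body; the real method dispatches to a worker.
  def process
    "processed #{sqs_msg} from #{queue}"
  end
end
```

Under these assumptions, `SketchProcessor.process('default', 'msg-1')` and `SketchProcessor.new('default', 'msg-1').process` return the same value.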
Instance Method Details
#process ⇒ Object
Processes the message through the middleware chain and worker.

    # File 'lib/shoryuken/processor.rb', line 36
    def process
      worker_perform = proc do
        return logger.error { "No worker found for #{queue}" } unless worker

        Shoryuken::Logging.with_context("#{worker_name(worker.class, sqs_msg, body)}/#{queue}/#{sqs_msg.}") do
          worker.class.server_middleware.invoke(worker, queue, sqs_msg, body) do
            worker.perform(sqs_msg, body)
          end
        end
      end

      if Shoryuken.enable_reloading
        Shoryuken.reloader.call do
          worker_perform.call
        end
      else
        worker_perform.call
      end
    rescue Exception => e
      Array(Shoryuken.exception_handlers).each { |handler| handler.call(e, queue, sqs_msg) }
      raise
    end
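The `rescue` clause above notifies every registered handler and then re-raises, so the caller still sees the original exception. The sketch below isolates that pattern. `process_with_handlers` and `run_handler_sketch` are hypothetical helpers written for this example; the only detail taken from the source is that each handler is called with `(e, queue, sqs_msg)`.

```ruby
# Hypothetical helper; the handlers and the work block are stand-ins.
def process_with_handlers(handlers, queue, sqs_msg)
  yield
rescue Exception => e # broad rescue, matching the listing above
  # Array() lets a single handler or a list of handlers be passed.
  Array(handlers).each { |handler| handler.call(e, queue, sqs_msg) }
  raise # re-raise the original exception after notifying handlers
end

# Runs a failing block and returns what the handlers and the caller observed.
def run_handler_sketch
  seen = []
  handlers = [
    ->(e, queue, _msg) { seen << "first: #{e.message} on #{queue}" },
    ->(e, _queue, msg) { seen << "second: #{e.message} for #{msg}" }
  ]
  begin
    process_with_handlers(handlers, 'default', 'msg-1') { raise ArgumentError, 'boom' }
  rescue ArgumentError => e
    seen << "re-raised: #{e.class}"
  end
  seen
end
```

Handlers run in registration order. Because of the bare `raise`, a handler cannot swallow the error in this sketch; it can only observe it.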