Class: Lepus::Consumer Abstract
- Inherits: Object
  - Object
  - Lepus::Consumer
- Includes: Web::ConsumerExtensions
- Defined in: lib/lepus/consumer.rb
Overview
The abstract base class for consumers processing messages from queues.
Subclass it and override #perform to implement a consumer.
Class Method Summary
- .abstract_class=(value) ⇒ Object
- .abstract_class? ⇒ Boolean
- .config ⇒ Object
- .configure(opts = {}) {|@config| ... } ⇒ Object
  Configures the consumer, setting queue, exchange and other options to be used by the add_consumer method.
- .descendants ⇒ Object
  :nodoc:
- .inherited(subclass) ⇒ Object
- .middleware_chain ⇒ Lepus::Consumers::MiddlewareChain
  Returns the middleware chain for this consumer.
- .use(middleware, opts = {}) ⇒ Lepus::Consumers::MiddlewareChain
  Registers a middleware to this consumer’s chain.
Instance Method Summary
- #last_delivery_errored? ⇒ Boolean
  Returns whether the last delivery resulted in an error.
- #perform(message) ⇒ :ack, ...
  The method that is called when a message from the queue is received.
- #process_delivery(delivery_info, metadata, payload) ⇒ Object
  Wraps #perform to add middlewares.
Class Method Details
.abstract_class=(value) ⇒ Object
    # File 'lib/lepus/consumer.rb', line 14
    def abstract_class=(value)
      @config = nil
      @abstract_class = value
    end
.abstract_class? ⇒ Boolean
    # File 'lib/lepus/consumer.rb', line 8
    def abstract_class?
      return @abstract_class == true if defined?(@abstract_class)

      instance_variable_get(:@config).nil?
    end
.config ⇒ Object
    # File 'lib/lepus/consumer.rb', line 24
    def config
      return if @abstract_class == true
      return @config if defined?(@config)

      name = Primitive::String.new(to_s).underscore.split("/").last
      @config = Consumers::Config.new(queue: name, exchange: name)
    end
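The default queue and exchange name is derived from the class name: the source underscores it and keeps only the last path segment. The sketch below reproduces that derivation with a stand-in `underscore` helper. `Primitive::String#underscore` is not shown on this page, so the inflection rules used here (Rails-style) are an assumption, as are the `default_name` helper and the `MyApp::OrdersConsumer` class name.

```ruby
# Hypothetical stand-in for Primitive::String#underscore. Assumed to follow
# the common Rails-style rules; the real implementation may differ.
def underscore(name)
  name.gsub("::", "/")
      .gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
      .gsub(/([a-z\d])([A-Z])/, '\1_\2')
      .downcase
end

# Mirrors `underscore.split("/").last` from .config: namespaces are dropped.
def default_name(class_name)
  underscore(class_name).split("/").last
end

puts default_name("MyApp::OrdersConsumer") # prints "orders_consumer"
```

Under these assumptions, two consumers with the same class name in different namespaces would end up with the same default queue name, which is one reason to set the queue explicitly with .configure.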
.configure(opts = {}) {|@config| ... } ⇒ Object
Configures the consumer, setting queue, exchange and other options to be used by the add_consumer method.
    # File 'lib/lepus/consumer.rb', line 56
    def configure(opts = {})
      raise ArgumentError, "Cannot configure an abstract class" if abstract_class?

      @config = Consumers::Config.new(opts)
      yield(@config) if block_given?
      @config
    end
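As the source shows, .configure builds a fresh config from `opts`, yields it to an optional block, and returns it; it refuses to run on an abstract class. The following is a minimal stand-alone sketch of that pattern, not Lepus itself: `DemoConfig`, `DemoConsumer`, `DemoOrders` and the `abstract` accessor are hypothetical stand-ins, and the real `Consumers::Config` likely accepts more options than the two modelled here.

```ruby
# Hypothetical stand-in for Lepus::Consumers::Config (two options only).
DemoConfig = Struct.new(:queue, :exchange, keyword_init: true)

class DemoConsumer
  class << self
    # Simplified abstract flag, stored per class (not inherited).
    attr_accessor :abstract

    # Same shape as .configure: build from opts, yield for tweaks, return it.
    def configure(opts = {})
      raise ArgumentError, "Cannot configure an abstract class" if abstract

      @config = DemoConfig.new(**opts)
      yield(@config) if block_given?
      @config
    end
  end
end

DemoConsumer.abstract = true

class DemoOrders < DemoConsumer; end

config = DemoOrders.configure(queue: "orders") { |c| c.exchange = "shop" }
puts config.queue    # prints "orders"
puts config.exchange # prints "shop"
```

Because the config object is replaced rather than merged, any option not passed in `opts` or set in the block falls back to whatever the config class itself provides.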
.descendants ⇒ Object
:nodoc:
    # File 'lib/lepus/consumer.rb', line 64
    def descendants # :nodoc:
      descendants = []
      ObjectSpace.each_object(singleton_class) do |k|
        descendants.unshift k unless k == self
      end
      descendants.uniq
    end
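The lookup above relies on a Ruby property: every subclass of a class is an instance of (a subclass of) that class's singleton class, so `ObjectSpace.each_object` can enumerate them. A small stand-alone sketch of the same technique, using hypothetical `DemoBase`, `DemoChild` and `DemoGrandchild` classes:

```ruby
class DemoBase
  # Same technique as .descendants: walk all live objects that are
  # kind_of this class's singleton class, skipping the class itself.
  def self.descendants
    found = []
    ObjectSpace.each_object(singleton_class) do |klass|
      found.unshift(klass) unless klass == self
    end
    found.uniq
  end
end

class DemoChild < DemoBase; end
class DemoGrandchild < DemoChild; end

# Enumeration order is not guaranteed, so sort before comparing.
puts DemoBase.descendants.map(&:name).sort.inspect
puts DemoChild.descendants.map(&:name).inspect
```

The result includes indirect subclasses as well as direct ones, and it only sees classes that are currently loaded and not yet garbage collected.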
.inherited(subclass) ⇒ Object
    # File 'lib/lepus/consumer.rb', line 19
    def inherited(subclass)
      super
      subclass.abstract_class = false
    end
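Taken together, .abstract_class=, .abstract_class? and .inherited mean that the base class counts as abstract (its flag was never set and it has no config), while every subclass starts out concrete. The sketch below copies those three methods into a hypothetical `SketchConsumer` class so the behaviour can be run without Lepus; marking an intermediate base class abstract again via `self.abstract_class = true` is an assumed usage, not something this page documents.

```ruby
class SketchConsumer
  class << self
    def abstract_class=(value)
      @config = nil
      @abstract_class = value
    end

    def abstract_class?
      # An explicit flag wins; otherwise "no config yet" means abstract.
      return @abstract_class == true if defined?(@abstract_class)

      instance_variable_get(:@config).nil?
    end

    def inherited(subclass)
      super
      subclass.abstract_class = false # subclasses are concrete by default
    end
  end
end

class SketchOrders < SketchConsumer; end

class SketchAppConsumer < SketchConsumer
  self.abstract_class = true # assumed usage: shared base for an application
end

puts SketchConsumer.abstract_class?    # prints "true"
puts SketchOrders.abstract_class?      # prints "false"
puts SketchAppConsumer.abstract_class? # prints "true"
```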
.middleware_chain ⇒ Lepus::Consumers::MiddlewareChain
Returns the middleware chain for this consumer.
    # File 'lib/lepus/consumer.rb', line 34
    def middleware_chain
      @middleware_chain ||= Consumers::MiddlewareChain.new
    end
.use(middleware, opts = {}) ⇒ Lepus::Consumers::MiddlewareChain
Registers a middleware to this consumer’s chain.
    # File 'lib/lepus/consumer.rb', line 43
    def use(middleware, opts = {})
      middleware_chain.use(middleware, opts)
    end
Instance Method Details
#last_delivery_errored? ⇒ Boolean
Returns whether the last delivery resulted in an error. Always false in core; overridden by Lepus::Web when loaded.
    # File 'lib/lepus/consumer.rb', line 117
    def last_delivery_errored?
      false
    end
#perform(message) ⇒ :ack, ...
The method that is called when a message from the queue is received. Keep in mind that the parameters received can be altered by middlewares!
    # File 'lib/lepus/consumer.rb', line 79
    def perform(message)
      raise NotImplementedError
    end
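To illustrate the warning that middlewares can alter what #perform receives, here is a minimal stand-alone sketch of a middleware chain. It does not use Lepus: the `call(message, next_step)` middleware shape, the `run_chain` helper and the `PARSE_JSON` middleware are all hypothetical, since the real middleware interface is not shown on this page.

```ruby
require "json"

# Hypothetical middleware: parses the raw payload before passing it on.
PARSE_JSON = lambda do |message, next_step|
  next_step.call(JSON.parse(message))
end

# Runs the middlewares in order, then the block standing in for #perform.
def run_chain(middlewares, message, &perform)
  # Fold from the innermost step outwards so the first middleware runs first.
  app = middlewares.reverse.reduce(perform) do |next_step, middleware|
    ->(msg) { middleware.call(msg, next_step) }
  end
  app.call(message)
end

received = nil
result = run_chain([PARSE_JSON], '{"id": 7}') do |msg|
  received = msg # the block sees the parsed Hash, not the raw String
  :ack
end

puts received.inspect
puts result.inspect # prints ":ack"
```

In this sketch the consumer body never sees the original string, which is the situation the note above describes: code in #perform should be written against whatever the configured middlewares hand over.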
#process_delivery(delivery_info, metadata, payload) ⇒ Object
Wraps #perform to add middlewares. Lepus calls this method when a message is received for the consumer.
    # File 'lib/lepus/consumer.rb', line 89
    def process_delivery(delivery_info, metadata, payload)
      message = Message.coerce(delivery_info, metadata, payload)
      message.consumer_class = self.class

      combined_chain = MiddlewareChain.combine(
        Lepus.config.consumer_middleware_chain,
        self.class.middleware_chain
      )

      combined_chain.execute(message) do |msg|
        perform(msg).tap do |result|
          verify_result(result)
        end
      end
    rescue Lepus::InvalidConsumerReturnError
      raise
    rescue Exception # rubocop:disable Lint/RescueException
      on_delivery_error
      # In testing mode, re-raise exceptions if consumer_raise_errors? is enabled
      if defined?(Lepus::Testing) && Lepus::Testing.consumer_raise_errors?
        raise
      end

      reject!
    end
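The two rescue clauses give #process_delivery its error policy: an invalid return value from #perform is re-raised to the caller, while any other exception is recorded and turned into a rejection. The sketch below isolates that control flow in a hypothetical `SketchDelivery` class. Several details are assumptions because they are not shown on this page: the set of valid results, that `verify_result` raises on anything else, and that `reject!` evaluates to `:reject`. The testing-mode re-raise is left out.

```ruby
class SketchInvalidReturn < StandardError; end

class SketchDelivery
  VALID = %i[ack reject].freeze # assumed set; the real list is not shown here

  attr_reader :errors

  def initialize(&perform)
    @perform = perform
    @errors = 0
  end

  def process(message)
    @perform.call(message).tap { |result| verify_result(result) }
  rescue SketchInvalidReturn
    raise # listed first, so it is not swallowed by the clause below
  rescue Exception # rubocop:disable Lint/RescueException
    @errors += 1 # stands in for on_delivery_error
    :reject      # stands in for reject!
  end

  private

  def verify_result(result)
    return if VALID.include?(result)

    raise SketchInvalidReturn, "bad result: #{result.inspect}"
  end
end

ok = SketchDelivery.new { |_msg| :ack }
failing = SketchDelivery.new { |_msg| raise "database down" }

puts ok.process("hello").inspect      # prints ":ack"
puts failing.process("hello").inspect # prints ":reject"
puts failing.errors                   # prints "1"
```

The order of the clauses matters: Ruby uses the first rescue clause that matches, so the narrower error class has to come before `Exception` for the re-raise to take effect.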