Class: Lepus::Consumer Abstract
- Inherits:
-
Object
- Object
- Lepus::Consumer
- Includes:
- Web::ConsumerExtensions
- Defined in:
- lib/lepus/consumer.rb
Overview
Subclass and override #work to implement.
The abstract base class for consumers processing messages from queues.
Class Method Summary collapse
- .abstract_class=(value) ⇒ Object
- .abstract_class? ⇒ Boolean
- .config ⇒ Object
-
.configure(opts = {}) {|@config| ... } ⇒ Object
Configures the consumer, setting queue, exchange and other options to be used by the add_consumer method.
-
.descendants ⇒ Object
:nodoc:.
- .inherited(subclass) ⇒ Object
-
.middleware_chain ⇒ Lepus::Consumers::MiddlewareChain
Returns the middleware chain for this consumer.
-
.use(middleware, opts = {}) ⇒ Lepus::Consumers::MiddlewareChain
Registers a middleware to this consumer’s chain.
Instance Method Summary collapse
-
#last_delivery_errored? ⇒ Boolean
Returns whether the last delivery resulted in an error.
-
#perform(message) ⇒ :ack, ...
The method that is called when a message from the queue is received.
-
#process_delivery(delivery_info, metadata, payload) ⇒ Object
Wraps #perform to add middlewares.
Class Method Details
.abstract_class=(value) ⇒ Object
14 15 16 17 |
# File 'lib/lepus/consumer.rb', line 14 def abstract_class=(value) @config = nil @abstract_class = value end |
.abstract_class? ⇒ Boolean
8 9 10 11 12 |
# File 'lib/lepus/consumer.rb', line 8 def abstract_class? return @abstract_class == true if defined?(@abstract_class) instance_variable_get(:@config).nil? end |
.config ⇒ Object
24 25 26 27 28 29 30 |
# File 'lib/lepus/consumer.rb', line 24 def config return if @abstract_class == true return @config if defined?(@config) name = Primitive::String.new(to_s).underscore.split("/").last @config = Consumers::Config.new(queue: name, exchange: name) end |
.configure(opts = {}) {|@config| ... } ⇒ Object
Configures the consumer, setting queue, exchange and other options to be used by the add_consumer method.
64 65 66 67 68 69 70 |
# File 'lib/lepus/consumer.rb', line 64 def configure(opts = {}) raise ArgumentError, "Cannot configure an abstract class" if abstract_class? @config = Consumers::Config.new(opts) yield(@config) if block_given? @config end |
.descendants ⇒ Object
:nodoc:
72 73 74 75 76 77 78 |
# File 'lib/lepus/consumer.rb', line 72 def descendants # :nodoc: descendants = [] ObjectSpace.each_object(singleton_class) do |k| descendants.unshift k unless k == self end descendants.uniq end |
.inherited(subclass) ⇒ Object
19 20 21 22 |
# File 'lib/lepus/consumer.rb', line 19 def inherited(subclass) super subclass.abstract_class = false end |
.middleware_chain ⇒ Lepus::Consumers::MiddlewareChain
Returns the middleware chain for this consumer. Inherits middlewares registered on superclasses so abstract base consumers can declare shared middlewares with ‘use` and have them apply to subclasses.
36 37 38 39 40 41 42 43 44 |
# File 'lib/lepus/consumer.rb', line 36 def middleware_chain @middleware_chain ||= begin chain = Consumers::MiddlewareChain.new if superclass.respond_to?(:middleware_chain) superclass.middleware_chain.middlewares.each { |m| chain.middlewares << m } end chain end end |
.use(middleware, opts = {}) ⇒ Lepus::Consumers::MiddlewareChain
Registers a middleware to this consumer’s chain.
51 52 53 |
# File 'lib/lepus/consumer.rb', line 51 def use(middleware, opts = {}) middleware_chain.use(middleware, opts) end |
Instance Method Details
#last_delivery_errored? ⇒ Boolean
Returns whether the last delivery resulted in an error. Always false in core; overridden by Lepus::Web when loaded.
125 126 127 |
# File 'lib/lepus/consumer.rb', line 125 def last_delivery_errored? false end |
#perform(message) ⇒ :ack, ...
The method that is called when a message from the queue is received. Keep in mind that the parameters received can be altered by middlewares!
87 88 89 |
# File 'lib/lepus/consumer.rb', line 87 def perform() raise NotImplementedError end |
#process_delivery(delivery_info, metadata, payload) ⇒ Object
Wraps #perform to add middlewares. This is being called by Lepus when a message is received for the consumer.
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/lepus/consumer.rb', line 97 def process_delivery(delivery_info, , payload) = Message.coerce(delivery_info, , payload) .consumer_class = self.class combined_chain = MiddlewareChain.combine( Lepus.config.consumer_middleware_chain, self.class.middleware_chain ) combined_chain.execute() do |msg| perform(msg).tap do |result| verify_result(result) end end rescue Lepus::InvalidConsumerReturnError raise rescue Exception # rubocop:disable Lint/RescueException on_delivery_error # In testing mode, re-raise exceptions if consumer_raise_errors? is enabled if defined?(Lepus::Testing) && Lepus::Testing.consumer_raise_errors? raise end reject! end |