Class: Lepus::Consumer Abstract

Inherits:
Object
  • Object
show all
Includes:
Web::ConsumerExtensions
Defined in:
lib/lepus/consumer.rb

Overview

This class is abstract.

Subclass and override #work to implement.

The abstract base class for consumers processing messages from queues.

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.abstract_class=(value) ⇒ Object



14
15
16
17
# File 'lib/lepus/consumer.rb', line 14

def abstract_class=(value)
  @config = nil
  @abstract_class = value
end

.abstract_class?Boolean

Returns:

  • (Boolean)


8
9
10
11
12
# File 'lib/lepus/consumer.rb', line 8

def abstract_class?
  return @abstract_class == true if defined?(@abstract_class)

  instance_variable_get(:@config).nil?
end

.configObject



24
25
26
27
28
29
30
# File 'lib/lepus/consumer.rb', line 24

def config
  return if @abstract_class == true
  return @config if defined?(@config)

  name = Primitive::String.new(to_s).underscore.split("/").last
  @config = Consumers::Config.new(queue: name, exchange: name)
end

.configure(opts = {}) {|@config| ... } ⇒ Object

Configures the consumer, setting queue, exchange and other options to be used by the add_consumer method.

Parameters:

  • opts (Hash) (defaults to: {})

    The options to configure the consumer with.

Options Hash (opts):

  • :queue (String, Hash)

    The name of the queue to consume from.

  • :exchange (String, Hash)

    The name of the exchange the queue should be bound to.

  • :routing_key (Array)

    The routing keys used for the queue binding.

  • :retry_queue (Boolean, Hash) — default: false

    Whether a retry queue should be provided.

  • :error_queue (Boolean, Hash) — default: false

    Whether an error queue should be provided.

Yields:

Raises:

  • (ArgumentError)


56
57
58
59
60
61
62
# File 'lib/lepus/consumer.rb', line 56

def configure(opts = {})
  raise ArgumentError, "Cannot configure an abstract class" if abstract_class?

  @config = Consumers::Config.new(opts)
  yield(@config) if block_given?
  @config
end

.descendantsObject

:nodoc:



64
65
66
67
68
69
70
# File 'lib/lepus/consumer.rb', line 64

def descendants # :nodoc:
  descendants = []
  ObjectSpace.each_object(singleton_class) do |k|
    descendants.unshift k unless k == self
  end
  descendants.uniq
end

.inherited(subclass) ⇒ Object



19
20
21
22
# File 'lib/lepus/consumer.rb', line 19

def inherited(subclass)
  super
  subclass.abstract_class = false
end

.middleware_chainLepus::Consumers::MiddlewareChain

Returns the middleware chain for this consumer.



34
35
36
# File 'lib/lepus/consumer.rb', line 34

def middleware_chain
  @middleware_chain ||= Consumers::MiddlewareChain.new
end

.use(middleware, opts = {}) ⇒ Lepus::Consumers::MiddlewareChain

Registers a middleware to this consumer’s chain.

Parameters:

  • middleware (Symbol, String, Class<Lepus::Middleware>)

    The middleware to register.

  • opts (Hash) (defaults to: {})

    Options passed to the middleware constructor.

Returns:



43
44
45
# File 'lib/lepus/consumer.rb', line 43

def use(middleware, opts = {})
  middleware_chain.use(middleware, opts)
end

Instance Method Details

#last_delivery_errored?Boolean

Returns whether the last delivery resulted in an error. Always false in core; overridden by Lepus::Web when loaded.

Returns:

  • (Boolean)


117
118
119
# File 'lib/lepus/consumer.rb', line 117

def last_delivery_errored?
  false
end

#perform(message) ⇒ :ack, ...

The method that is called when a message from the queue is received. Keep in mind that the parameters received can be altered by middlewares!

Parameters:

  • message (Leupus::Message)

    The message to process.

Returns:

  • (:ack, :reject, :requeue)

    A symbol denoting what should be done with the message.

Raises:

  • (NotImplementedError)


79
80
81
# File 'lib/lepus/consumer.rb', line 79

def perform(message)
  raise NotImplementedError
end

#process_delivery(delivery_info, metadata, payload) ⇒ Object

Wraps #perform to add middlewares. This is being called by Lepus when a message is received for the consumer.

Parameters:

  • delivery_info (Bunny::DeliveryInfo)

    The delivery info of the received message.

  • metadata (Bunny::MessageProperties)

    The metadata of the received message.

  • payload (String)

    The payload of the received message.

Raises:



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/lepus/consumer.rb', line 89

def process_delivery(delivery_info, , payload)
  message = Message.coerce(delivery_info, , payload)
  message.consumer_class = self.class

  combined_chain = MiddlewareChain.combine(
    Lepus.config.consumer_middleware_chain,
    self.class.middleware_chain
  )

  combined_chain.execute(message) do |msg|
    perform(msg).tap do |result|
      verify_result(result)
    end
  end
rescue Lepus::InvalidConsumerReturnError
  raise
rescue Exception # rubocop:disable Lint/RescueException
  on_delivery_error
  # In testing mode, re-raise exceptions if consumer_raise_errors? is enabled
  if defined?(Lepus::Testing) && Lepus::Testing.consumer_raise_errors?
    raise
  end

  reject!
end