Class: Lepus::Producer Abstract

Inherits:
Object
Defined in:
lib/lepus/producer.rb

Overview

This class is abstract.

Subclass it and call .configure in the subclass to set the exchange and publish options.

The abstract base class for producers publishing messages to exchanges.


Constructor Details

#initialize ⇒ Producer

Instance-level API, for code that works with producer instances instead of calling the class methods directly.



# File 'lib/lepus/producer.rb', line 139

def initialize
  @definition = self.class.definition
end

Instance Attribute Details

#definition ⇒ Object (readonly)

Returns the value of attribute definition.



# File 'lib/lepus/producer.rb', line 143

def definition
  @definition
end

Class Method Details

.abstract_class=(value) ⇒ Object



# File 'lib/lepus/producer.rb', line 14

def abstract_class=(value)
  @abstract_class = value
  remove_instance_variable(:@definition) if instance_variable_defined?(:@definition)
end

.abstract_class? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/lepus/producer.rb', line 8

def abstract_class?
  return @abstract_class == true if defined?(@abstract_class)

  instance_variable_get(:@definition).nil?
end

.configure(opts = {}) {|definition| ... } ⇒ Lepus::Producers::Definition

Configures the producer, setting exchange and other options to be used by the publisher for sending messages.

Parameters:

  • opts (Hash) (defaults to: {})

    The options to configure the producer with.

Options Hash (opts):

  • :exchange (String, Hash)

    The name of the exchange to publish to.

  • :publish (Hash)

    Default publish options (persistent, mandatory, immediate).

Yields:

  • (definition)

    Optional block to further configure the producer.

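Yield Parameters:

  • definition (Lepus::Producers::Definition)

    The definition being configured.

Returns:

  • (Lepus::Producers::Definition)

    The producer's definition.

Raises:

  • (ArgumentError)

    If called on an abstract class.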


# File 'lib/lepus/producer.rb', line 41

def configure(opts = {})
  raise ArgumentError, "Cannot configure an abstract class" if abstract_class?

  @definition = Producers::Definition.new(opts)
  yield(@definition) if block_given?
  @definition
end
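
The listing above can be exercised without a broker. The following is a minimal sketch, not the Lepus implementation: ConfigSketchProducer, ConfigSketchDefinition and OrdersSketch are hypothetical stand-ins, the abstract check is simplified so that only the base class counts as abstract, and the options accessor on the stand-in definition is an assumption made for illustration.

```ruby
# Hypothetical stand-in for Lepus::Producers::Definition. It only stores the
# options hash it is given; the real class is assumed to do more.
ConfigSketchDefinition = Struct.new(:options)

# Hypothetical stand-in for Lepus::Producer.
class ConfigSketchProducer
  class << self
    # Simplification for this sketch: only the base class is abstract.
    def abstract_class?
      equal?(ConfigSketchProducer)
    end

    # Same shape as the listing: guard, build, yield, return.
    def configure(opts = {})
      raise ArgumentError, "Cannot configure an abstract class" if abstract_class?

      @definition = ConfigSketchDefinition.new(opts)
      yield(@definition) if block_given?
      @definition
    end
  end
end

class OrdersSketch < ConfigSketchProducer; end

# The block receives the freshly built definition and may adjust it.
definition = OrdersSketch.configure(exchange: "orders", publish: { persistent: true }) do |d|
  d.options[:publish][:mandatory] = true
end

definition.options
# => {exchange: "orders", publish: {persistent: true, mandatory: true}}
```

Calling configure on the base class in this sketch raises ArgumentError, matching the guard in the listing.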

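.definition ⇒ Object

Returns the producer's definition, or nil for an abstract class. If .configure has not been called, builds a default definition whose exchange name is derived from the class name.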



# File 'lib/lepus/producer.rb', line 24

def definition
  return if abstract_class?
  return @definition if defined?(@definition)

  name = Primitive::String.new(to_s).underscore.split("/").last
  @definition = Producers::Definition.new(exchange: name)
end
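
To make the default exchange name concrete, here is a rough sketch of the derivation. It assumes that Primitive::String#underscore behaves like the familiar ActiveSupport-style conversion; the underscore and default_exchange_name helpers below are hypothetical approximations, not Lepus code.

```ruby
# Hypothetical approximation of Primitive::String#underscore, assuming it
# turns "::" into "/" and CamelCase into snake_case.
def underscore(str)
  str.gsub("::", "/")
     .gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
     .gsub(/([a-z\d])([A-Z])/, '\1_\2')
     .downcase
end

# Mirrors the listing: underscore the class name and keep the last segment,
# so the namespace does not appear in the exchange name.
def default_exchange_name(class_name)
  underscore(class_name).split("/").last
end

default_exchange_name("Billing::InvoiceProducer") # => "invoice_producer"
default_exchange_name("OrdersProducer")           # => "orders_producer"
```

Under that assumption, two producers with the same class name in different namespaces would default to the same exchange name, so an explicit :exchange option is the safer choice there.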

.descendants ⇒ Object



# File 'lib/lepus/producer.rb', line 49

def descendants # :nodoc:
  descendants = []
  ObjectSpace.each_object(singleton_class) do |k|
    descendants.unshift k unless k == self
  end
  descendants.uniq
end
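
The ObjectSpace scan above can be tried in isolation. This sketch uses hypothetical classes (DescSketchBase and its subclasses) and the same technique: every subclass is an instance of the base class's singleton class, so iterating over that singleton class finds the base class and all of its descendants.

```ruby
# Hypothetical base class reproducing the scan from the listing.
class DescSketchBase
  def self.descendants
    found = []
    # Yields DescSketchBase itself plus every live subclass.
    ObjectSpace.each_object(singleton_class) do |klass|
      found.unshift(klass) unless klass == self
    end
    found.uniq
  end
end

class DescSketchChild < DescSketchBase; end
class DescSketchGrandchild < DescSketchChild; end

# Iteration order is not guaranteed, so sort before comparing.
DescSketchBase.descendants.sort_by(&:name)
# => [DescSketchChild, DescSketchGrandchild]
DescSketchChild.descendants
# => [DescSketchGrandchild]
```

Because the result comes from scanning live objects, it includes indirect subclasses and depends on which classes have been loaded at the time of the call.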

.inherited(subclass) ⇒ Object



# File 'lib/lepus/producer.rb', line 19

def inherited(subclass)
  super
  subclass.abstract_class = false
end
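
Taken together, .abstract_class=, .abstract_class? and this hook mean that the base class is abstract by default while every subclass starts out concrete. The sketch below copies those three methods onto a hypothetical stand-in (AbstractSketchProducer) to show the resulting behavior; the subclass names are illustrative only.

```ruby
# Hypothetical stand-in carrying only the abstract-class logic shown above.
class AbstractSketchProducer
  class << self
    def abstract_class?
      return @abstract_class == true if defined?(@abstract_class)

      instance_variable_get(:@definition).nil?
    end

    def abstract_class=(value)
      @abstract_class = value
      remove_instance_variable(:@definition) if instance_variable_defined?(:@definition)
    end

    # Every new subclass is explicitly flagged as concrete.
    def inherited(subclass)
      super
      subclass.abstract_class = false
    end
  end
end

class PaymentsSketch < AbstractSketchProducer; end

# An intermediate base class can opt back in to being abstract.
class ApplicationSketch < AbstractSketchProducer
  self.abstract_class = true
end

class RefundsSketch < ApplicationSketch; end

AbstractSketchProducer.abstract_class? # => true  (no flag, no definition)
PaymentsSketch.abstract_class?         # => false (flag set by the hook)
ApplicationSketch.abstract_class?      # => true  (explicitly marked)
RefundsSketch.abstract_class?          # => false (flag is per class)
```

The flag lives in a class-level instance variable, so marking an intermediate class abstract does not affect its subclasses.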

.middleware_chain ⇒ Lepus::Producers::MiddlewareChain

Returns the middleware chain for this producer.



# File 'lib/lepus/producer.rb', line 65

def middleware_chain
  @middleware_chain ||= Producers::MiddlewareChain.new
end

.publish(payload, **options) ⇒ void

This method returns an undefined value.

Publishes a message using this producer’s configuration. Executes the middleware chain (global + per-producer) before publishing.

Parameters:

  • payload (String, Hash)

    The message payload to publish.

  • options (Hash)

    Additional publish options (routing_key, headers, etc.).



# File 'lib/lepus/producer.rb', line 84

def publish(payload, **options)
  if definition.nil?
    raise InvalidProducerConfigError, <<~ERROR
      The #{name} producer is not configured.
      Please call #{name}.configure before using #{name}.publish.
    ERROR
  end

  return unless Producers.enabled?(self)

  publish_opts = definition.publish_options.merge(options)
  message = build_message(payload, publish_opts)
  combined_chain = MiddlewareChain.combine(
    Lepus.config.producer_middleware_chain,
    middleware_chain
  )

  combined_chain.execute(message) do |msg|
    publisher.publish(msg.payload, **msg.to_publish_options)
  end
end
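
The combine-then-execute step at the end of the listing can be pictured with a small stand-alone chain. This is only a sketch of the general pattern: SketchChain is hypothetical, and it assumes each middleware is a callable taking the message and the next step, which is not necessarily the interface Lepus middlewares use.

```ruby
# Hypothetical, simplified chain. Each middleware is a callable that takes
# (message, next_step) and decides whether and how to continue.
class SketchChain
  attr_reader :middlewares

  def initialize(middlewares = [])
    @middlewares = middlewares
  end

  # Global middlewares first, then per-producer ones, in the order given.
  def self.combine(*chains)
    new(chains.flat_map(&:middlewares))
  end

  # Wraps the final block in every middleware, outermost first.
  def execute(message, &final)
    stack = middlewares.reverse.reduce(final) do |next_step, middleware|
      ->(msg) { middleware.call(msg, next_step) }
    end
    stack.call(message)
  end
end

global_chain   = SketchChain.new([->(msg, nxt) { nxt.call(msg.merge(trace: "abc")) }])
producer_chain = SketchChain.new([->(msg, nxt) { nxt.call(msg.merge(source: "orders")) }])

published = []
SketchChain.combine(global_chain, producer_chain).execute({ payload: "hi" }) do |msg|
  published << msg # stands in for publisher.publish
end

published
# => [{payload: "hi", trace: "abc", source: "orders"}]
```

In this shape a middleware can also stop delivery by not calling the next step, in which case the final block never runs.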

.publisher ⇒ Lepus::Publisher

Returns the publisher for this producer, creating it on first use from the definition's exchange name and exchange options.

Returns:

  • (Lepus::Publisher)



# File 'lib/lepus/producer.rb', line 59

def publisher
  @publisher ||= Publisher.new(definition.exchange_name, **definition.exchange_options)
end

.use(middleware, opts = {}) ⇒ Lepus::Producers::MiddlewareChain

Adds a middleware to this producer’s chain.

Parameters:

  • middleware (Symbol, String, Class<Lepus::Middleware>)

    The middleware to register.

  • opts (Hash) (defaults to: {})

    Options passed to the middleware constructor.

Returns:

  • (Lepus::Producers::MiddlewareChain)



# File 'lib/lepus/producer.rb', line 74

def use(middleware, opts = {})
  middleware_chain.use(middleware, opts)
end
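
Because .middleware_chain memoizes into a class-level instance variable, each producer class ends up with its own chain. The sketch below shows that consequence with a hypothetical stand-in (UseSketchProducer) whose chain is a plain Array of [middleware, opts] pairs; the middleware names are placeholders, not a list of middlewares shipped with Lepus.

```ruby
# Hypothetical stand-in: the chain is just an Array here.
class UseSketchProducer
  class << self
    # Memoized per class, since @middleware_chain belongs to the class
    # object that receives the call.
    def middleware_chain
      @middleware_chain ||= []
    end

    def use(middleware, opts = {})
      middleware_chain << [middleware, opts]
      middleware_chain
    end
  end
end

class AuditedSketch < UseSketchProducer
  use :placeholder_json, pretty: false
  use :placeholder_correlation_id
end

class PlainSketch < UseSketchProducer; end

AuditedSketch.middleware_chain
# => [[:placeholder_json, {pretty: false}], [:placeholder_correlation_id, {}]]
PlainSketch.middleware_chain
# => []
```

Middlewares that should apply to every producer belong on the global chain, which .publish combines with the per-producer chain.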

Instance Method Details

#publish(message, **options) ⇒ Object

Delegates to the class-level .publish with the same arguments.



# File 'lib/lepus/producer.rb', line 149

def publish(message, **options)
  self.class.publish(message, **options)
end

#publisher ⇒ Object



# File 'lib/lepus/producer.rb', line 145

def publisher
  @publisher ||= Publisher.new(definition.exchange_name, **definition.exchange_options)
end