Class: Lepus::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/lepus/publisher.rb

Constant Summary collapse

# Baseline exchange settings; #initialize merges caller-supplied options over these.
DEFAULT_EXCHANGE_OPTIONS = { type: :topic, durable: true, auto_delete: false }.freeze
# Baseline publish settings.
# NOTE(review): presumably merged into per-message options by the message
# preparation step, which is not visible on this page -- confirm.
DEFAULT_PUBLISH_OPTIONS = { persistent: true }.freeze

Instance Method Summary collapse

Constructor Details

#initialize(exchange_name, **options) ⇒ void

Parameters:

  • exchange_name (String)

    The name of the exchange to publish messages to.

  • options (Hash)

    Additional options for the exchange (type, durable, auto_delete).



20
21
22
23
# File 'lib/lepus/publisher.rb', line 20

# Stores the target exchange name and the effective exchange options.
#
# @param exchange_name [String] name of the exchange to publish to
# @param options [Hash] exchange option overrides; any key given here wins
#   over the matching entry in DEFAULT_EXCHANGE_OPTIONS
def initialize(exchange_name, **options)
  @exchange_name = exchange_name
  # Later keys override earlier ones, so caller options take precedence.
  @exchange_options = { **DEFAULT_EXCHANGE_OPTIONS, **options }
end

Instance Method Details

#channel_publish(channel, message, **options) ⇒ void

This method returns an undefined value.

Parameters:

  • channel (Bunny::Channel)

    The channel to publish the message to.

  • message (String, Hash)

    The message to publish.

  • options (Hash)

    Additional options for the publish.

Raises:

  • (ArgumentError) — if channel is nil or false.


39
40
41
42
43
44
45
46
47
48
# File 'lib/lepus/publisher.rb', line 39

# Publishes +message+ to this publisher's exchange through an existing channel.
#
# The channel check runs first, so a missing channel raises even when the
# exchange is disabled. When Producers reports the exchange as disabled,
# nothing is declared or published and nil is returned.
#
# @param channel [Bunny::Channel] channel used to obtain the exchange
# @param message [String, Hash] message handed to prepare_message
# @param options [Hash] extra publish options forwarded to prepare_message
# @raise [ArgumentError] when channel is nil or false
# @return [void]
def channel_publish(channel, message, **options)
  channel || raise(ArgumentError, "channel is required")

  if Producers.exchange_enabled?(exchange_name)
    body, publish_opts = prepare_message(message, **options)
    target = channel.exchange(exchange_name, exchange_options)
    event = { exchange: exchange_name, routing_key: publish_opts[:routing_key] }
    # The actual publish happens inside the instrumentation block.
    Lepus.instrument(:publish, **event) { target.publish(body, publish_opts) }
  end
end

#publish(message, **options) ⇒ Object



25
26
27
28
29
30
31
32
33
# File 'lib/lepus/publisher.rb', line 25

# Publishes +message+ using a connection and channel obtained from the
# producer configuration, delegating the actual send to #channel_publish.
#
# Returns nil without requesting a connection when Producers reports the
# exchange as disabled; otherwise returns whatever with_connection returns.
#
# @param message [String, Hash] message to publish
# @param options [Hash] extra publish options forwarded to #channel_publish
def publish(message, **options)
  if Producers.exchange_enabled?(exchange_name)
    producer_config = Lepus.config.producer_config
    producer_config.with_connection do |conn|
      conn.with_channel { |ch| channel_publish(ch, message, **options) }
    end
  end
end