Class: Lepus::Publisher
- Inherits:
-
Object
- Object
- Lepus::Publisher
- Defined in:
- lib/lepus/publisher.rb
Constant Summary collapse
- DEFAULT_EXCHANGE_OPTIONS =
{ type: :topic, durable: true, auto_delete: false }.freeze
- DEFAULT_PUBLISH_OPTIONS =
{ persistent: true }.freeze
Instance Method Summary collapse
- #channel_publish(channel, message, **options) ⇒ void
- #initialize(exchange_name, **options) ⇒ void constructor
- #publish(message, **options) ⇒ Object
Constructor Details
#initialize(exchange_name, **options) ⇒ void
20 21 22 23 |
# File 'lib/lepus/publisher.rb', line 20 def initialize(exchange_name, **) @exchange_name = exchange_name @exchange_options = DEFAULT_EXCHANGE_OPTIONS.merge() end |
Instance Method Details
#channel_publish(channel, message, **options) ⇒ void
This method returns an undefined value.
39 40 41 42 43 44 45 46 47 48 |
# File 'lib/lepus/publisher.rb', line 39 def channel_publish(channel, , **) raise ArgumentError, "channel is required" unless channel return unless Producers.exchange_enabled?(exchange_name) payload, opts = (, **) exchange = channel.exchange(exchange_name, ) Lepus.instrument(:publish, exchange: exchange_name, routing_key: opts[:routing_key]) do exchange.publish(payload, opts) end end |
#publish(message, **options) ⇒ Object
25 26 27 28 29 30 31 32 33 |
# File 'lib/lepus/publisher.rb', line 25 def publish(, **) return unless Producers.exchange_enabled?(exchange_name) Lepus.config.producer_config.with_connection do |connection| connection.with_channel do |channel| channel_publish(channel, , **) end end end |