Class: EventHub::ActorPublisher
- Inherits:
-
Object
- Object
- EventHub::ActorPublisher
- Includes:
- Celluloid, Helper
- Defined in:
- lib/eventhub/actor_publisher.rb
Overview
Publisher class
Instance Method Summary collapse
- #cleanup ⇒ Object
-
#initialize ⇒ ActorPublisher
constructor
A new instance of ActorPublisher.
- #publish(args = {}) ⇒ Object
Methods included from Helper
#bunny_connection_options, #create_bunny_connection, #get_name_from_class, #now_stamp, #stringify_keys
Constructor Details
#initialize ⇒ ActorPublisher
Returns a new instance of ActorPublisher.
10 11 12 13 14 |
# File 'lib/eventhub/actor_publisher.rb', line 10 def initialize EventHub.logger.info("Publisher is starting...") @connection = nil @channel = nil end |
Instance Method Details
#cleanup ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/eventhub/actor_publisher.rb', line 43 def cleanup EventHub.logger.info("Publisher is cleaning up...") begin @channel&.close rescue => ex EventHub.logger.warn("Publisher cleanup channel: ignoring #{ex.class}: #{ex.}") end begin @connection&.close rescue => ex EventHub.logger.warn("Publisher cleanup connection: ignoring #{ex.class}: #{ex.}") end end |
#publish(args = {}) ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/eventhub/actor_publisher.rb', line 16 def publish(args = {}) ensure_channel = args[:message] return if .nil? exchange_name = args[:exchange_name] || EH_X_INBOUND exchange = @channel.direct(exchange_name, durable: true) = {persistent: true} correlation_id = args[:correlation_id] || CorrelationId.current [:correlation_id] = correlation_id if correlation_id exchange.publish(, ) nil rescue Bunny::NetworkFailure, Bunny::ChannelAlreadyClosed => e # broker-side close - drop the channel so next publish reopens it EventHub.logger.warn("Publisher channel dropped: #{e.class}: #{e.}") begin @channel&.close rescue nil end @channel = nil raise end |