Module: ActivePublisher
- Defined in:
- lib/active_publisher.rb,
lib/active_publisher/async.rb,
lib/active_publisher/logging.rb,
lib/active_publisher/message.rb,
lib/active_publisher/version.rb,
lib/active_publisher/connection.rb,
lib/active_publisher/configuration.rb,
lib/active_publisher/async/redis_adapter.rb,
lib/active_publisher/async/in_memory_adapter.rb,
lib/active_publisher/async/redis_adapter/consumer.rb,
lib/active_publisher/async/in_memory_adapter/channel.rb,
lib/active_publisher/async/in_memory_adapter/async_queue.rb,
lib/active_publisher/async/in_memory_adapter/consumer_thread.rb,
lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb
Defined Under Namespace
Modules: Async, Connection, Logging
Classes: Configuration, ExchangeMismatchError, FailedPublisherConfirms, Message, UnknownMessageClassError
Constant Summary collapse
- VERSION = "1.5.0.pre"
Class Method Summary collapse
- .configuration ⇒ Object
- .configure {|configuration| ... } ⇒ Object
- .publish(route, payload, exchange_name, options = {}) ⇒ Object
  Publish a message to RabbitMQ.
- .publish_all(exchange_name, messages) ⇒ Object
- .publish_async(route, payload, exchange_name, options = {}) ⇒ Object
  Publish a message asynchronously to RabbitMQ.
- .publishing_options(route, in_options = {}) ⇒ Object
- .with_exchange(exchange_name) ⇒ Object
Class Method Details
.configuration ⇒ Object
23 24 25 |
# File 'lib/active_publisher.rb', line 23

# Returns the memoized configuration, creating a new
# ::ActivePublisher::Configuration the first time it is requested.
#
# @return [Object] the value cached in @configuration
def self.configuration
  return @configuration if @configuration

  @configuration = ::ActivePublisher::Configuration.new
end
.configure {|configuration| ... } ⇒ Object
27 28 29 |
# File 'lib/active_publisher.rb', line 27

# Yields the shared configuration object so the caller can adjust it.
# Does nothing (and returns nil) when called without a block.
#
# @yield [configuration] the object returned by .configuration
# @return [Object, nil] the block's result, or nil when no block is given
def self.configure
  return unless block_given?

  yield(configuration)
end
.publish(route, payload, exchange_name, options = {}) ⇒ Object
Publish a message to RabbitMQ.
37 38 39 40 41 42 43 |
# File 'lib/active_publisher.rb', line 37

# Publish a message to RabbitMQ.
#
# The publish is wrapped in a "message_published.active_publisher"
# ActiveSupport::Notifications event whose payload carries the route.
#
# @param route [Object] routing key; also reported as :route in the instrumentation payload
# @param payload [Object] message body handed to the exchange
# @param exchange_name [Object] name of the exchange, passed to with_exchange
# @param options [Hash] extra publishing options, merged over the defaults by publishing_options
# @return [Object] whatever with_exchange returns
def self.publish(route, payload, exchange_name, options = {})
  with_exchange(exchange_name) do |exchange|
    ::ActiveSupport::Notifications.instrument "message_published.active_publisher", :route => route do
      exchange.publish(payload, publishing_options(route, options))
    end
  end
end
.publish_all(exchange_name, messages) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/active_publisher.rb', line 45

# Publishes every message in +messages+ to a single exchange, draining the
# array in place (messages are removed with #shift as they are processed).
#
# If publishing a message raises, that message is appended back onto
# +messages+ before the error is re-raised, so the caller keeps everything
# that was not published. Validation failures are raised after the message
# has been shifted off and do not re-queue it.
#
# @param exchange_name [Object] exchange every message must target
# @param messages [Array<ActivePublisher::Message>] queue of messages; mutated
# @raise [ActivePublisher::UnknownMessageClassError] if an entry is not an ActivePublisher::Message
# @raise [ActivePublisher::ExchangeMismatchError] if a message's exchange_name differs from +exchange_name+
# @return [Object] whatever with_exchange returns
def self.publish_all(exchange_name, messages)
  with_exchange(exchange_name) do |exchange|
    loop do
      break if messages.empty?
      message = messages.shift

      fail ActivePublisher::UnknownMessageClassError, "bulk publish messages must be ActivePublisher::Message" unless message.is_a?(ActivePublisher::Message)
      fail ActivePublisher::ExchangeMismatchError, "bulk publish messages must match publish_all exchange_name" if message.exchange_name != exchange_name

      begin
        ::ActiveSupport::Notifications.instrument "message_published.active_publisher", :route => message.route do
          exchange.publish(message.payload, publishing_options(message.route, message.options || {}))
        end
      rescue
        # Put the failed message back so the caller can retry it, then propagate.
        messages << message
        raise
      end
    end
  end
end
.publish_async(route, payload, exchange_name, options = {}) ⇒ Object
Publish a message asynchronously to RabbitMQ.
Asynchronous publishing is designed to do two things:
- Introduce the idea of a durable retry should the RabbitMQ connection disconnect.
- Provide a higher-level pattern for fire-and-forget publishing.
12 13 14 |
# File 'lib/active_publisher/async.rb', line 12

# Publish a message asynchronously to RabbitMQ.
#
# Delegates unchanged to the adapter returned by
# ::ActivePublisher::Async.publisher_adapter.
#
# @param route [Object] routing key for the message
# @param payload [Object] message body
# @param exchange_name [Object] name of the target exchange
# @param options [Hash] extra publishing options, forwarded as-is
# @return [Object] whatever the adapter's #publish returns
def self.publish_async(route, payload, exchange_name, options = {})
  ::ActivePublisher::Async.publisher_adapter.publish(route, payload, exchange_name, options)
end
.publishing_options(route, in_options = {}) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/active_publisher.rb', line 66

# Builds the options hash handed to exchange.publish.
#
# Starts from the defaults (:mandatory => false, :persistent => false,
# :routing_key => route) and merges +in_options+ over them, so caller-supplied
# keys win. +in_options+ itself is not modified.
#
# When RUBY_PLATFORM is "java" the result is reshaped: :mandatory and
# :routing_key stay at the top level and every remaining key is nested under
# :properties.
# NOTE(review): presumably this is the shape the JRuby RabbitMQ client expects — confirm.
#
# @param route [Object] value used for :routing_key unless overridden
# @param in_options [Hash] caller overrides / additional options
# @return [Hash] the options to pass to exchange.publish
def self.publishing_options(route, in_options = {})
  options = {
    :mandatory => false,
    :persistent => false,
    :routing_key => route,
  }.merge(in_options)

  if ::RUBY_PLATFORM == "java"
    java_options = {}
    java_options[:mandatory] = options.delete(:mandatory)
    java_options[:routing_key] = options.delete(:routing_key)
    java_options[:properties] = options
    java_options
  else
    options
  end
end
.with_exchange(exchange_name) ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/active_publisher.rb', line 84

# Opens a fresh channel on the shared connection, yields the topic exchange
# named +exchange_name+ to the block, and always closes the channel afterwards.
#
# When configuration.publisher_confirms is truthy, confirm mode is selected on
# the channel before the block runs and wait_for_confirms is called after it.
#
# @param exchange_name [Object] name of the topic exchange to declare/look up
# @yield [exchange] the object returned by channel.topic(exchange_name)
# @return [Object, nil] the result of wait_for_confirms, or nil when confirms are disabled
def self.with_exchange(exchange_name)
  channel = ::ActivePublisher::Connection.connection.create_channel

  begin
    channel.confirm_select if configuration.publisher_confirms
    yield(channel.topic(exchange_name))
    channel.wait_for_confirms if configuration.publisher_confirms
  ensure
    # Closing is best-effort: a failure here must not mask the block's outcome.
    begin
      channel.close
    rescue StandardError
      nil
    end
  end
end