Module: ActivePublisher

Defined in:
lib/active_publisher.rb,
lib/active_publisher/async.rb,
lib/active_publisher/logging.rb,
lib/active_publisher/message.rb,
lib/active_publisher/version.rb,
lib/active_publisher/connection.rb,
lib/active_publisher/configuration.rb,
lib/active_publisher/async/redis_adapter.rb,
lib/active_publisher/async/in_memory_adapter.rb,
lib/active_publisher/async/redis_adapter/consumer.rb,
lib/active_publisher/async/in_memory_adapter/channel.rb,
lib/active_publisher/async/in_memory_adapter/async_queue.rb,
lib/active_publisher/async/in_memory_adapter/consumer_thread.rb,
lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb

Defined Under Namespace

Modules: Async, Connection, Logging Classes: Configuration, ExchangeMismatchError, FailedPublisherConfirms, Message, UnknownMessageClassError

Constant Summary collapse

VERSION =
"1.3.2"

Class Method Summary collapse

Class Method Details

.configurationObject



23
24
25
# File 'lib/active_publisher.rb', line 23

def self.configuration
  @configuration ||= ::ActivePublisher::Configuration.new
end

.configure {|configuration| ... } ⇒ Object

Yields:



27
28
29
# File 'lib/active_publisher.rb', line 27

def self.configure
  yield(configuration) if block_given?
end

.publish(route, payload, exchange_name, options = {}) ⇒ Object

Publish a message to RabbitMQ

Parameters:

  • route (String)

    The routing key to use for this message.

  • payload (String)

    The message you are sending. Should already be encoded as a string.

  • exchange (String)

    The exchange you want to publish to.

  • options (Hash) (defaults to: {})

    hash to set message parameters (e.g. headers)



37
38
39
40
41
42
43
# File 'lib/active_publisher.rb', line 37

def self.publish(route, payload, exchange_name, options = {})
  with_exchange(exchange_name) do |exchange|
    ::ActiveSupport::Notifications.instrument "message_published.active_publisher", :route => route do
      exchange.publish(payload, publishing_options(route, options))
    end
  end
end

.publish_all(exchange_name, messages) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/active_publisher.rb', line 45

def self.publish_all(exchange_name, messages)
  with_exchange(exchange_name) do |exchange|
    loop do
      break if messages.empty?
      message = messages.shift

      fail ActivePublisher::UnknownMessageClassError, "bulk publish messages must be ActivePublisher::Message" unless message.is_a?(ActivePublisher::Message)
      fail ActivePublisher::ExchangeMismatchError, "bulk publish messages must match publish_all exchange_name" if message.exchange_name != exchange_name

      begin
        ::ActiveSupport::Notifications.instrument "message_published.active_publisher", :route => message.route do
          exchange.publish(message.payload, publishing_options(message.route, message.options || {}))
        end
      rescue
        messages << message
        raise
      end
    end
  end
end

.publish_async(route, payload, exchange_name, options = {}) ⇒ Object

Publish a message asynchronously to RabbitMQ.

Asynchronous is designed to do two things:

  1. Introduce the idea of a durable retry should the RabbitMQ connection disconnect.

  2. Provide a higher-level pattern for fire-and-forget publishing.

Parameters:

  • route (String)

    The routing key to use for this message.

  • payload (String)

    The message you are sending. Should already be encoded as a string.

  • exchange (String)

    The exchange you want to publish to.

  • options (Hash) (defaults to: {})

    hash to set message parameters (e.g. headers).



12
13
14
# File 'lib/active_publisher/async.rb', line 12

def self.publish_async(route, payload, exchange_name, options = {})
  ::ActivePublisher::Async.publisher_adapter.publish(route, payload, exchange_name, options)
end

.publishing_options(route, in_options = {}) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/active_publisher.rb', line 66

def self.publishing_options(route, in_options = {})
  options = {
    :mandatory => false,
    :persistent => false,
    :routing_key => route,
  }.merge(in_options)

  if ::RUBY_PLATFORM == "java"
    java_options = {}
    java_options[:mandatory]   = options.delete(:mandatory)
    java_options[:routing_key] = options.delete(:routing_key)
    java_options[:properties]  = options
    java_options
  else
    options
  end
end

.with_exchange(exchange_name) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/active_publisher.rb', line 84

def self.with_exchange(exchange_name)
  connection = ::ActivePublisher::Connection.connection
  channel = connection.create_channel
  begin
    channel.confirm_select if configuration.publisher_confirms
    exchange = channel.topic(exchange_name)
    yield(exchange)
    channel.wait_for_confirms if configuration.publisher_confirms
  ensure
    channel.close rescue nil
  end
end