Class: Fluent::Plugin::RabbitMQOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_rabbitmq.rb

Instance Method Summary collapse

Constructor Details

#initializeRabbitMQOutput

Returns a new instance of RabbitMQOutput.



68
69
70
71
# File 'lib/fluent/plugin/out_rabbitmq.rb', line 68

def initialize
  super
  require "bunny"
end

Instance Method Details

#configure(conf) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/out_rabbitmq.rb', line 73

def configure(conf)
  compat_parameters_convert(conf, :inject, :formatter, default_chunk_key: "time")

  super

  bunny_options = {}
  bunny_options[:host] = @host if @host
  bunny_options[:hosts] = @hosts if @hosts
  bunny_options[:port] = @port if @port
  bunny_options[:user] = @user if @user
  bunny_options[:pass] = @pass if @pass
  bunny_options[:vhost] = @vhost if @vhost
  bunny_options[:connection_timeout] = @connection_timeout if @connection_timeout
  bunny_options[:continuation_timeout] = @continuation_timeout if @continuation_timeout
  bunny_options[:automatically_recover] = @automatically_recover if @automatically_recover
  bunny_options[:network_recovery_interval] = @network_recovery_interval if @network_recovery_interval
  bunny_options[:recovery_attempts] = @recovery_attempts
  bunny_options[:auth_mechanism] = @auth_mechanism if @auth_mechanism
  bunny_options[:heartbeat] = @heartbeat if @heartbeat
  bunny_options[:frame_max] = @frame_max if @frame_max

  bunny_options[:tls] = @tls
  bunny_options[:tls_cert] = @tls_cert if @tls_cert
  bunny_options[:tls_key] = @tls_key if @tls_key
  bunny_options[:tls_ca_certificates] = @tls_ca_certificates if @tls_ca_certificates
  bunny_options[:verify_peer] = @verify_peer

  @bunny = Bunny.new(bunny_options)

  @publish_options = {}
  @publish_options[:content_type] = @content_type if @content_type
  @publish_options[:content_encoding] = @content_encoding if @content_encoding
  @publish_options[:persistent] = @persistent if @persistent
  @publish_options[:mandatory] = @mandatory if @mandatory
  @publish_options[:expiration] = @expiration if @expiration
  @publish_options[:message_type] = @message_type if @message_type
  @publish_options[:priority] = @priority if @priority
  @publish_options[:app_id] = @app_id if @app_id

  @formatter = formatter_create(default_type: @type)
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


115
116
117
# File 'lib/fluent/plugin/out_rabbitmq.rb', line 115

def multi_workers_ready?
  true
end

#prefer_buffered_processingObject



119
120
121
# File 'lib/fluent/plugin/out_rabbitmq.rb', line 119

def prefer_buffered_processing
  false
end

#process(tag, es) ⇒ Object



152
153
154
155
156
157
158
159
# File 'lib/fluent/plugin/out_rabbitmq.rb', line 152

def process(tag, es)
  es.each do |time, record|
    set_publish_options(tag, time, record)
    record = inject_values_to_record(tag, time, record)
    buf = @formatter.format(tag, time, record)
    @bunny_exchange.publish(buf, @publish_options)
  end
end

#set_publish_options(tag, time, record) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/fluent/plugin/out_rabbitmq.rb', line 139

def set_publish_options(tag, time, record)
  @publish_options[:timestamp] = time.to_i if @timestamp

  if @exchange_type != "fanout"
    @publish_options[:routing_key] = @routing_key ? @routing_key : tag
  end

  if @id_key
    id = record[@id_key]
    @publish_options[:message_id] = id if id
  end
end

#shutdownObject



134
135
136
137
# File 'lib/fluent/plugin/out_rabbitmq.rb', line 134

def shutdown
  @bunny.close
  super
end

#startObject



123
124
125
126
127
128
129
130
131
132
# File 'lib/fluent/plugin/out_rabbitmq.rb', line 123

def start
  super
  @bunny.start
  @channel = @bunny.create_channel
  exchange_options = {
    durable: @exchange_durable,
    auto_delete: @exchange_auto_delete
  }
  @bunny_exchange = Bunny::Exchange.new(@channel, @exchange_type, @exchange, exchange_options)
end

#write(chunk) ⇒ Object



161
162
163
164
165
166
167
168
169
170
# File 'lib/fluent/plugin/out_rabbitmq.rb', line 161

def write(chunk)
  tag = chunk..tag

  chunk.each do |time, record|
    set_publish_options(tag, time, record)
    record = inject_values_to_record(tag, time, record)
    buf = @formatter.format(tag, time, record)
    @bunny_exchange.publish(buf, @publish_options)
  end
end