Class: Fluent::Plugin::RdKafkaGroupInput
Defined Under Namespace
Classes: Batch, ForShutdown
Constant Summary
collapse
- BufferError =
Fluent::Plugin::Buffer::BufferOverflowError
KafkaPluginUtil::SSLSettings::DummyFormatter
Instance Method Summary
collapse
included
included, #pickup_ssl_endpoint, #read_ssl_file
Constructor Details
Returns a new instance of RdKafkaGroupInput.
65
66
67
68
69
70
|
# File 'lib/fluent/plugin/in_rdkafka_group.rb', line 65
# Runs the parent constructor, then seeds the plugin's mutable state:
# no time parser yet (configure may build one) and a reconnect counter
# that starts at 1.
def initialize
  super
  @retry_count = 1
  @time_parser = nil
end
|
Instance Method Details
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
# File 'lib/fluent/plugin/in_rdkafka_group.rb', line 98
# Validates and normalizes the plugin configuration.
#
# - Converts legacy parser parameters into a <parse> section, then lets the
#   parent class apply config_params.
# - Turns the raw @topics string into either a regex pattern or an array of
#   topic names (via _config_regex_pattern / _config_to_array).
# - Requires a <parse> section with an @type and builds @parser_proc from it.
# - Forces @time_source to :record when @use_record_time is set, and builds a
#   Fluent::TimeParser when record time must be parsed with @time_format.
#
# Raises Fluent::ConfigError when the <parse> section or its @type is missing.
def configure(conf)
  compat_parameters_convert(conf, :parser)
  super

  log.warn "The in_rdkafka_group consumer was not yet tested under heavy production load. Use it at your own risk!"
  log.info "Will watch for topics #{@topics} at brokers " \
           "#{@kafka_configs["bootstrap.servers"]} and '#{@kafka_configs["group.id"]}' group"

  raw_topics = @topics.strip
  @topics = regex_pattern?(raw_topics) ? _config_regex_pattern(raw_topics) : _config_to_array(raw_topics)

  parse_section = conf.elements('parse').first
  raise Fluent::ConfigError, "<parse> section or format parameter is required." unless parse_section
  raise Fluent::ConfigError, "parse/@type is required." unless parse_section["@type"]
  @parser_proc = setup_parser(parse_section)

  @time_source = :record if @use_record_time
  @time_parser = Fluent::TimeParser.new(@time_format) if @time_source == :record && @time_format
end
|
#each_batch {|batch| ... } ⇒ Object
Executes the passed code block on a batch of messages. Every message in a given batch is guaranteed to belong to the same topic, because the tagging logic in :run relies on that property. The maximum number of messages in a batch is capped by the :max_batch_size configuration value, which ensures that consuming from a single topic for a long time (e.g. with `auto.offset.reset` set to `earliest`) does not lead to memory exhaustion. Also, calling consumer.poll advances the consumer offset, so if the process crashes we might lose at most :max_batch_size messages.
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
|
# File 'lib/fluent/plugin/in_rdkafka_group.rb', line 222
# Polls @consumer and yields the received messages grouped into Batch objects.
#
# The batch being filled is flushed (yielded) when:
#   * a message from a different topic arrives,
#   * it already holds @max_batch_size messages,
#   * a poll (given @max_wait_time_ms) returns nil, or
#   * @consumer becomes nil and the polling loop ends.
# Every yielded batch therefore contains messages from a single topic.
def each_batch(&block)
  pending = nil
  while @consumer
    message = @consumer.poll(@max_wait_time_ms)
    unless message
      # Nothing came back from this poll: flush whatever has accumulated.
      yield pending if pending
      pending = nil
      next
    end

    if pending && (pending.topic != message.topic || pending.messages.size >= @max_batch_size)
      yield pending
      pending = nil
    end
    pending ||= Batch.new(message.topic)
    pending.messages << message
  end
  yield pending if pending
end
|
#emit_events(tag, es) ⇒ Object
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
|
# File 'lib/fluent/plugin/in_rdkafka_group.rb', line 301
# Emits the event stream +es+ under +tag+ through router.emit_stream.
#
# When the router raises BufferError (buffer overflow), the call is retried
# after a one-second pause:
#   * forever when @retry_emit_limit is nil,
#   * otherwise at most @retry_emit_limit extra times, after which a
#     RuntimeError ("Exceeds retry_emit_limit") is raised.
# If @consumer has been cleared (shutdown), ForShutdown is raised instead of
# retrying.
def emit_events(tag, es)
  attempts = 0
  begin
    router.emit_stream(tag, es)
  rescue BufferError
    raise ForShutdown if @consumer.nil?

    unless @retry_emit_limit.nil?
      raise RuntimeError, "Exceeds retry_emit_limit" if attempts >= @retry_emit_limit
      attempts += 1
    end
    sleep 1
    retry
  end
end
|
#multi_workers_ready? ⇒ Boolean
94
95
96
|
# File 'lib/fluent/plugin/in_rdkafka_group.rb', line 94
# Declares this input safe to run under Fluentd's multi-worker mode;
# the answer is unconditionally true.
def multi_workers_ready?
  return true
end
|
#reconnect_consumer ⇒ Object
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
|
# File 'lib/fluent/plugin/in_rdkafka_group.rb', line 186
# Closes the current consumer (if any) and tries to create a new one with
# setup_consumer, sleeping @retry_wait_seconds before every attempt.
#
# Each attempt increments @retry_count. Another attempt follows a failure
# while the incremented @retry_count is <= @retry_limit, or indefinitely when
# disable_retry_limit is set. When every attempt fails, @consumer is left nil,
# which makes the `while @consumer` loop in run terminate.
#
# On success @retry_count is reset to 1, the same value initialize gives it.
# Every outage therefore gets the same retry budget and log numbering.
# Resetting to 0 granted one extra attempt after the first recovery.
#
# Attempts are driven by a loop instead of self-recursion, so an unlimited
# retry policy cannot grow the call stack during a long broker outage.
def reconnect_consumer
  loop do
    begin
      log.warn "Stopping Consumer"
      consumer = @consumer
      @consumer = nil
      consumer.close if consumer
      log.warn "Could not connect to broker. retry_time:#{@retry_count}. Next retry will be in #{@retry_wait_seconds} seconds"
      @retry_count += 1
      sleep @retry_wait_seconds
      @consumer = setup_consumer
      log.warn "Re-starting consumer #{Time.now.to_s}"
      @retry_count = 1
      return
    rescue => e
      log.error "unexpected error during re-starting consumer object access", :error => e.to_s
      log.error_backtrace
      return unless @retry_count <= @retry_limit || disable_retry_limit
    end
  end
end
|
#run ⇒ Object
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
|
# File 'lib/fluent/plugin/in_rdkafka_group.rb', line 243
# Body of the consumer thread started by start.
#
# Repeatedly takes topic-homogeneous batches from each_batch. Each Kafka
# message is turned into a record with @parser_proc, and its event time is
# resolved according to @time_source (:kafka, :now or :record). The message
# key and headers are optionally copied into the record. Each batch is then
# emitted as one stream under the batch topic, wrapped with @add_prefix /
# @add_suffix when they are set.
#
# A message whose processing raises is logged and skipped without aborting the
# rest of the batch. ForShutdown (raised by emit_events once @consumer has been
# cleared) is swallowed; any other error triggers reconnect_consumer. The loop
# ends when @consumer is nil.
def run
  while @consumer
    begin
      each_batch do |batch|
        log.debug "A new batch for topic #{batch.topic} with #{batch.messages.size} messages"
        es = Fluent::MultiEventStream.new
        tag = batch.topic
        tag = @add_prefix + "." + tag if @add_prefix
        tag = tag + "." + @add_suffix if @add_suffix

        batch.messages.each do |msg|
          begin
            record = @parser_proc.call(msg)
            case @time_source
            when :kafka
              record_time = Fluent::EventTime.from_time(msg.timestamp)
            when :now
              record_time = Fluent::Engine.now
            when :record
              if @time_format
                record_time = @time_parser.parse(record[@record_time_key].to_s)
              else
                record_time = record[@record_time_key]
              end
            else
              log.fatal "BUG: invalid time_source: #{@time_source}"
            end
            if @kafka_message_key
              record[@kafka_message_key] = msg.key
            end
            if @add_headers
              # Copy every Kafka message header into the record as a top-level field.
              msg.headers.each_pair do |k, v|
                record[k] = v
              end
            end
            es.add(record_time, record)
          rescue => e
            log.warn "parser error in #{msg.topic}/#{msg.partition}", :error => e.to_s, :value => msg.payload, :offset => msg.offset
            log.debug_backtrace
          end
        end

        emit_events(tag, es) unless es.empty?
      end
    rescue ForShutdown
      # emit_events only raises this after @consumer was cleared, so the
      # while condition ends the loop.
    rescue => e
      log.error "unexpected error during consuming events from kafka. Re-fetch events.", :error => e.to_s
      log.error_backtrace
      reconnect_consumer
    end
  end
rescue => e
  log.error "unexpected error during consumer object access", :error => e.to_s
  log.error_backtrace
end
|
#setup_consumer ⇒ Object
180
181
182
183
184
|
# File 'lib/fluent/plugin/in_rdkafka_group.rb', line 180
# Builds an rdkafka consumer from @kafka_configs, subscribes it to @topics
# (as normalized by configure) and returns it.
def setup_consumer
  Rdkafka::Config.new(@kafka_configs).consumer.tap do |kafka_consumer|
    kafka_consumer.subscribe(*@topics)
  end
end
|
#setup_parser(parser_conf) ⇒ Object
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
|
# File 'lib/fluent/plugin/in_rdkafka_group.rb', line 131
# Returns a proc that converts a Kafka message into a record, chosen by the
# <parse> section's @type:
#   json    - Oj when it can be loaded, otherwise Yajl
#   ltsv    - first record returned by LTSV.parse
#   msgpack - MessagePack.unpack
#   text    - { @message_key => payload }
#   other   - a Fluentd parser plugin built with parser_create. The proc returns
#             whatever that parser's #parse call returns; the block passed to it
#             hands back the yielded record.
# Parser libraries are required lazily, so only the selected one has to be
# installed.
def setup_parser(parser_conf)
  parser_type = parser_conf["@type"]

  if parser_type == 'json'
    begin
      require 'oj'
      Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS
      proc { |msg| Oj.load(msg.payload) }
    rescue LoadError
      # Oj is optional; fall back to yajl when it cannot be loaded.
      require 'yajl'
      proc { |msg| Yajl::Parser.parse(msg.payload) }
    end
  elsif parser_type == 'ltsv'
    require 'ltsv'
    proc { |msg| LTSV.parse(msg.payload, { :symbolize_keys => false }).first }
  elsif parser_type == 'msgpack'
    require 'msgpack'
    proc { |msg| MessagePack.unpack(msg.payload) }
  elsif parser_type == 'text'
    proc { |msg| { @message_key => msg.payload } }
  else
    @custom_parser = parser_create(usage: 'in-rdkafka-plugin', conf: parser_conf)
    proc do |msg|
      @custom_parser.parse(msg.payload) { |_time, record| record }
    end
  end
end
|
#shutdown ⇒ Object
169
170
171
172
173
174
175
176
177
178
|
# File 'lib/fluent/plugin/in_rdkafka_group.rb', line 169
# Detaches the consumer so the run loop stops, closes it, and then runs the
# parent shutdown.
#
# @consumer may already be nil here: reconnect_consumer leaves it nil when it
# gives up, and start leaves it unset if setup_consumer raised. Closing is
# therefore guarded, so a missing consumer no longer raises NoMethodError and
# skips the parent shutdown.
def shutdown
  consumer = @consumer
  @consumer = nil
  consumer.close if consumer
  super
end
|
#start ⇒ Object
161
162
163
164
165
166
167
|
# File 'lib/fluent/plugin/in_rdkafka_group.rb', line 161
# Starts the plugin: runs the parent start, creates the Kafka consumer and
# launches the consumer loop (run) in a plugin-managed thread named
# :in_rdkafka_group.
def start
  super
  @consumer = setup_consumer
  consumer_loop = method(:run)
  thread_create(:in_rdkafka_group, &consumer_loop)
end
|