Class: Fluent::Plugin::MysqlReplicatorElasticsearchOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::MysqlReplicatorElasticsearchOutput
- Defined in:
- lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb
Constant Summary collapse
- DEFAULT_BUFFER_TYPE =
"memory"- DEFAULT_TAG_FORMAT =
/(?<index_name>[^\.]+)\.(?<type_name>[^\.]+)\.(?<event>[^\.]+)\.(?<primary_key>[^\.]+)$/
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #formatted_to_msgpack_binary? ⇒ Boolean
-
#initialize ⇒ MysqlReplicatorElasticsearchOutput
constructor
A new instance of MysqlReplicatorElasticsearchOutput.
- #multi_workers_ready? ⇒ Boolean
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ MysqlReplicatorElasticsearchOutput
Returns a new instance of MysqlReplicatorElasticsearchOutput.
26 27 28 |
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 26 def initialize super end |
Instance Method Details
#configure(conf) ⇒ Object
30 31 32 33 34 35 36 37 38 |
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 30 def configure(conf) super if @tag_format.nil? || @tag_format == DEFAULT_TAG_FORMAT @tag_format = DEFAULT_TAG_FORMAT else @tag_format = Regexp.new(conf['tag_format']) end end |
#format(tag, time, record) ⇒ Object
46 47 48 |
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 46 def format(tag, time, record) [tag, time, record].to_msgpack end |
#formatted_to_msgpack_binary? ⇒ Boolean
58 59 60 |
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 58 def formatted_to_msgpack_binary? true end |
#multi_workers_ready? ⇒ Boolean
54 55 56 |
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 54 def multi_workers_ready? true end |
#shutdown ⇒ Object
50 51 52 |
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 50 def shutdown super end |
#start ⇒ Object
40 41 42 43 44 |
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 40 def start super # nil means "not yet detected"; resolved on the first write. @suppress_type = nil end |
#write(chunk) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 62 def write(chunk) detect_type_suppression if @suppress_type.nil? = [] chunk.msgpack_each do |tag, time, record| tag_parts = tag.match(@tag_format) target_index = tag_parts['index_name'] target_type = tag_parts['type_name'] id_key = tag_parts['primary_key'] if tag_parts['event'] == 'delete' action = {"_index" => target_index, "_id" => record[id_key]} action['_type'] = target_type unless @suppress_type = { "delete" => action } << Yajl::Encoder.encode() else action = {"_index" => target_index} action['_type'] = target_type unless @suppress_type if id_key && record[id_key] action['_id'] = record[id_key] end = { "index" => action } << Yajl::Encoder.encode() << Yajl::Encoder.encode(record) end end << "" request = Net::HTTP::Post.new('/_bulk', {'content-type' => 'application/json; charset=utf-8'}) if @username && @password request.basic_auth(@username, @password) end request.body = .join("\n") new_http.request(request).value end |