Class: Fluent::Plugin::MysqlReplicatorElasticsearchOutput
- Inherits:
- Output
- Object
- Output
- Fluent::Plugin::MysqlReplicatorElasticsearchOutput
- Defined in:
- lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb
Constant Summary collapse
- DEFAULT_BUFFER_TYPE =
"memory"
- DEFAULT_TAG_FORMAT =
/(?<index_name>[^\.]+)\.(?<type_name>[^\.]+)\.(?<event>[^\.]+)\.(?<primary_key>[^\.]+)$/
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #formatted_to_msgpack_binary? ⇒ Boolean
-
#initialize ⇒ MysqlReplicatorElasticsearchOutput
constructor
A new instance of MysqlReplicatorElasticsearchOutput.
- #multi_workers_ready? ⇒ Boolean
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ MysqlReplicatorElasticsearchOutput
Returns a new instance of MysqlReplicatorElasticsearchOutput.
40 41 42 |
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 40
#
# Creates the plugin instance. No plugin-specific state is set up here;
# construction is handed entirely to the parent class.
def initialize
  super
end
Instance Method Details
#configure(conf) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 44
#
# Applies the plugin configuration and validates the template settings.
#
# After the parent class has processed +conf+, @tag_format is normalised:
# an unset value or the default stays DEFAULT_TAG_FORMAT, anything else is
# recompiled as a Regexp from the raw conf['tag_format'] value.
#
# @param conf the configuration object handed over by the framework
#   (presumably a Fluent config element; only conf['tag_format'] is read here)
# @raise [Fluent::ConfigError] when only one of template_name/template_file
#   is set, or when template_file points at a path that does not exist
def configure(conf)
  super

  custom_format = !@tag_format.nil? && @tag_format != DEFAULT_TAG_FORMAT
  @tag_format = custom_format ? Regexp.new(conf['tag_format']) : DEFAULT_TAG_FORMAT

  # Both template options must be present, or both absent.
  unless @template_name.nil? == @template_file.nil?
    raise Fluent::ConfigError, "mysql_replicator_elasticsearch: 'template_name' and 'template_file' must be set together"
  end

  return unless @template_file
  return if File.exist?(@template_file)

  raise Fluent::ConfigError, "mysql_replicator_elasticsearch: template_file not found: #{@template_file}"
end
#format(tag, time, record) ⇒ Object
69 70 71 |
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 69
#
# Serialises one event as a msgpack-encoded [tag, time, record] triple.
#
# @param tag the event tag
# @param time the event time
# @param record the event payload
# @return the result of calling +to_msgpack+ on the three-element array
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
#formatted_to_msgpack_binary? ⇒ Boolean
81 82 83 |
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 81
#
# Always answers true. Going by the name, this tells the framework that
# #format emits msgpack binary (which #format does, via +to_msgpack+).
#
# @return [Boolean] always true
def formatted_to_msgpack_binary?
  true
end
#multi_workers_ready? ⇒ Boolean
77 78 79 |
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 77
#
# Always answers true. Going by the name, this declares the plugin usable
# when several workers are running.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
#shutdown ⇒ Object
73 74 75 |
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 73
#
# Shuts the plugin down. There is no plugin-specific teardown; the call is
# passed straight to the parent class.
def shutdown
  super
end
#start ⇒ Object
61 62 63 64 65 66 67 |
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 61
#
# Starts the plugin and resets the per-run state.
#
# @suppress_type and @es_version start as nil, meaning "not yet detected";
# #write triggers detection on its first call while @suppress_type is nil.
# @template_installed starts as false.
def start
  super

  # nil is the "undetected" marker, distinct from a detected false.
  @suppress_type = @es_version = nil
  @template_installed = false
end
#write(chunk) ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 85
#
# Sends every event in +chunk+ to Elasticsearch as a single `_bulk` request.
#
# Each event's tag is matched against @tag_format to obtain the index name,
# type name, event kind and a comma-separated list of primary-key columns:
# * 'delete' events produce one bulk line (the delete action);
# * every other event produces two lines (an index action, then the record).
#   The index action carries an `_id` only when every primary-key column is
#   present and non-nil in the record.
#
# Events whose tag does not match @tag_format are logged and skipped rather
# than failing the whole chunk. Nothing is sent when no bulk line was produced.
#
# @param chunk the buffer chunk; must respond to +msgpack_each+ yielding
#   (tag, time, record) triples as written by #format
# @return the value of Net::HTTPResponse#value for a successful request, or
#   nil when there was nothing to send
# @raise whatever Net::HTTPResponse#value raises for a non-2xx response
def write(chunk)
  # false is a valid detection result, so only nil triggers detection.
  detect_type_suppression if @suppress_type.nil?
  install_template_once if @template_name

  bulk_message = []
  chunk.msgpack_each do |tag, time, record|
    tag_parts = tag.match(@tag_format)
    if tag_parts.nil?
      # Without this guard a single foreign tag raises NoMethodError below.
      log.warn "mysql_replicator_elasticsearch: tag '#{tag}' does not match tag_format; event skipped"
      next
    end

    target_index = resolve_index_name(tag_parts['index_name'], time)
    target_type = tag_parts['type_name']
    id_keys = tag_parts['primary_key'].to_s.split(',')

    if tag_parts['event'] == 'delete'
      action = {"_index" => target_index, "_id" => join_id(record, id_keys)}
      action['_type'] = target_type unless @suppress_type
      meta = { "delete" => action }
      bulk_message << Yajl::Encoder.encode(meta)
    else
      action = {"_index" => target_index}
      action['_type'] = target_type unless @suppress_type
      if !id_keys.empty? && id_keys.all? { |k| !record[k].nil? }
        action['_id'] = join_id(record, id_keys)
      end
      meta = { "index" => action }
      bulk_message << Yajl::Encoder.encode(meta)
      bulk_message << Yajl::Encoder.encode(record)
    end
  end

  # Skip the request entirely rather than posting an empty bulk body.
  return if bulk_message.empty?

  # The trailing empty element makes the joined body end with a newline.
  bulk_message << ""

  request = Net::HTTP::Post.new('/_bulk', {'content-type' => 'application/json; charset=utf-8'})
  request.basic_auth(@username, @password) if @username && @password
  request.body = bulk_message.join("\n")
  new_http.request(request).value
end