Class: Fluent::Plugin::MysqlReplicatorElasticsearchOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb

Constant Summary collapse

# Default buffer type name.
# NOTE(review): presumably consumed by the buffer configuration; no use of it
# is visible in this view — confirm.
DEFAULT_BUFFER_TYPE =
"memory"
# Fallback tag pattern used by #configure when no custom tag_format is set.
# It captures the last four dot-separated tag segments as the named groups
# index_name, type_name, event and primary_key, which #write reads back.
# Only the end of the tag is anchored, so extra leading segments are allowed.
DEFAULT_TAG_FORMAT =
/(?<index_name>[^\.]+)\.(?<type_name>[^\.]+)\.(?<event>[^\.]+)\.(?<primary_key>[^\.]+)$/

Instance Method Summary collapse

Constructor Details

#initialize ⇒ MysqlReplicatorElasticsearchOutput

Returns a new instance of MysqlReplicatorElasticsearchOutput.



26
27
28
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 26

# Builds the plugin instance. No state is set up here; construction is
# delegated entirely to the parent class through +super+.
def initialize
  super
end

Instance Method Details

#configure(conf) ⇒ Object



30
31
32
33
34
35
36
37
38
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 30

# Applies the configuration and resolves the pattern used to parse event tags.
#
# The parent implementation runs first (it presumably populates @tag_format
# from the declared parameters — the declaration is not visible here).
# Afterwards @tag_format is DEFAULT_TAG_FORMAT when it was left unset or
# already equals the default; otherwise it is a Regexp compiled from the raw
# 'tag_format' entry of +conf+.
#
# @param conf configuration object; read here only through conf['tag_format']
# @return [Regexp] the pattern stored in @tag_format
def configure(conf)
  super

  use_default = @tag_format.nil? || @tag_format == DEFAULT_TAG_FORMAT
  @tag_format = use_default ? DEFAULT_TAG_FORMAT : Regexp.new(conf['tag_format'])
end

#format(tag, time, record) ⇒ Object



46
47
48
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 46

# Serializes one event for buffering as a msgpack-encoded
# [tag, time, record] array; #write reads the same triple back out of the
# chunk via msgpack_each.
#
# @param tag the event tag
# @param time the event time
# @param record the event payload
# @return the result of Array#to_msgpack for the triple
def format(tag, time, record)
  [tag, time, record].to_msgpack
end

#formatted_to_msgpack_binary? ⇒ Boolean

Returns:

  • (Boolean)


58
59
60
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 58

# Always true. #format emits msgpack, and #write iterates the chunk with
# msgpack_each.
# NOTE(review): presumably this is the flag the framework checks before
# allowing msgpack_each on a chunk — confirm against the plugin API.
#
# @return [Boolean] true
def formatted_to_msgpack_binary?
  true
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


54
55
56
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 54

# Always true.
# NOTE(review): presumably declares that this plugin may run under multiple
# worker processes — confirm against the plugin API.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end

#shutdown ⇒ Object



50
51
52
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 50

# Shuts the plugin down. There is no plugin-specific teardown; the call is
# forwarded to the parent class through +super+.
def shutdown
  super
end

#start ⇒ Object



40
41
42
43
44
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 40

# Starts the plugin: runs the parent's start logic, then clears the cached
# type-suppression decision so that #write re-runs detect_type_suppression
# on its next call.
def start
  super
  # nil means "not yet detected"; resolved on the first write.
  @suppress_type = nil
end

#write(chunk) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 62

# Sends every event in +chunk+ to the `/_bulk` endpoint in a single POST.
#
# Each event's tag is parsed with @tag_format to obtain the target index,
# type, event kind and primary-key field name. A 'delete' event contributes
# one action line; any other event contributes an "index" action line followed
# by the record itself. The "_type" entry is left out when @suppress_type is
# truthy (that flag is resolved by detect_type_suppression, defined outside
# this view, the first time it is still nil).
#
# NOTE(review): String#match returns nil for a tag that does not fit
# @tag_format, so such a tag raises NoMethodError on the first group lookup
# and fails the whole chunk — confirm that routing guarantees matching tags.
#
# @param chunk buffered chunk; must respond to msgpack_each, yielding
#   (tag, time, record) triples
# @return whatever the response's #value returns; per Net::HTTP that call
#   raises when the response is not a 2xx
def write(chunk)
  detect_type_suppression if @suppress_type.nil?

  lines = []

  chunk.msgpack_each do |tag, _time, record|
    captures = tag.match(@tag_format)
    index_name = captures['index_name']
    type_name = captures['type_name']
    pk_field = captures['primary_key']

    case captures['event']
    when 'delete'
      # Key order (_index, _id, then optional _type) mirrors the emitted JSON.
      header = {"_index" => index_name, "_id" => record[pk_field]}
      header['_type'] = type_name unless @suppress_type
      lines.push(Yajl::Encoder.encode({ "delete" => header }))
    else
      header = {"_index" => index_name}
      header['_type'] = type_name unless @suppress_type
      # Without a usable primary-key value no "_id" is sent.
      header['_id'] = record[pk_field] if pk_field && record[pk_field]
      lines.push(Yajl::Encoder.encode({ "index" => header }))
      lines.push(Yajl::Encoder.encode(record))
    end
  end
  # The empty last element makes the joined payload end with a newline.
  lines.push("")

  post = Net::HTTP::Post.new('/_bulk', {'content-type' => 'application/json; charset=utf-8'})
  post.basic_auth(@username, @password) if @username && @password

  post.body = lines.join("\n")
  new_http.request(post).value
end