Class: Fluent::Plugin::FormatDruidAuditLog2Filter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_format_druid_audit_log_2.rb

Constant Summary collapse

NAME =
'format_druid_audit_log_2'
DEFAULT_QUERY_KEY =
'query'
DEFAULT_QUERY_RESULT_KEY =
'query_result'

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


21
22
23
24
25
26
27
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 21

# Validates the plugin configuration once the parent class has processed it.
#
# @param conf the configuration object, passed on to the parent via `super`
# @raise [Fluent::ConfigError] when both `query_key` and `query_result_key` are nil
# @return [nil]
def configure(conf)
  super

  # Check `query_result_key`, the same accessor `format_record` reads; the
  # guard must only ever surface a ConfigError, never a missing-method error.
  return unless query_key.nil? && query_result_key.nil?

  raise Fluent::ConfigError, 'query_key should be specified'
end

#filter(_tag, _time, record) ⇒ Object



33
34
35
36
37
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 33

# Produces the filtered event: a copy of the record is formatted, then fixed up.
#
# The copy is made with `dup`, which is shallow, so only the top-level hash is
# detached from the incoming record.
#
# @param _tag unused
# @param _time unused
# @param record [Hash] the incoming event record
# @return [Hash] the object returned by `format_record`, after `fix_record` ran on it
def filter(_tag, _time, record)
  format_record(record.dup).tap { |formatted| fix_record(formatted) }
end

#fix_record(record) ⇒ Object



59
60
61
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 59

# Applies the post-format fix-ups to a record. At present the only fix-up is
# the granularity normalisation done by `fix_record_query_granularity`.
#
# @param record [Hash] the record to fix; passed straight through to the helper
# @return whatever `fix_record_query_granularity` returns
def fix_record(record)
  fix_record_query_granularity(record)
end

#fix_record_query_granularity(record) ⇒ Object



63
64
65
66
67
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 63

# Rewrites every 'granularity' value found by `update_all_key_value` to its
# string form (`to_s`); nil values are left as nil.
#
# @param record [Hash] the record to rewrite in place
# @return whatever `update_all_key_value` returns
def fix_record_query_granularity(record)
  update_all_key_value(record, 'granularity') { |granularity| granularity&.to_s }
end

#format_record(record) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 39

# Decodes the JSON-encoded fields of a record in place and tags it with a
# query type.
#
# For each of `query_key` and `query_result_key`: a String value is replaced by
# its parsed JSON, or by an empty Hash when the string is empty. Values that
# are not Strings are left untouched. `JSON.parse` is not rescued here, so a
# malformed string propagates its parse error to the caller.
#
# @param record [Hash] the record to modify
# @return [Hash] the same record, with 'query_type' set from `guess_query_type`
def format_record(record)
  [query_key, query_result_key].each do |field|
    raw = record[field]
    next unless raw.is_a?(String)

    record[field] = raw.empty? ? {} : JSON.parse(raw)
  end

  record['query_type'] = guess_query_type(record)
  record
end

#guess_query_type(record) ⇒ Object



52
53
54
55
56
57
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 52

# Derives a query type label for a record.
#
# Resolution order:
# 1. the 'queryType' entry under `query_key`, when truthy;
# 2. 'sql' when the payload under `query_result_key` has a truthy
#    'sqlQuery/time' entry;
# 3. 'sql' when the payload under `query_key` has a truthy 'query' entry;
# 4. 'unknown' otherwise.
#
# @param record [Hash] the record to inspect; it is not modified
# @return the 'queryType' value, or the String 'sql' or 'unknown'
def guess_query_type(record)
  native_type = record.dig(query_key, 'queryType')
  return native_type if native_type

  # Look the result payload up under the configured key rather than a literal
  # 'query_result', so a non-default query_result_key is honoured here too.
  sql_marker = record.dig(query_result_key, 'sqlQuery/time') || record.dig(query_key, 'query')
  sql_marker ? 'sql' : 'unknown'
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


29
30
31
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 29

# Always returns true.
#
# NOTE(review): presumably this is the hook Fluentd consults to decide whether
# the plugin may run under multiple workers; confirm against the plugin API docs.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end

#update_all_key_value(record, key, &block) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 69

# Walks a hash and its nested hashes, replacing in place the value of every
# entry whose key, converted with `to_s`, equals +key+.
#
# The replacement is whatever the block returns for the current value. With no
# block, matching entries are left unchanged. A matching entry's value is never
# descended into, and only Hash values are recursed (arrays are not).
#
# @param record [Hash] the hash to rewrite in place
# @param key [String] the key name to match
# @yield [value] the current value of a matching entry
# @return [Hash] +record+ itself
def update_all_key_value(record, key, &block)
  record.each do |field, current|
    if field.to_s != key
      update_all_key_value(current, key, &block) if current.is_a?(Hash)
    elsif block_given?
      record[field] = yield(current)
    end
  end
end