Class: Fluent::Plugin::FormatDruidAuditLog2Filter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_format_druid_audit_log_2.rb

Constant Summary collapse

NAME =
'format_druid_audit_log_2'
DEFAULT_QUERY_KEY =
'query'
DEFAULT_QUERY_RESULT_KEY =
'query_result'

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


21
22
23
24
25
26
27
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 21

def configure(conf)
  super

  return unless query_key.nil? && query_result.nil?

  raise Fluent::ConfigError, 'query_key should be specified'
end

#filter(_tag, _time, record) ⇒ Object



33
34
35
36
37
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 33

def filter(_tag, _time, record)
  new_record = format_record(record.dup)
  fix_record(new_record)
  new_record
end

#fix_record(record) ⇒ Object



59
60
61
62
63
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 59

def fix_record(record)
  fix_record_query_granularity(record)
  fix_record_query_match_value(record)
  fix_record_query_datasource_rows(record)
end

#fix_record_query_datasource_rows(record) ⇒ Object



77
78
79
80
81
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 77

def fix_record_query_datasource_rows(record)
  update_all_key_value(record.dig('query', 'dataSource'), 'rows') do |value|
    value&.to_s
  end
end

#fix_record_query_granularity(record) ⇒ Object



65
66
67
68
69
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 65

def fix_record_query_granularity(record)
  update_all_key_value(record['query'], 'granularity') do |value|
    value&.to_s
  end
end

#fix_record_query_match_value(record) ⇒ Object



71
72
73
74
75
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 71

def fix_record_query_match_value(record)
  update_all_key_value(record.dig('query', 'filter'), 'matchValue') do |value|
    value&.to_s
  end
end

#format_record(record) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 39

def format_record(record)
  [query_key, query_result_key].each do |key|
    if record[key].is_a? String
      record[key] = record[key].size.positive? ? JSON.parse(record[key]) : {}
    end
  end

  query_type = guess_query_type(record)
  record['query_type'] = query_type

  record
end

#guess_query_type(record) ⇒ Object



52
53
54
55
56
57
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 52

def guess_query_type(record)
  record.dig(query_key,
             'queryType') || (record.dig('query_result',
                                         'sqlQuery/time') && 'sql') || (record.dig(query_key,
                                                                                   'query') && 'sql') || 'unknown'
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


29
30
31
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 29

def multi_workers_ready?
  true
end

#update_all_key_value(record, key, &block) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 83

def update_all_key_value(record, key, &block)
  case record
  when Hash
    record.each do |rkey, rvalue|
      if rkey.to_s == key
        record[rkey] = yield(rvalue) if block_given?
        next
      end

      update_all_key_value(rvalue, key, &block)
    end
  when Array
    record.each do |item|
      update_all_key_value(item, key, &block)
    end
  end
end