Class: Fluent::Plugin::FormatDruidAuditLog2Filter
- Inherits: Filter
  - Object
    - Filter
      - Fluent::Plugin::FormatDruidAuditLog2Filter
- Defined in: lib/fluent/plugin/filter_format_druid_audit_log_2.rb
Constant Summary collapse
- NAME = 'format_druid_audit_log_2'
- DEFAULT_QUERY_KEY = 'query'
- DEFAULT_QUERY_RESULT_KEY = 'query_result'
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #filter(_tag, _time, record) ⇒ Object
- #fix_record(record) ⇒ Object
- #fix_record_query_datasource_rows(record) ⇒ Object
- #fix_record_query_granularity(record) ⇒ Object
- #fix_record_query_match_value(record) ⇒ Object
- #format_record(record) ⇒ Object
- #guess_query_type(record) ⇒ Object
- #multi_workers_ready? ⇒ Boolean
- #update_all_key_value(record, key, &block) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
21 22 23 24 25 26 27 |
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 21
# Validates the plugin configuration after the parent class has processed it.
#
# @param conf [Object] configuration handed over by the framework; passed to +super+
# @raise [Fluent::ConfigError] when neither +query_key+ nor +query_result_key+ is set
def configure(conf)
  super
  # The accessor is `query_result_key` (the one the rest of the plugin reads);
  # the previous `query_result` is not referenced anywhere else and would fail
  # with NameError instead of reporting the configuration problem.
  return unless query_key.nil? && query_result_key.nil?

  raise Fluent::ConfigError, 'query_key should be specified'
end
#filter(_tag, _time, record) ⇒ Object
33 34 35 36 37 |
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 33
# Builds the formatted and type-fixed version of an incoming record.
#
# Works on a shallow copy (+record.dup+), so top-level keys of the input are
# not reassigned; nested objects are still shared with the input.
#
# @param _tag [Object] unused
# @param _time [Object] unused
# @param record [Hash] incoming event record
# @return [Hash] the copied record after +format_record+ and +fix_record+
def filter(_tag, _time, record)
  format_record(record.dup).tap { |formatted| fix_record(formatted) }
end
#fix_record(record) ⇒ Object
59 60 61 62 63 |
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 59
# Applies every in-place value fix to the record, in a fixed order:
# granularity, then matchValue, then dataSource rows.
#
# @param record [Hash] record to fix in place
# @return [Object] whatever the last fixer (+fix_record_query_datasource_rows+) returns
def fix_record(record)
  fix_record_query_granularity(record)
  fix_record_query_match_value(record)

  # Kept last so the method's return value stays that of this call.
  fix_record_query_datasource_rows(record)
end
#fix_record_query_datasource_rows(record) ⇒ Object
77 78 79 80 81 |
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 77
# Stringifies every 'rows' value found anywhere under the query's 'dataSource'.
# A nil value stays nil.
#
# The query is looked up under the configured +query_key+ (the same key
# +format_record+ parses) rather than the literal 'query', so the fix also
# applies when a non-default key is configured.
#
# @param record [Hash] record whose query payload is fixed in place
# @return [Object, nil] result of +update_all_key_value+, or nil when the
#   query payload is not a Hash
def fix_record_query_datasource_rows(record)
  query = record[query_key]
  # A parsed payload may be an Array or scalar; there is nothing to fix then,
  # and digging into it with a String key would raise TypeError.
  return unless query.is_a?(Hash)

  update_all_key_value(query['dataSource'], 'rows') { |value| value&.to_s }
end
#fix_record_query_granularity(record) ⇒ Object
65 66 67 68 69 |
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 65
# Stringifies every 'granularity' value found anywhere inside the query.
# A nil value stays nil.
#
# The query is looked up under the configured +query_key+ (the same key
# +format_record+ parses) rather than the literal 'query', so the fix also
# applies when a non-default key is configured.
#
# @param record [Hash] record whose query payload is fixed in place
# @return [Object, nil] result of +update_all_key_value+
def fix_record_query_granularity(record)
  update_all_key_value(record[query_key], 'granularity') { |value| value&.to_s }
end
#fix_record_query_match_value(record) ⇒ Object
71 72 73 74 75 |
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 71
# Stringifies every 'matchValue' found anywhere under the query's 'filter'.
# A nil value stays nil.
#
# The query is looked up under the configured +query_key+ (the same key
# +format_record+ parses) rather than the literal 'query', so the fix also
# applies when a non-default key is configured.
#
# @param record [Hash] record whose query payload is fixed in place
# @return [Object, nil] result of +update_all_key_value+, or nil when the
#   query payload is not a Hash
def fix_record_query_match_value(record)
  query = record[query_key]
  # A parsed payload may be an Array or scalar; there is nothing to fix then,
  # and digging into it with a String key would raise TypeError.
  return unless query.is_a?(Hash)

  update_all_key_value(query['filter'], 'matchValue') { |value| value&.to_s }
end
#format_record(record) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 39
# Parses the JSON-encoded query / query-result fields and tags the record
# with its guessed query type.
#
# For each of +query_key+ and +query_result_key+: a String value is replaced
# by its parsed JSON, an empty String by an empty Hash; non-String values are
# left alone. The record is modified in place.
#
# @param record [Hash] record to format
# @return [Hash] the same record, with 'query_type' set
# @raise [JSON::ParserError] when a non-empty String field is not valid JSON
def format_record(record)
  [query_key, query_result_key].each do |field|
    raw = record[field]
    next unless raw.is_a? String

    record[field] = raw.empty? ? {} : JSON.parse(raw)
  end

  record['query_type'] = guess_query_type(record)
  record
end
#guess_query_type(record) ⇒ Object
52 53 54 55 56 57 |
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 52
# Derives the query type of a record, trying in order:
#
# 1. the query's own 'queryType' value;
# 2. 'sql' when the query result carries a 'sqlQuery/time' entry;
# 3. 'sql' when the query carries a 'query' entry;
# 4. 'unknown' otherwise.
#
# The query result is read from the configured +query_result_key+ instead of
# the literal 'query_result', matching how +format_record+ locates it.
# Payloads that are not Hashes (missing, Array, scalar) are treated as empty
# rather than raising TypeError from +dig+.
#
# @param record [Hash] record holding the (already parsed) query and result
# @return [Object] the 'queryType' value, 'sql', or 'unknown'
def guess_query_type(record)
  query = record[query_key]
  query = {} unless query.is_a?(Hash)
  result = record[query_result_key]
  result = {} unless result.is_a?(Hash)

  query['queryType'] ||
    (result['sqlQuery/time'] && 'sql') ||
    (query['query'] && 'sql') ||
    'unknown'
end
#multi_workers_ready? ⇒ Boolean
29 30 31 |
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 29
# Always answers true.
#
# NOTE(review): presumably the framework hook that marks the plugin as usable
# with multiple workers — confirm against the Fluentd plugin API.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end
#update_all_key_value(record, key, &block) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/fluent/plugin/filter_format_druid_audit_log_2.rb', line 83
# Walks a nested Hash/Array structure and, for every Hash entry whose key
# (compared via +to_s+) equals +key+, replaces the value in place with the
# block's result.
#
# A matching entry is not descended into, even when no block is given.
# Anything that is neither a Hash nor an Array is ignored.
#
# @param record [Object] structure to walk; modified in place
# @param key [String] key name to look for
# @yieldparam value [Object] current value stored under a matching key
# @yieldreturn [Object] replacement value
# @return [Hash, Array, nil] +record+ itself for a Hash or Array, nil otherwise
def update_all_key_value(record, key, &block)
  if record.is_a?(Hash)
    record.each_pair do |name, child|
      if name.to_s != key
        update_all_key_value(child, key, &block)
      elsif block
        # Reassigning an existing key is safe while iterating the Hash.
        record[name] = block.call(child)
      end
    end
  elsif record.is_a?(Array)
    record.each { |element| update_all_key_value(element, key, &block) }
  end
end