Class: Fluent::Plugin::IP2LocationFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_ip2location.rb

Constant Summary collapse

KNOWN_FIELDS =
%w[
  country_short
  country_long
  region
  city
  isp
  latitude
  longitude
  domain
  zipcode
  timezone
  netspeed
  iddcode
  areacode
  weatherstationcode
  weatherstationname
  mcc
  mnc
  mobilebrand
  elevation
  usagetype
  addresstype
  category
  district
  asn
  as
  as_domain
  as_usagetype
  as_cidr
].freeze
VALID_ERROR_MODES =
%w[ignore warn raise].freeze

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/fluent/plugin/filter_ip2location.rb', line 60

def configure(conf)
  super

  raise Fluent::ConfigError, "database does not exist: #{@database}" unless File.file?(@database)
  raise Fluent::ConfigError, "cache_size must be zero or greater" if @cache_size.negative?
  raise Fluent::ConfigError, "on_error must be one of: #{VALID_ERROR_MODES.join(', ')}" unless VALID_ERROR_MODES.include?(@on_error)
  raise Fluent::ConfigError, "prefix cannot be empty when merge_record is true" if @merge_record && @prefix.empty?

  @selected_fields = parse_fields(@fields)
  @ip_accessor = record_accessor_create(@ip_field)
  @output_accessor = record_accessor_create(@output_field) unless @merge_record
  @cache = {}
  @cache_order = []
  @cache_mutex = Mutex.new
end

#filter(_tag, _time, record) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/filter_ip2location.rb', line 89

def filter(_tag, _time, record)
  ip = normalize_ip(@ip_accessor.call(record))
  return record if ip.nil?

  case ip_status(ip)
  when :invalid
    return handle_invalid_ip(record, ip)
  when :private
    return record
  end

  enriched = lookup(ip)
  return record if enriched.empty?

  if @merge_record
    enriched.each { |key, value| record["#{@prefix}#{key}"] = value }
  else
    @output_accessor.set(record, enriched)
  end

  record
rescue StandardError => e
  handle_lookup_error(e, ip)
  record
end

#shutdownObject



83
84
85
86
87
# File 'lib/fluent/plugin/filter_ip2location.rb', line 83

def shutdown
  @ip2location.close if @ip2location&.respond_to?(:close)
ensure
  super
end

#startObject



76
77
78
79
80
81
# File 'lib/fluent/plugin/filter_ip2location.rb', line 76

def start
  super
  @ip2location = Ip2location.new.open(@database)
rescue StandardError => e
  raise Fluent::ConfigError, "failed to open IP2Location database #{@database}: #{e.message}"
end