Class: Fluent::Plugin::SplunkIngestApiOutput

Inherits:
SplunkOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_splunk_ingest_api.rb

Constant Summary

Constants inherited from SplunkOutput

Fluent::Plugin::SplunkOutput::KEY_FIELDS, Fluent::Plugin::SplunkOutput::TAG_PLACEHOLDER

Instance Method Summary collapse

Methods inherited from SplunkOutput

#initialize, #shutdown

Constructor Details

This class inherits a constructor from Fluent::Plugin::SplunkOutput

Instance Method Details

#configure(conf) ⇒ Object



40
41
42
# File 'lib/fluent/plugin/out_splunk_ingest_api.rb', line 40

# Configures the plugin. No extra work is done here: the call is handed
# straight to the inherited implementation.
#
# @param conf the configuration object, forwarded to the parent unchanged
# @return whatever the parent's #configure returns
def configure(conf)
  super(conf)
end

#construct_api ⇒ Object



48
49
50
51
52
53
# File 'lib/fluent/plugin/out_splunk_ingest_api.rb', line 48

# Builds the ingest events URL from the configured host, tenant and events
# endpoint, and parses it into a URI object.
#
# @return [URI] the parsed endpoint URI
# @raise [Fluent::ConfigError] when building or parsing the URL raises any
#   StandardError
def construct_api
  path = "#{@ingest_api_tenant}#{@ingest_api_events_endpoint}"
  uri = "https://#{@ingest_api_host}/#{path}"
  URI.parse(uri)
rescue StandardError
  raise Fluent::ConfigError, "URI #{uri} is invalid"
end

#format(tag, time, record) ⇒ Object



55
56
57
# File 'lib/fluent/plugin/out_splunk_ingest_api.rb', line 55

# Formats a single event by delegating to #format_event with the same
# arguments and returning its result.
#
# NOTE: because this instance method is named +format+, a bare +format(...)+
# call inside this class dispatches here rather than to Kernel#format.
#
# @param tag the event tag, passed through unchanged
# @param time the event time, passed through unchanged
# @param record the event record, passed through unchanged
# @return whatever #format_event returns
def format(tag, time, record)
  format_event(tag, time, record)
end

#format_event(tag, time, record) ⇒ Object



59
60
61
62
63
64
65
66
67
# File 'lib/fluent/plugin/out_splunk_ingest_api.rb', line 59

# Serializes one event into the text appended to the buffer chunk.
#
# The payload comes from #prepare_event_payload. Events whose :body is nil
# or contains only whitespace produce an empty string, which effectively
# drops them. Every other event is dumped as JSON followed by a trailing
# comma; #write_to_splunk later strips the final comma and wraps the chunk
# contents in "[" and "]".
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return [String] '' for a blank event, otherwise the JSON text plus ','
def format_event(tag, time, record)
  payload = prepare_event_payload(tag, time, record)
  body = payload[:body]
  # There is no explicit way to drop a record here, so emit nothing for it.
  return '' if body.nil? || body.strip.empty?

  MultiJson.dump(payload) + ','
end

#new_connection ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/fluent/plugin/out_splunk_ingest_api.rb', line 93

# Creates an OpenIDConnect client from the configured token endpoint, client
# identifier/secret and auth host, then requests an access token from it.
# When @debug_http is set, Rack::OAuth2 debugging is switched on first.
#
# @return the result of OpenIDConnect::Client#access_token!; #write_to_splunk
#   stores it in @conn and calls #post on it
def new_connection
  Rack::OAuth2.debugging = true if @debug_http

  client_options = {
    token_endpoint: @token_endpoint,
    identifier: @service_client_identifier,
    secret: @service_client_secret_key,
    redirect_uri: 'http://localhost:8080/', # Not used
    host: @ingest_auth_host,
    scheme: 'https'
  }

  OpenIDConnect::Client.new(**client_options).access_token!(client_auth_method: 'other')
end

#prefer_buffer_processing ⇒ Object



36
37
38
# File 'lib/fluent/plugin/out_splunk_ingest_api.rb', line 36

# Always returns true.
#
# NOTE(review): presumably this is the hook the Fluentd output base class
# consults to choose buffered processing over synchronous processing --
# confirm against the Fluentd output plugin API.
#
# @return [true]
def prefer_buffer_processing
  true
end

#prepare_event_payload(tag, time, record) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
# File 'lib/fluent/plugin/out_splunk_ingest_api.rb', line 69

# Takes the payload hash built by the parent implementation and reshapes it
# in place:
#
# * :fields is removed and stored under :attributes (an empty hash when
#   :fields is absent or nil)
# * a truthy :index is moved into the attributes hash
# * :event is removed and stored under :body
# * :time is removed; :timestamp and :nanos are derived from +time+ instead
#
# @param tag the event tag, forwarded to the parent
# @param time the event time; must respond to #to_f and #nsec
# @param record the event record, forwarded to the parent
# @return [Hash] the same hash the parent returned, after reshaping
def prepare_event_payload(tag, time, record)
  super(tag, time, record).tap do |event|
    attributes = event.delete(:fields) || {}
    event[:attributes] = attributes
    attributes[:index] = event.delete(:index) if event[:index]
    event[:body] = event.delete(:event)
    event.delete(:time)
    # Epoch time in whole milliseconds (fractional part truncated).
    event[:timestamp] = (time.to_f * 1000).to_i
    # NOTE(review): nsec / 100_000 yields units of 100 microseconds (0..9999),
    # not a nanosecond remainder -- confirm this is what the ingest API expects.
    event[:nanos] = time.nsec / 100_000
  end
end

#process_response(response, request_body) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
# File 'lib/fluent/plugin/out_splunk_ingest_api.rb', line 81

# Handles the HTTP response of an ingest request. The parent implementation
# runs first, then the status code (compared as a string) is inspected:
#
# * 401 -- a new connection is obtained via #new_connection and stored in
#   @conn, then an error is raised
# * 429 -- an error carrying the response body is raised
# * any other code -- if the response body contains "INVALID_DATA", the
#   request body is written to the error log
#
# @param response the HTTP response; must respond to #code and #body
# @param request_body [String] the body that was sent, used for error logging
# @return the result of log.error when INVALID_DATA is logged, otherwise nil
# @raise [RuntimeError] on a 401 or 429 status code
def process_response(response, request_body)
  super(response, request_body)

  case response.code.to_s
  when '401'
    @conn = new_connection
    raise 'Auth Error recived. New token has been fetched.'
  when '429'
    raise "Throttle error from server. #{response.body}"
  else
    log.error "#{self.class}: POST Body #{request_body}" if /INVALID_DATA/.match?(response.body)
  end
end

#write(chunk) ⇒ Object



44
45
46
# File 'lib/fluent/plugin/out_splunk_ingest_api.rb', line 44

# Writes a buffer chunk. No extra work is done here: the call is handed
# straight to the inherited implementation.
#
# @param chunk the buffer chunk, forwarded to the parent unchanged
# @return whatever the parent's #write returns
def write(chunk)
  super(chunk)
end

#write_to_splunk(chunk) ⇒ Object



107
108
109
110
111
112
113
114
# File 'lib/fluent/plugin/out_splunk_ingest_api.rb', line 107

# Posts the contents of a buffer chunk to the ingest events endpoint.
#
# The chunk text is a run of comma-terminated JSON objects (see
# #format_event); the final comma is removed and the text is wrapped in
# brackets to form a JSON array. The connection in @conn is created through
# #new_connection on first use and reused afterwards. The response is then
# passed to #process_response together with the request body.
#
# @param chunk the buffer chunk; must respond to #size, #bytesize and #read
# @return whatever #process_response returns
def write_to_splunk(chunk)
  log.trace "#{self.class}: In write() with #{chunk.size} records and #{chunk.bytesize} bytes "

  # ingest API is an array of json objects
  events = chunk.read.chomp(',')
  payload = "[#{events}]"

  @conn ||= new_connection
  endpoint = "https://#{@ingest_api_host}/#{@ingest_api_tenant}#{@ingest_api_events_endpoint}"
  process_response(@conn.post(endpoint, body: payload), payload)
end