Class: Fluent::Plugin::SplunkIngestApiOutput
Constant Summary
Constants inherited
from SplunkOutput
Fluent::Plugin::SplunkOutput::KEY_FIELDS, Fluent::Plugin::SplunkOutput::TAG_PLACEHOLDER
Instance Method Summary
collapse
#initialize, #shutdown
Instance Method Details
40
41
42
|
# File 'lib/fluent/plugin/out_splunk_ingest_api.rb', line 40
def configure(conf)
super
end
|
#construct_api ⇒ Object
48
49
50
51
52
53
|
# File 'lib/fluent/plugin/out_splunk_ingest_api.rb', line 48
def construct_api
uri = "https://#{@ingest_api_host}/#{@ingest_api_tenant}#{@ingest_api_events_endpoint}"
URI(uri)
rescue StandardError
raise Fluent::ConfigError, "URI #{uri} is invalid"
end
|
55
56
57
|
# File 'lib/fluent/plugin/out_splunk_ingest_api.rb', line 55
def format(tag, time, record)
format_event(tag, time, record)
end
|
59
60
61
62
63
64
65
66
67
|
# File 'lib/fluent/plugin/out_splunk_ingest_api.rb', line 59
def format_event(tag, time, record)
event = prepare_event_payload(tag, time, record)
if event[:body].nil? || event[:body].strip.empty?
''
else
MultiJson.dump(event) + ','
end
end
|
#new_connection ⇒ Object
93
94
95
96
97
98
99
100
101
102
103
104
105
|
# File 'lib/fluent/plugin/out_splunk_ingest_api.rb', line 93
def new_connection
Rack::OAuth2.debugging = true if @debug_http
client = OpenIDConnect::Client.new(
token_endpoint: @token_endpoint,
identifier: @service_client_identifier,
secret: @service_client_secret_key,
redirect_uri: 'http://localhost:8080/', host: @ingest_auth_host,
scheme: 'https'
)
client.access_token!(client_auth_method: 'other')
end
|
#prefer_buffer_processing ⇒ Object
36
37
38
|
# File 'lib/fluent/plugin/out_splunk_ingest_api.rb', line 36
def prefer_buffer_processing
true
end
|
#prepare_event_payload(tag, time, record) ⇒ Object
69
70
71
72
73
74
75
76
77
78
79
|
# File 'lib/fluent/plugin/out_splunk_ingest_api.rb', line 69
def prepare_event_payload(tag, time, record)
payload = super(tag, time, record)
payload[:attributes] = payload.delete(:fields) || {}
payload[:attributes][:index] = payload.delete(:index) if payload[:index]
payload[:body] = payload.delete(:event)
payload.delete(:time)
payload[:timestamp] = (time.to_f * 1000).to_i
payload[:nanos] = time.nsec / 100_000
payload
end
|
#process_response(response, request_body) ⇒ Object
81
82
83
84
85
86
87
88
89
90
91
|
# File 'lib/fluent/plugin/out_splunk_ingest_api.rb', line 81
def process_response(response, request_body)
super
if response.code.to_s == '401'
@conn = new_connection
raise 'Auth Error recived. New token has been fetched.'
elsif response.code.to_s == '429'
raise "Throttle error from server. #{response.body}"
elsif /INVALID_DATA/.match?(response.body)
log.error "#{self.class}: POST Body #{request_body}"
end
end
|
#write(chunk) ⇒ Object
44
45
46
|
# File 'lib/fluent/plugin/out_splunk_ingest_api.rb', line 44
def write(chunk)
super
end
|
#write_to_splunk(chunk) ⇒ Object
107
108
109
110
111
112
113
114
|
# File 'lib/fluent/plugin/out_splunk_ingest_api.rb', line 107
def write_to_splunk(chunk)
log.trace "#{self.class}: In write() with #{chunk.size} records and #{chunk.bytesize} bytes "
body = "[#{chunk.read.chomp(',')}]"
@conn ||= new_connection
response = @conn.post("https://#{@ingest_api_host}/#{@ingest_api_tenant}#{@ingest_api_events_endpoint}", body: body)
process_response(response, body)
end
|