Class: Fluent::Plugin::LokiOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::LokiOutput
- Defined in:
- lib/fluent/plugin/out_loki.rb
Overview
Subclass of Fluent Plugin Output
Defined Under Namespace
Classes: LogPostError
Constant Summary collapse
- DEFAULT_BUFFER_TYPE =
'memory'
Instance Attribute Summary collapse
-
#record_accessors ⇒ Object
Returns the value of attribute record_accessors.
Instance Method Summary collapse
-
#client_cert_configured? ⇒ Boolean
Returns true when both a client key and a client certificate are configured.
-
#configure(conf) ⇒ Object
Validates the configuration and prepares the push URI, record accessors, client certificate and bearer token.
- #generic_to_loki(chunk) ⇒ Object
- #http_request_opts(uri) ⇒ Object
- #load_client_cert ⇒ Object
- #multi_workers_ready? ⇒ Boolean
- #validate_client_cert_key ⇒ Object
-
#write(chunk) ⇒ Object
flush a chunk to loki.
Instance Attribute Details
#record_accessors ⇒ Object
Returns the value of attribute record_accessors.
34 35 36 |
# File 'lib/fluent/plugin/out_loki.rb', line 34 def record_accessors @record_accessors end |
Instance Method Details
#client_cert_configured? ⇒ Boolean
Returns true when both a client key and a client certificate are configured, i.e. neither `@key` nor `@cert` is nil.
131 132 133 |
# File 'lib/fluent/plugin/out_loki.rb', line 131
# Tells whether a client certificate pair has been supplied.
#
# @return [Boolean] true only when neither @key nor @cert is nil
def client_cert_configured?
  [@key, @cert].none?(&:nil?)
end
#configure(conf) ⇒ Object
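Validates the plugin configuration and prepares runtime state: builds the Loki push URI from `url`, creates record accessors for every `<label>` section and for `remove_keys`, loads and validates the optional client certificate and key, reads the optional bearer token file, and checks that the optional CA certificate file exists.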
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/fluent/plugin/out_loki.rb', line 85
# Validates the configuration and prepares the state used when flushing.
#
# Steps, in order: build the push URI from @url, create one record accessor
# per entry of every <label> section, create accessors for @remove_keys, load
# and validate the client certificate/key when both are configured, read the
# bearer token file when configured, and check that the CA file exists.
#
# @param conf the configuration element handed to the plugin
# @raise [Fluent::ConfigError] when the URL is neither HTTP nor HTTPS
# @raise [RuntimeError] when bearer_token_file is missing or empty, or when
#   the CA certificate file does not exist
def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super
  @uri = URI.parse("#{@url}/loki/api/v1/push")
  unless @uri.is_a?(URI::HTTP) || @uri.is_a?(URI::HTTPS)
    raise Fluent::ConfigError, 'URL parameter must have HTTP/HTTPS scheme'
  end

  @record_accessors = {}
  conf.elements.select { |element| element.name == 'label' }.each do |element|
    element.each_pair do |k, v|
      element.has_key?(k) # rubocop:disable Style/PreferredHashMethods #to suppress unread configuration warning
      # An entry without a value uses its own name as the record key.
      v = k if v.empty?
      @record_accessors[k] = record_accessor_create(v)
    end
  end
  @remove_keys_accessors = @remove_keys.map { |key| record_accessor_create(key) }

  # If configured, load and validate client certificate (and corresponding key)
  if client_cert_configured?
    load_client_cert
    validate_client_cert_key
  end

  @auth_token_bearer = nil
  unless @bearer_token_file.nil?
    # Single existence check; the original listing performed it twice.
    raise "bearer_token_file #{@bearer_token_file} not found" unless File.exist?(@bearer_token_file)

    # Read the file once, assume long-lived authentication token.
    @auth_token_bearer = File.read(@bearer_token_file)
    raise "bearer_token_file #{@bearer_token_file} is empty" if @auth_token_bearer.empty?

    log.info "will use Bearer token from bearer_token_file #{@bearer_token_file} in Authorization header"
  end

  raise "CA certificate file #{@ca_cert} not found" if !@ca_cert.nil? && !File.exist?(@ca_cert)
end
#generic_to_loki(chunk) ⇒ Object
204 205 206 207 208 |
# File 'lib/fluent/plugin/out_loki.rb', line 204
# Turns a buffer chunk into the payload for Loki by feeding the result of
# chunk_to_loki straight into payload_builder.
#
# @param chunk the buffer chunk being flushed
# @return whatever payload_builder returns for the chunk's streams
def generic_to_loki(chunk)
  payload_builder(chunk_to_loki(chunk))
end
#http_request_opts(uri) ⇒ Object
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/fluent/plugin/out_loki.rb', line 174
# Assembles the option hash for the HTTP connection to +uri+.
#
# @param uri [URI] request target; TLS is enabled when its scheme is 'https'
# @return [Hash] always contains :use_ssl; :verify_mode, :cert/:key and
#   :ca_file are added only when the matching setting is present
def http_request_opts(uri)
  options = { use_ssl: uri.scheme == 'https' }

  # Optionally disable server certificate verification.
  options[:verify_mode] = OpenSSL::SSL::VERIFY_NONE if @insecure_tls

  # Present a client certificate only when both certificate and key are set.
  if [@cert, @key].none?(&:nil?)
    options[:cert] = @cert
    options[:key] = @key
  end

  # Custom CA bundle for server certificate verification.
  # Only takes effect when `insecure_tls` is not set.
  options[:ca_file] = @ca_cert unless @ca_cert.nil?

  options
end
#load_client_cert ⇒ Object
135 136 137 138 |
# File 'lib/fluent/plugin/out_loki.rb', line 135
# Replaces the configured certificate and key file paths with parsed OpenSSL
# objects. A setting that is not present is left untouched.
#
# @return the parsed key, or nil when no key is configured
def load_client_cert
  if @cert
    cert_pem = File.read(@cert)
    @cert = OpenSSL::X509::Certificate.new(cert_pem)
  end

  return unless @key

  key_pem = File.read(@key)
  @key = OpenSSL::PKey.read(key_pem)
end
#multi_workers_ready? ⇒ Boolean
146 147 148 |
# File 'lib/fluent/plugin/out_loki.rb', line 146 def multi_workers_ready? true end |
#validate_client_cert_key ⇒ Object
140 141 142 143 144 |
# File 'lib/fluent/plugin/out_loki.rb', line 140 def validate_client_cert_key if !@key.is_a?(OpenSSL::PKey::RSA) && !@key.is_a?(OpenSSL::PKey::DSA) raise "Unsupported private key type #{key.class}" end end |
#write(chunk) ⇒ Object
flush a chunk to loki
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/fluent/plugin/out_loki.rb', line 151
# Flushes a chunk to Loki.
#
# Builds the { 'streams' => ... } body from the chunk, resolves the tenant
# placeholders when @tenant is set, and posts the body via loki_http_request.
# Any non-success response is logged; only 429 and 5xx responses raise.
#
# @param chunk the buffer chunk to send
# @return [nil]
# @raise [LogPostError] on HTTP 429 or any 5xx response; the message is
#   "<code> <reason phrase> <body>"
def write(chunk)
  # streams by label
  payload = generic_to_loki(chunk)
  body = { 'streams' => payload }

  tenant = extract_placeholders(@tenant, chunk) if @tenant

  # add ingest path to loki url
  res = loki_http_request(body, tenant)

  if res.is_a?(Net::HTTPSuccess)
    log.debug "POST request was responded to with status code #{res.code}"
    return
  end

  # Status code, reason phrase and response body, e.g. "503 Service Unavailable ...".
  res_summary = "#{res.code} #{res.message} #{res.body}"
  log.warn "failed to write post to #{@uri} (#{res_summary})"
  log.debug Yajl.dump(body)

  # Only retry 429 and 500s; other failures are logged above and dropped.
  raise(LogPostError, res_summary) if res.is_a?(Net::HTTPTooManyRequests) || res.is_a?(Net::HTTPServerError)
end