Class: Fluent::Plugin::Opentelemetry::HttpOutputHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/opentelemetry/http_output_handler.rb

Instance Method Summary collapse

Constructor Details

#initialize(http_config, transport_config, logger) ⇒ HttpOutputHandler

Returns a new instance of HttpOutputHandler.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/fluent/plugin/opentelemetry/http_output_handler.rb', line 12

# Builds one persistent Excon connection per OpenTelemetry record type
# (logs, metrics, traces) and keeps them in @connections, keyed by record type.
#
# @param http_config [Object] HTTP settings; this method reads +proxy+,
#   +read_timeout+, +write_timeout+ and +connect_timeout+ from it.
# @param transport_config [Object] transport settings; this method reads
#   +protocol+, +insecure+ and, when the protocol is +:tls+, +cert_path+,
#   +private_key_path+, +private_key_passphrase+, +min_version+ and +max_version+.
# @param logger [Object] kept in @logger for use by the other methods of this class.
def initialize(http_config, transport_config, logger)
  @http_config = http_config
  @transport_config = transport_config
  @logger = logger

  # Options shared by all three connections.
  connection_options = {
    proxy: @http_config.proxy,
    persistent: true,
    read_timeout: @http_config.read_timeout,
    write_timeout: @http_config.write_timeout,
    connect_timeout: @http_config.connect_timeout
  }

  if @transport_config.protocol == :tls
    tls_versions = Fluent::Plugin::Opentelemetry::TLS_VERSIONS_MAP
    connection_options[:client_cert] = @transport_config.cert_path
    connection_options[:client_key] = @transport_config.private_key_path
    connection_options[:client_key_pass] = @transport_config.private_key_passphrase
    connection_options[:ssl_min_version] = tls_versions[@transport_config.min_version]
    connection_options[:ssl_max_version] = tls_versions[@transport_config.max_version]
  end

  # Turn off peer verification for these connections only. Writing the flag into
  # Excon.defaults would disable verification for every Excon client in the
  # process, not just the endpoints this handler was asked to treat as insecure.
  connection_options[:ssl_verify_peer] = false if @transport_config.insecure

  # The http_*_endpoint helpers are defined elsewhere in this class.
  @connections = {
    Fluent::Plugin::Opentelemetry::RECORD_TYPE_LOGS => Excon.new(http_logs_endpoint, **connection_options),
    Fluent::Plugin::Opentelemetry::RECORD_TYPE_METRICS => Excon.new(http_metrics_endpoint, **connection_options),
    Fluent::Plugin::Opentelemetry::RECORD_TYPE_TRACES => Excon.new(http_traces_endpoint, **connection_options)
  }
end

Instance Method Details

#export(record) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/fluent/plugin/opentelemetry/http_output_handler.rb', line 40

# Sends one record to the endpoint matching its type and maps the HTTP status
# to the plugin's error handling.
#
# @param record [Hash] record to send; record["type"] selects the connection,
#   and the record is handed to get_post_data to obtain the URI, headers and body.
# @return [nil] when the response status is 2xx. For a non-2xx status that is
#   neither raised nor retried, the result of the logger call is returned.
# @raise [Fluent::UnrecoverableError] on HTTP 400, or on any other non-retryable
#   error status when error_response_as_unrecoverable is set.
# @raise [Fluent::Plugin::OpentelemetryOutput::RetryableResponse] when the status
#   is listed in retryable_response_codes.
def export(record)
  uri, headers, body = get_post_data(record)
  connection = @connections[record["type"]]
  response = connection.post(body: body, headers: headers, idempotent: true)

  # Read the body even though it is unused: this drains the socket buffer.
  # Otherwise Excon holds on to the buffer, which leaks memory and blocks
  # the persistent connection.
  response.body

  code = response.status
  return if code >= 200 && code < 300

  if code == 400
    # The client MUST NOT retry the request when it receives HTTP 400 Bad Request response.
    raise Fluent::UnrecoverableError, "got unrecoverable error response from '#{uri}', response code is #{code}"
  elsif @http_config.retryable_response_codes&.include?(code)
    raise Fluent::Plugin::OpentelemetryOutput::RetryableResponse, "got retryable error response from '#{uri}', response code is #{code}"
  elsif @http_config.error_response_as_unrecoverable
    raise Fluent::UnrecoverableError, "got unrecoverable error response from '#{uri}', response code is #{code}"
  else
    # Neither retryable nor configured as fatal: report it and move on.
    @logger.error "got error response from '#{uri}', response code is #{code}"
  end
end