Class: Fluent::Plugin::Opentelemetry::GrpcOutputHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/opentelemetry/grpc_output_handler.rb

Defined Under Namespace

Classes: ServiceStub

Instance Method Summary

Constructor Details

#initialize(grpc_config, transport_config, logger) ⇒ GrpcOutputHandler

Returns a new instance of GrpcOutputHandler.



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/fluent/plugin/opentelemetry/grpc_output_handler.rb', line 49

# Builds one gRPC service stub per OpenTelemetry record type, all sharing the
# same endpoint and channel arguments.
#
# @param grpc_config [Object] configuration section; this method reads
#   +compress+, +keepalive_time+, +keepalive_timeout+ and +endpoint+ from it
# @param transport_config [Object] stored on the instance; not consulted here
# @param logger [Object] stored on the instance; not used here
def initialize(grpc_config, transport_config, logger)
  @grpc_config = grpc_config
  @transport_config = transport_config
  @logger = logger

  # Start from the gzip compression arguments when compression is requested,
  # otherwise from an empty set of channel arguments.
  channel_args =
    if @grpc_config.compress == :gzip
      GRPC::Core::CompressionOptions.new({ default_algorithm: :gzip }).to_channel_arg_hash
    else
      {}
    end

  # The keepalive settings are scaled by 1000 to fill the *_ms channel args
  # (presumably they are configured in seconds -- confirm against the config
  # definition).
  channel_args.update(
    "grpc.keepalive_time_ms" => @grpc_config.keepalive_time * 1000,
    "grpc.keepalive_timeout_ms" => @grpc_config.keepalive_timeout * 1000,
    "grpc.keepalive_permit_without_calls" => 1
  )

  stub_classes = {
    Fluent::Plugin::Opentelemetry::RECORD_TYPE_LOGS => ServiceStub::Logs,
    Fluent::Plugin::Opentelemetry::RECORD_TYPE_METRICS => ServiceStub::Metrics,
    Fluent::Plugin::Opentelemetry::RECORD_TYPE_TRACES => ServiceStub::Traces
  }

  # NOTE(review): every stub is created with :this_channel_is_insecure and
  # @transport_config is never read here -- confirm that TLS is intentionally
  # unsupported for gRPC output.
  endpoint = @grpc_config.endpoint
  @services = stub_classes.transform_values do |stub_class|
    stub_class.new(endpoint, :this_channel_is_insecure, channel_args: channel_args)
  end
end

Instance Method Details

#export(record) ⇒ Object

Raises:

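  • (::Fluent::UnrecoverableError) — if the record type is unknown, or if the message cannot be parsed as an OpenTelemetry protobuf message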


66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/fluent/plugin/opentelemetry/grpc_output_handler.rb', line 66

# Sends one record to the service stub registered for its type.
#
# @param record [Hash] must provide the payload under "message" and the
#   record type under "type"
# @return [Object] whatever the selected stub's +export+ returns
# @raise [::Fluent::UnrecoverableError] when no stub is registered for the
#   record type, or when the stub raises Google::Protobuf::ParseError
def export(record)
  payload = record["message"]
  record_type = record["type"]

  stub = @services[record_type]
  unless stub
    raise ::Fluent::UnrecoverableError, "got unknown record type '#{record_type}'"
  end

  begin
    # The deadline is an absolute point in time: now plus the configured timeout.
    deadline = Time.now + @grpc_config.timeout
    stub.export(payload, deadline: deadline)
  rescue Google::Protobuf::ParseError => e
    # The payload is not a valid OpenTelemetry protocol message; re-raise it
    # as Fluentd's unrecoverable error type. Other errors propagate unchanged.
    raise ::Fluent::UnrecoverableError, e.message
  end
end