Class: Fluent::Plugin::Opentelemetry::GrpcOutputHandler
- Inherits: Object
- Inheritance hierarchy:
  - Object
  - Fluent::Plugin::Opentelemetry::GrpcOutputHandler
- Defined in:
- lib/fluent/plugin/opentelemetry/grpc_output_handler.rb
Defined Under Namespace
Classes: ServiceStub
Instance Method Summary
- #export(record) ⇒ Object
- #initialize(grpc_config, transport_config, logger) ⇒ GrpcOutputHandler (constructor): A new instance of GrpcOutputHandler.
Constructor Details
#initialize(grpc_config, transport_config, logger) ⇒ GrpcOutputHandler
Returns a new instance of GrpcOutputHandler.
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/fluent/plugin/opentelemetry/grpc_output_handler.rb', line 49

# Builds one gRPC service stub per OpenTelemetry record type (logs, metrics,
# traces), all pointed at the configured endpoint and sharing one
# channel-argument hash.
#
# @param grpc_config [Object] read here for +compress+, +keepalive_time+,
#   +keepalive_timeout+ and +endpoint+
# @param transport_config [Object] stored on the instance; not consulted in
#   this method
# @param logger [Object] stored on the instance; not used in this method
def initialize(grpc_config, transport_config, logger)
  @grpc_config = grpc_config
  @transport_config = transport_config
  @logger = logger

  # Start from the gzip compression channel args when gzip is requested,
  # otherwise from an empty hash.
  channel_args =
    if @grpc_config.compress == :gzip
      GRPC::Core::CompressionOptions.new({ default_algorithm: :gzip }).to_channel_arg_hash
    else
      {}
    end

  # The keepalive values are multiplied by 1000 to fill the *_ms channel args.
  channel_args.merge!(
    "grpc.keepalive_time_ms" => grpc_config.keepalive_time * 1000,
    "grpc.keepalive_timeout_ms" => grpc_config.keepalive_timeout * 1000,
    "grpc.keepalive_permit_without_calls" => 1
  )

  stub_classes = {
    Fluent::Plugin::Opentelemetry::RECORD_TYPE_LOGS => ServiceStub::Logs,
    Fluent::Plugin::Opentelemetry::RECORD_TYPE_METRICS => ServiceStub::Metrics,
    Fluent::Plugin::Opentelemetry::RECORD_TYPE_TRACES => ServiceStub::Traces
  }

  # NOTE(review): every stub is created with :this_channel_is_insecure;
  # @transport_config does not influence the credentials here.
  @services = stub_classes.transform_values do |stub_class|
    stub_class.new(@grpc_config.endpoint, :this_channel_is_insecure, channel_args: channel_args)
  end
end
Instance Method Details
#export(record) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/fluent/plugin/opentelemetry/grpc_output_handler.rb', line 66

# Sends one record to the service stub registered for its type.
#
# @param record [Hash] read for the "type" key (selects the service stub)
#   and the "message" key (the payload handed to the stub)
# @return [Object] whatever the selected stub's +export+ returns
# @raise [Fluent::UnrecoverableError] when no stub is registered for
#   record["type"], or when the stub raises Google::Protobuf::ParseError
def export(record)
  msg = record["message"]

  service = @services[record["type"]]
  raise ::Fluent::UnrecoverableError, "got unknown record type '#{record['type']}'" unless service

  begin
    # The call deadline is the current time plus the configured timeout.
    service.export(msg, deadline: Time.now + @grpc_config.timeout)
  rescue Google::Protobuf::ParseError => e
    # The message format does not comply with the OpenTelemetry protocol.
    # Re-raise as unrecoverable, carrying the parser's message along.
    raise ::Fluent::UnrecoverableError, e.message
  end
end