Class: Fluent::Plugin::Opentelemetry::GrpcInputHandler
- Inherits: Object
- Defined in: lib/fluent/plugin/opentelemetry/grpc_input_handler.rb
Defined Under Namespace
Classes: ExceptionInterceptor, ServiceHandler
Instance Method Summary
- #initialize(grpc_config, logger) ⇒ GrpcInputHandler (constructor)
  A new instance of GrpcInputHandler.
- #run(logs:, metrics:, traces:) ⇒ Object
Constructor Details
#initialize(grpc_config, logger) ⇒ GrpcInputHandler
Returns a new instance of GrpcInputHandler.
# File 'lib/fluent/plugin/opentelemetry/grpc_input_handler.rb', line 52

def initialize(grpc_config, logger)
  @grpc_config = grpc_config
  @logger = logger
end
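The constructor only stores its two arguments. Judging from #run, the only things read from grpc_config are `bind` and `port`, so any object responding to those two methods should work. The following is a minimal sketch under that assumption: `GrpcConfig` is a hypothetical stand-in for the Fluentd config section, Ruby's standard `Logger` stands in for the plugin logger, and port 4317 is used only as an illustrative value.

```ruby
require "logger"

# Hypothetical stand-in for the config section passed as grpc_config.
# Only `bind` and `port` are read by #run.
GrpcConfig = Struct.new(:bind, :port, keyword_init: true)

grpc_config = GrpcConfig.new(bind: "0.0.0.0", port: 4317)
logger = Logger.new($stdout)

# The same string interpolation #run uses to build the listen address.
address = "#{grpc_config.bind}:#{grpc_config.port}"
puts address # prints "0.0.0.0:4317"

# With the plugin loaded, construction would then look like:
#   handler = Fluent::Plugin::Opentelemetry::GrpcInputHandler.new(grpc_config, logger)
```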
Instance Method Details
#run(logs:, metrics:, traces:) ⇒ Object
# File 'lib/fluent/plugin/opentelemetry/grpc_input_handler.rb', line 57

def run(logs:, metrics:, traces:)
  server = GRPC::RpcServer.new(interceptors: [ExceptionInterceptor.new])
  server.add_http2_port("#{@grpc_config.bind}:#{@grpc_config.port}", :this_port_is_insecure)

  logs_handler = ServiceHandler::Logs.new
  logs_handler.callback = lambda { |request|
    logs.call(request.to_json)
    Fluent::Plugin::Opentelemetry::Response::Logs.build
  }
  server.handle(logs_handler)

  metrics_handler = ServiceHandler::Metrics.new
  metrics_handler.callback = lambda { |request|
    metrics.call(request.to_json)
    Fluent::Plugin::Opentelemetry::Response::Metrics.build
  }
  server.handle(metrics_handler)

  traces_handler = ServiceHandler::Traces.new
  traces_handler.callback = lambda { |request|
    traces.call(request.to_json)
    Fluent::Plugin::Opentelemetry::Response::Traces.build
  }
  server.handle(traces_handler)

  server.run_till_terminated
end
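As the listing shows, #run expects three callables, one per signal type, and passes each the result of `request.to_json` before building the matching response. The server listens without TLS (`:this_port_is_insecure`), and the call blocks in `run_till_terminated`. The sketch below illustrates only the callback contract and does not start a gRPC server: the `received` collector, the `callbacks` hash, and the `fake_request` payload are all made up for illustration, and it assumes `to_json` yields a JSON string.

```ruby
require "json"

# Collect what each callback receives. In the plugin, these callables would
# presumably turn the JSON into Fluentd events instead.
received = Hash.new { |hash, key| hash[key] = [] }

callbacks = {
  logs:    ->(json) { received[:logs] << JSON.parse(json) },
  metrics: ->(json) { received[:metrics] << JSON.parse(json) },
  traces:  ->(json) { received[:traces] << JSON.parse(json) },
}

# Simulate what a handler callback does with an incoming request: serialize
# it to JSON and hand the string to the matching callable.
fake_request = { "resourceLogs" => [] } # made-up payload
callbacks[:logs].call(fake_request.to_json)

# With the plugin loaded, the same callables would be passed as:
#   handler.run(**callbacks) # blocks until the server is terminated
```

Because #run blocks, a caller would typically invoke it from a dedicated thread.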