Class: Fluent::Plugin::OpentelemetryInput::GrpcHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_opentelemetry.rb

Defined Under Namespace

Classes: ExceptionInterceptor

Instance Method Summary collapse

Constructor Details

#initialize(grpc_config, logger) ⇒ GrpcHandler

Returns a new instance of GrpcHandler.



177
178
179
180
# File 'lib/fluent/plugin/in_opentelemetry.rb', line 177

# Remembers the collaborators handed in by the caller; does nothing else.
#
# @param grpc_config [#bind, #port] settings object; `run` interpolates its
#   `bind` and `port` into the address the gRPC server listens on
# @param logger [Object] stored in `@logger` (not referenced by the methods
#   visible in this listing)
def initialize(grpc_config, logger)
  @logger = logger
  @grpc_config = grpc_config
end

Instance Method Details

#run(logs:, metrics:, traces:) ⇒ Object



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/fluent/plugin/in_opentelemetry.rb', line 182

# Builds a gRPC server, registers one service handler per signal type
# (logs, metrics, traces — in that order) and then hands control to
# `run_till_terminated`.
#
# Each handler's callback forwards the incoming request, serialised with
# `to_json`, to the matching callable and replies with the corresponding
# `Opentelemetry::Response::*` object.
#
# @param logs [#call] receives the JSON string of every logs request
# @param metrics [#call] receives the JSON string of every metrics request
# @param traces [#call] receives the JSON string of every traces request
# @return [Object] whatever `run_till_terminated` returns
def run(logs:, metrics:, traces:)
  grpc_server = GRPC::RpcServer.new(interceptors: [ExceptionInterceptor.new])
  listen_address = "#{@grpc_config.bind}:#{@grpc_config.port}"
  # NOTE(review): the credentials symbol suggests a plaintext (non-TLS) port — confirm.
  grpc_server.add_http2_port(listen_address, :this_port_is_insecure)

  # One row per OTLP signal: handler class, response builder, caller-supplied sink.
  signal_table = [
    [Opentelemetry::ServiceHandler::Logs,    Opentelemetry::Response::Logs,    logs],
    [Opentelemetry::ServiceHandler::Metrics, Opentelemetry::Response::Metrics, metrics],
    [Opentelemetry::ServiceHandler::Traces,  Opentelemetry::Response::Traces,  traces]
  ]

  signal_table.each do |handler_class, response_class, sink|
    service = handler_class.new
    service.callback = ->(request) do
      sink.call(request.to_json)
      response_class.build
    end
    grpc_server.handle(service)
  end

  grpc_server.run_till_terminated
end