Class: Fluent::Plugin::Opentelemetry::GrpcInputHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/opentelemetry/grpc_input_handler.rb

Defined Under Namespace

Classes: ExceptionInterceptor, ServiceHandler

Instance Method Summary collapse

Constructor Details

#initialize(grpc_config, logger) ⇒ GrpcInputHandler

Returns a new instance of GrpcInputHandler.



52
53
54
55
# File 'lib/fluent/plugin/opentelemetry/grpc_input_handler.rb', line 52

# Builds a handler without starting any server; the arguments are only stored.
#
# @param grpc_config [Object] kept in +@grpc_config+; +run+ reads its +bind+
#   and +port+ to build the listen address
# @param logger [Object] kept in +@logger+
def initialize(grpc_config, logger)
  @logger = logger
  @grpc_config = grpc_config
end

Instance Method Details

#run(logs:, metrics:, traces:) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/fluent/plugin/opentelemetry/grpc_input_handler.rb', line 57

# Creates the gRPC server, registers one service handler per signal type
# (logs, metrics, traces) and then hands control to the server.
#
# Each registered callback serialises the incoming request with +to_json+,
# passes that string to the matching callable, and answers with the matching
# +Response::*+ object.
#
# @param logs [#call] receives the JSON string of each logs request
# @param metrics [#call] receives the JSON string of each metrics request
# @param traces [#call] receives the JSON string of each traces request
# @return [Object] whatever +@server.run_till_terminated+ returns
def run(logs:, metrics:, traces:)
  @server = GRPC::RpcServer.new(interceptors: [ExceptionInterceptor.new])
  listen_address = "#{@grpc_config.bind}:#{@grpc_config.port}"
  # The port is opened without transport credentials.
  @server.add_http2_port(listen_address, :this_port_is_insecure)

  # Attaches the callback to the service object, then registers it.
  register = lambda do |service, on_request|
    service.callback = on_request
    @server.handle(service)
  end

  register.call(ServiceHandler::Logs.new, ->(request) do
    logs.call(request.to_json)
    Fluent::Plugin::Opentelemetry::Response::Logs.build
  end)

  register.call(ServiceHandler::Metrics.new, ->(request) do
    metrics.call(request.to_json)
    Fluent::Plugin::Opentelemetry::Response::Metrics.build
  end)

  register.call(ServiceHandler::Traces.new, ->(request) do
    traces.call(request.to_json)
    Fluent::Plugin::Opentelemetry::Response::Traces.build
  end)

  @server.run_till_terminated
end

#stop ⇒ Object



85
86
87
# File 'lib/fluent/plugin/opentelemetry/grpc_input_handler.rb', line 85

# Stops the gRPC server created by +run+.
#
# +@server+ is only assigned inside +run+, so this method can be reached while
# it is still +nil+ (for example when shutdown is requested before +run+ has
# executed). In that case it does nothing instead of raising +NoMethodError+.
#
# @return [Object, nil] the result of +@server.stop+, or +nil+ when no server
#   has been created yet
def stop
  @server&.stop
end