57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
# File 'lib/fluent/plugin/in_opentelemetry.rb', line 57
def start
super
if @http_config
http_handler = Opentelemetry::HttpInputHandler.new
http_server_create_http_server(:in_opentelemetry_http_server, addr: @http_config.bind, port: @http_config.port, logger: log) do |serv|
serv.post("/v1/logs") do |req|
http_handler.logs(req) { |record| router.emit(tag_for(Opentelemetry::RECORD_TYPE_LOGS), Fluent::EventTime.now, { "type" => Opentelemetry::RECORD_TYPE_LOGS, "message" => record }) }
end
serv.post("/v1/metrics") do |req|
http_handler.metrics(req) { |record| router.emit(tag_for(Opentelemetry::RECORD_TYPE_METRICS), Fluent::EventTime.now, { "type" => Opentelemetry::RECORD_TYPE_METRICS, "message" => record }) }
end
serv.post("/v1/traces") do |req|
http_handler.traces(req) { |record| router.emit(tag_for(Opentelemetry::RECORD_TYPE_TRACES), Fluent::EventTime.now, { "type" => Opentelemetry::RECORD_TYPE_TRACES, "message" => record }) }
end
end
end
if @grpc_config
thread_create(:in_opentelemetry_grpc_server) do
@grpc_handler = Opentelemetry::GrpcInputHandler.new(@grpc_config, log)
@grpc_handler.run(
logs: lambda { |record|
router.emit(tag_for(Opentelemetry::RECORD_TYPE_LOGS), Fluent::EventTime.now, { "type" => Opentelemetry::RECORD_TYPE_LOGS, "message" => record })
},
metrics: lambda { |record|
router.emit(tag_for(Opentelemetry::RECORD_TYPE_METRICS), Fluent::EventTime.now, { "type" => Opentelemetry::RECORD_TYPE_METRICS, "message" => record })
},
traces: lambda { |record|
router.emit(tag_for(Opentelemetry::RECORD_TYPE_TRACES), Fluent::EventTime.now, { "type" => Opentelemetry::RECORD_TYPE_TRACES, "message" => record })
}
)
end
end
end
|