55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
# File 'lib/fluent/plugin/in_opentelemetry.rb', line 55
def start
super
if @http_config
http_handler = Opentelemetry::HttpInputHandler.new
http_server_create_http_server(:in_opentelemetry_http_server, addr: @http_config.bind, port: @http_config.port, logger: log) do |serv|
serv.post("/v1/logs") do |req|
http_handler.logs(req) { |record| router.emit(@tag, Fluent::EventTime.now, { type: Opentelemetry::RECORD_TYPE_LOGS, message: record }) }
end
serv.post("/v1/metrics") do |req|
http_handler.metrics(req) { |record| router.emit(@tag, Fluent::EventTime.now, { type: Opentelemetry::RECORD_TYPE_METRICS, message: record }) }
end
serv.post("/v1/traces") do |req|
http_handler.traces(req) { |record| router.emit(@tag, Fluent::EventTime.now, { type: Opentelemetry::RECORD_TYPE_TRACES, message: record }) }
end
end
end
if @grpc_config
thread_create(:in_opentelemetry_grpc_server) do
grpc_handler = Opentelemetry::GrpcInputHandler.new(@grpc_config, log)
grpc_handler.run(
logs: lambda { |record|
router.emit(@tag, Fluent::EventTime.now, { type: Opentelemetry::RECORD_TYPE_LOGS, message: record })
},
metrics: lambda { |record|
router.emit(@tag, Fluent::EventTime.now, { type: Opentelemetry::RECORD_TYPE_METRICS, message: record })
},
traces: lambda { |record|
router.emit(@tag, Fluent::EventTime.now, { type: Opentelemetry::RECORD_TYPE_TRACES, message: record })
}
)
end
end
end
|