Class: Fluent::Plugin::OpentelemetryOutput::BatchProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/out_opentelemetry.rb

Constant Summary collapse

RESOURCE_KEY_MAP =
{
  Opentelemetry::RECORD_TYPE_LOGS => "resourceLogs",
  Opentelemetry::RECORD_TYPE_METRICS => "resourceMetrics",
  Opentelemetry::RECORD_TYPE_TRACES => "resourceSpans"
}.freeze

Class Method Summary collapse

Class Method Details

.build_export_requests(chunk) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/fluent/plugin/out_opentelemetry.rb', line 109

def self.build_export_requests(chunk)
  requests = {
    Opentelemetry::RECORD_TYPE_LOGS => {},
    Opentelemetry::RECORD_TYPE_METRICS => {},
    Opentelemetry::RECORD_TYPE_TRACES => {}
  }

  chunk.each do |_, record| # rubocop:disable Style/HashEachMethods
    record_type = record["type"]
    resource_key = RESOURCE_KEY_MAP[record_type]
    record["message"] = JSON.parse(record["message"])
    resource_hash = record["message"][resource_key][0]["resource"].hash
    if requests[record_type][resource_hash].nil?
      requests[record_type][resource_hash] = record
    else
      requests[record_type][resource_hash]["message"][resource_key].concat(record["message"][resource_key])
    end
  end

  merged_records = requests.values.flat_map(&:values)
  merged_records.each do |record|
    record["message"] = record["message"].to_json
  end
  merged_records
end