Class: Flare::TraceExporter

Inherits:
Object
  • Object
Defined in:
lib/flare/trace_exporter.rb

Overview

Custom OpenTelemetry (OTel) span exporter. For each batch that FilteringSpanProcessor hands over:

1. Group spans by trace_id.
2. For each trace, build a Flare::TraceBlob and gzip-JSON-encode it.
3. Check out a presigned R2 PUT URL from UploadUrlPool.
4. PUT the gzipped body straight to R2 -- Flare's server is NOT in
   the trace-bytes path.
5. After R2 returns 200, POST /api/traces { key } using the
   customer's push token + Flare-Project / Flare-Environment headers.
   This self-notify hop replaces the Cloudflare (CF) Worker in the design.
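
Steps 1 and 2 can be sketched as follows. This is only an illustration: the DemoSpan struct and the payload layout in encode_trace are hypothetical stand-ins, and the real Flare::TraceBlob schema is not shown on this page.

```ruby
require "json"
require "zlib"

# Hypothetical stand-in for an OTel span; only the fields used here.
DemoSpan = Struct.new(:trace_id, :name)

# Hypothetical payload layout (assumption); the real TraceBlob may differ.
def encode_trace(trace_id, spans)
  payload = {
    "trace_id" => trace_id,
    "spans"    => spans.map { |span| { "name" => span.name } }
  }
  Zlib.gzip(JSON.generate(payload))
end

# Step 1: one group per trace_id. Step 2: one gzipped JSON body per group.
def encode_batch(spans)
  spans.group_by(&:trace_id).map do |trace_id, group|
    [trace_id, encode_trace(trace_id, group)]
  end.to_h
end

spans = [
  DemoSpan.new("aaa", "GET /users"),
  DemoSpan.new("bbb", "GET /orders"),
  DemoSpan.new("aaa", "SELECT users")
]

bodies = encode_batch(spans)
puts bodies.keys.inspect
puts JSON.parse(Zlib.gunzip(bodies["aaa"]))["spans"].length
```

Each value in the resulting hash is a body that could be sent with the PUT_HEADERS listed under Constant Summary.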

A 403 from R2 means the presigned URL expired between issue and use: the exporter discards it, checks out the next URL, and retries once. If the pool is empty, the export returns FAILURE. A notify-POST failure is logged and counted but doesn't fail the export: the blob is already in R2 and just won't be processed, and the incoming/* lifecycle cleans it up in 1 hour.
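
The retry-once rule can be sketched like this. FakePool, put_with_retry, and the example URLs are hypothetical: the real UploadUrlPool and transport interfaces are not shown on this page, and treating every other non-200 status as an immediate failure is an assumption.

```ruby
# Hypothetical stand-in for UploadUrlPool (assumed interface).
class FakePool
  def initialize(urls)
    @urls = urls.dup
  end

  # Returns the next presigned URL, or nil when the pool is empty.
  def checkout
    @urls.shift
  end
end

# Returns :success or :failure. `put` is any callable taking (url, body)
# and returning an HTTP status code.
def put_with_retry(pool, body, put)
  2.times do
    url = pool.checkout
    return :failure if url.nil? # pool empty

    status = put.call(url, body)
    return :success if status == 200
    return :failure unless status == 403 # assumption: only an expired URL is retried
  end
  :failure # second 403 in a row: give up
end

statuses = {
  "https://r2.example/expired" => 403,
  "https://r2.example/fresh"   => 200
}
put = ->(url, _body) { statuses.fetch(url) }

pool = FakePool.new(["https://r2.example/expired", "https://r2.example/fresh"])
puts put_with_retry(pool, "gzipped-bytes", put)
```

Here the first URL is rejected with 403, the second succeeds, and the call reports success. With an empty pool, or two expired URLs in a row, it reports failure.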

Constant Summary

SUCCESS =
OpenTelemetry::SDK::Trace::Export::SUCCESS
FAILURE =
OpenTelemetry::SDK::Trace::Export::FAILURE
PUT_HEADERS =
{
  "Content-Type"     => "application/json",
  "Content-Encoding" => "gzip"
}.freeze

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(pool:, notify_url:, api_key:, project:, environment:, transport: nil, logger: nil) ⇒ TraceExporter

Returns a new instance of TraceExporter.



# File 'lib/flare/trace_exporter.rb', line 43

def initialize(pool:, notify_url:, api_key:, project:, environment:,
               transport: nil, logger: nil)
  @pool         = pool
  @notify_url   = notify_url.to_s
  @api_key      = api_key
  @project      = project
  @environment  = environment
  @transport    = transport || HttpTransport.new
  @logger       = logger || Logger.new($stderr, level: Logger::WARN)

  @put_failure_count    = Concurrent::AtomicFixnum.new(0)
  @notify_failure_count = Concurrent::AtomicFixnum.new(0)
  @pool_empty_count     = Concurrent::AtomicFixnum.new(0)
  @exception_count      = Concurrent::AtomicFixnum.new(0)
end
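
The notify_url, api_key, project, and environment arguments presumably feed the notify POST described in the overview. A minimal sketch of building (not sending) such a request follows. Only the Flare-Project / Flare-Environment header names come from this page; the helper name build_notify_request, the Bearer Authorization scheme, the JSON body shape, and the example URL and key are assumptions.

```ruby
require "json"
require "net/http"
require "uri"

# Builds the notify request without sending it.
# Assumptions: the push token travels as a Bearer token, and the body is {"key": ...}.
def build_notify_request(notify_url:, api_key:, project:, environment:, key:)
  request = Net::HTTP::Post.new(URI(notify_url))
  request["Authorization"]     = "Bearer #{api_key}"
  request["Flare-Project"]     = project
  request["Flare-Environment"] = environment
  request["Content-Type"]      = "application/json"
  request.body = JSON.generate(key: key)
  request
end

request = build_notify_request(
  notify_url:  "https://flare.example/api/traces",
  api_key:     "push-token",
  project:     "demo",
  environment: "staging",
  key:         "incoming/trace-123.json.gz" # hypothetical object key
)

puts request.path
puts request["Flare-Project"]
puts request.body
```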

Instance Attribute Details

#exception_count ⇒ Object (readonly)

Returns the value of attribute exception_count.



# File 'lib/flare/trace_exporter.rb', line 41

def exception_count
  @exception_count
end

#notify_failure_count ⇒ Object (readonly)

Returns the value of attribute notify_failure_count.



# File 'lib/flare/trace_exporter.rb', line 41

def notify_failure_count
  @notify_failure_count
end

#pool_empty_count ⇒ Object (readonly)

Returns the value of attribute pool_empty_count.



# File 'lib/flare/trace_exporter.rb', line 41

def pool_empty_count
  @pool_empty_count
end

#put_failure_count ⇒ Object (readonly)

Returns the value of attribute put_failure_count.



# File 'lib/flare/trace_exporter.rb', line 41

def put_failure_count
  @put_failure_count
end

Instance Method Details

#export(spans, timeout: nil) ⇒ Object



# File 'lib/flare/trace_exporter.rb', line 59

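def export(spans, timeout: nil)
  # One blob per trace: spans sharing a trace_id are shipped together.
  grouped = spans.group_by(&:trace_id)
  return SUCCESS if grouped.empty?

  # Keep shipping after a failure; one failed trace marks the whole batch FAILURE.
  overall = SUCCESS
  grouped.each do |trace_id, group|
    result = ship(TraceBlob.build(trace_id: trace_id, spans: group))
    overall = FAILURE if result == FAILURE
  end
  overall
rescue StandardError => e
  # Count and log the exception, then report FAILURE instead of raising.
  @exception_count.increment
  @logger.warn("[Flare::TraceExporter] export raised: #{e.class}: #{e.message}")
  FAILURE
end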
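
The result aggregation in #export can be illustrated in isolation. In this sketch, export_all, SketchSpan, and the ship lambda are hypothetical stand-ins (the private per-trace upload step is not shown on this page), and symbols replace the OTel result constants.

```ruby
# Hypothetical stand-in for an OTel span.
SketchSpan = Struct.new(:trace_id, :name)

# Mirrors the aggregation above: every trace is attempted, and a single
# failure turns the overall result into :failure.
def export_all(spans, ship)
  grouped = spans.group_by(&:trace_id)
  return :success if grouped.empty?

  overall = :success
  grouped.each do |trace_id, group|
    overall = :failure if ship.call(trace_id, group) == :failure
  end
  overall
end

attempted = []
ship = lambda do |trace_id, _group|
  attempted << trace_id
  trace_id == "bad" ? :failure : :success
end

spans = [
  SketchSpan.new("ok-1", "a"),
  SketchSpan.new("bad", "b"),
  SketchSpan.new("ok-2", "c")
]

result = export_all(spans, ship)
puts "#{result} after attempting #{attempted.inspect}"
```

The failing trace does not stop the loop: all three traces are attempted, and the batch as a whole is reported as failed.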

#force_flush(timeout: nil) ⇒ Object



# File 'lib/flare/trace_exporter.rb', line 75

def force_flush(timeout: nil)
  SUCCESS
end

#shutdown(timeout: nil) ⇒ Object



# File 'lib/flare/trace_exporter.rb', line 79

def shutdown(timeout: nil)
  SUCCESS
end