Class: Flare::FilteringSpanProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/flare/filtering_span_processor.rb

Overview

A span processor shaped like the OpenTelemetry BatchSpanProcessor (BSP), except that its filter is `sampled OR marked` instead of BSP's `sampled`. (BSP returns early on RECORD_ONLY spans; our Path 2 spans have sampled=false, so BSP would drop them.) Matching spans are forwarded to a trace exporter on a background worker thread, so the exporter never runs on the request/job thread (CAF-3).

On every on_finish we also check marker.owner?(trace_id, span_id) and unmark when the owning rack span finishes (CAF-2). Cleanup runs even for spans we don’t export.

Constant Summary collapse

SUCCESS =
OpenTelemetry::SDK::Trace::Export::SUCCESS
FAILURE =
OpenTelemetry::SDK::Trace::Export::FAILURE
DEFAULT_MAX_QUEUE =
5_000
DEFAULT_FLUSH_INTERVAL =
5 # seconds
DEFAULT_EXPORT_TIMEOUT =
30 # seconds
DEFAULT_MARKED_TRACE_GRACE_PERIOD =
1.0 # seconds

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(exporter:, marker:, max_queue: DEFAULT_MAX_QUEUE, flush_interval: DEFAULT_FLUSH_INTERVAL, export_timeout: DEFAULT_EXPORT_TIMEOUT, marked_trace_grace_period: DEFAULT_MARKED_TRACE_GRACE_PERIOD, logger: nil) ⇒ FilteringSpanProcessor

Returns a new instance of FilteringSpanProcessor.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/flare/filtering_span_processor.rb', line 28

# Builds the processor, sets up its empty buffers and counters, and starts the
# background worker via start_worker.
#
# @param exporter [Object] trace exporter; #shutdown forwards to
#   exporter.shutdown when the exporter responds to it
# @param marker [Object] object answering marked?(trace_id) and
#   owner?(trace_id, span_id), consulted from #on_finish
# @param max_queue [Integer] queue bound, stored as given
#   (presumably the maximum number of buffered spans; confirm against enqueue)
# @param flush_interval [Numeric] seconds, stored as given
# @param export_timeout [Numeric] seconds, stored as given
# @param marked_trace_grace_period [#to_f] seconds; coerced with to_f and used
#   by #on_finish as the enqueue delay once a marked trace's owner span ends
# @param logger [Logger, nil] falls back to a WARN-level logger on $stderr
def initialize(exporter:, marker:,
               max_queue: DEFAULT_MAX_QUEUE,
               flush_interval: DEFAULT_FLUSH_INTERVAL,
               export_timeout: DEFAULT_EXPORT_TIMEOUT,
               marked_trace_grace_period: DEFAULT_MARKED_TRACE_GRACE_PERIOD,
               logger: nil)
  # Collaborators.
  @exporter = exporter
  @marker = marker
  @logger = logger
  @logger ||= Logger.new($stderr, level: Logger::WARN)

  # Tunables.
  @max_queue = max_queue
  @flush_interval = flush_interval
  @export_timeout = export_timeout
  @marked_trace_grace_period = marked_trace_grace_period.to_f

  # Synchronization primitives and lifecycle flags.
  @mutex = Mutex.new
  @cond = ConditionVariable.new
  @stopped = false
  # Pid at construction time (presumably what detect_forking compares against).
  @pid = Process.pid

  # Buffers: all start empty.
  @pending_by_trace = {}
  @delayed_ready_by_trace = {}
  @trace_order = []
  @ready_queue = []
  @pending_count = 0

  # Atomic counters exposed through the public readers; all start at zero.
  @dropped_count = Concurrent::AtomicFixnum.new(0)
  @failed_export_count = Concurrent::AtomicFixnum.new(0)
  @exception_count = Concurrent::AtomicFixnum.new(0)
  @buffer_high_watermark = Concurrent::AtomicFixnum.new(0)

  # Must come last: the worker is started only after all state above exists.
  start_worker
end

Instance Attribute Details

#buffer_high_watermark ⇒ Object (readonly)

Returns the value of attribute buffer_high_watermark.



26
27
28
# File 'lib/flare/filtering_span_processor.rb', line 26

# Reader for the buffer high-watermark counter.
#
# The counter is created in #initialize starting at 0, and
# #reset_buffer_high_watermark sets its value to the current #buffer_size.
#
# @return [Concurrent::AtomicFixnum] the counter object itself, not a plain
#   Integer; read the number via `.value`
def buffer_high_watermark
  @buffer_high_watermark
end

#dropped_count ⇒ Object (readonly)

Returns the value of attribute dropped_count.



26
27
28
# File 'lib/flare/filtering_span_processor.rb', line 26

# Reader for the dropped-span counter, created in #initialize starting at 0.
#
# NOTE(review): the code that increments it is outside this view; presumably
# it counts spans discarded when the queue is full. Confirm against enqueue.
#
# @return [Concurrent::AtomicFixnum] the counter object itself, not a plain
#   Integer; read the number via `.value`
def dropped_count
  @dropped_count
end

#exception_count ⇒ Object (readonly)

Returns the value of attribute exception_count.



26
27
28
# File 'lib/flare/filtering_span_processor.rb', line 26

# Reader for the exception counter, created in #initialize starting at 0.
#
# NOTE(review): the code that increments it is outside this view; presumably
# it counts exceptions rescued while exporting. Confirm against the worker.
#
# @return [Concurrent::AtomicFixnum] the counter object itself, not a plain
#   Integer; read the number via `.value`
def exception_count
  @exception_count
end

#failed_export_count ⇒ Object (readonly)

Returns the value of attribute failed_export_count.



26
27
28
# File 'lib/flare/filtering_span_processor.rb', line 26

# Reader for the failed-export counter, created in #initialize starting at 0.
#
# NOTE(review): the code that increments it is outside this view; presumably
# it counts export calls that did not return SUCCESS. Confirm against the worker.
#
# @return [Concurrent::AtomicFixnum] the counter object itself, not a plain
#   Integer; read the number via `.value`
def failed_export_count
  @failed_export_count
end

#max_queue ⇒ Object (readonly)

Returns the value of attribute max_queue.



26
27
28
# File 'lib/flare/filtering_span_processor.rb', line 26

# Reader for the queue bound given to #initialize.
#
# @return [Object] the `max_queue:` argument exactly as passed to the
#   constructor (DEFAULT_MAX_QUEUE, i.e. 5_000, when omitted)
def max_queue
  @max_queue
end

Instance Method Details

#buffer_size ⇒ Object



96
97
98
# File 'lib/flare/filtering_span_processor.rb', line 96

# Reports how many spans are currently queued.
#
# The count is taken by queued_span_count while holding @mutex, so the
# caller must not already hold that mutex.
#
# @return [Object] whatever queued_span_count returns (presumably an Integer)
def buffer_size
  @mutex.synchronize do
    queued_span_count
  end
end

#force_flush(timeout: nil) ⇒ Object



80
81
82
83
# File 'lib/flare/filtering_span_processor.rb', line 80

# Flushes buffered spans by calling drain_and_export(include_pending: true)
# directly on the calling thread.
#
# NOTE(review): `timeout` is accepted but never read in this method, and the
# result is SUCCESS regardless of what drain_and_export returns; an exception
# raised there propagates to the caller. Confirm this is intentional.
#
# @param timeout [Numeric, nil] currently unused by this method
# @return [Object] SUCCESS (OpenTelemetry::SDK::Trace::Export::SUCCESS)
def force_flush(timeout: nil)
  drain_and_export(include_pending: true)
  SUCCESS
end

#on_finish(span) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/flare/filtering_span_processor.rb', line 62

# Span-finished hook: queues the span for export when its trace is sampled
# or marked, and ignores it otherwise.
#
# A span counts as "marked" when @marker.marked?(trace_id) is truthy. When the
# finished span is also the marked trace's owner (@marker.owner?), it is
# enqueued as complete with a delay of @marked_trace_grace_period; every other
# span is enqueued with no delay and completeness decided by
# sampled_completion_span?.
#
# @param span [Object] finished span; must respond to `context`, and is
#   converted with `to_span_data` when it responds to that
# @return [Object, nil] the result of enqueue, or nil when the span is skipped
def on_finish(span)
  detect_forking

  context = span.context
  is_sampled = context&.trace_flags&.sampled?
  is_marked = context && @marker.marked?(context.trace_id)
  # Only asked for marked traces; stays falsy otherwise.
  owner_done = is_marked && @marker.owner?(context.trace_id, context.span_id)

  if is_sampled || is_marked
    payload = span
    payload = span.to_span_data if span.respond_to?(:to_span_data)

    if owner_done
      # The owner span closes the marked trace; hold it for the grace period.
      enqueue(payload, complete: owner_done, delay: @marked_trace_grace_period)
    else
      enqueue(payload, complete: sampled_completion_span?(payload), delay: 0)
    end
  end
end

#on_start(_span, _parent_context) ⇒ Object



60
# File 'lib/flare/filtering_span_processor.rb', line 60

# Span-started hook. Intentionally a no-op: this processor only acts in
# #on_finish.
#
# @param _span [Object] ignored
# @param _parent_context [Object] ignored
# @return [nil]
def on_start(_span, _parent_context)
  nil
end

#reset_buffer_high_watermark ⇒ Object



100
101
102
# File 'lib/flare/filtering_span_processor.rb', line 100

# Restarts the high-watermark counter from the current buffer size rather
# than from zero.
#
# @return [Object] the buffer size that was written into the counter
def reset_buffer_high_watermark
  current_size = buffer_size
  @buffer_high_watermark.value = current_size
end

#shutdown(timeout: nil) ⇒ Object



85
86
87
88
89
90
91
92
93
94
# File 'lib/flare/filtering_span_processor.rb', line 85

# Stops the processor: flags it as stopped, wakes anything waiting on the
# condition variable, waits for the worker thread, runs a final
# drain_and_export(include_pending: true), then shuts the exporter down when
# it supports that.
#
# @param timeout [Numeric, nil] seconds to wait for the worker thread
#   (5 when nil); the same value is passed on to the exporter's shutdown
# @return [Object] SUCCESS (OpenTelemetry::SDK::Trace::Export::SUCCESS)
def shutdown(timeout: nil)
  @mutex.synchronize do
    @stopped = true
    @cond.broadcast
  end

  join_limit = timeout || 5
  @worker.join(join_limit)

  # Final drain on the calling thread, after waiting for the worker.
  drain_and_export(include_pending: true)

  if @exporter.respond_to?(:shutdown)
    @exporter.shutdown(timeout: timeout)
  end

  SUCCESS
end