Class: Flare::FilteringSpanProcessor
- Inherits:
-
Object
- Object
- Flare::FilteringSpanProcessor
- Defined in:
- lib/flare/filtering_span_processor.rb
Overview
BSP-shaped span processor whose filter is `sampled OR marked` instead of BSP's `sampled` (BSP early-returns on RECORD_ONLY spans – our Path 2 spans have sampled=false, so they'd be dropped). Forwards matching spans to a trace exporter on a background worker thread so the exporter never runs on the request/job thread (CAF-3).
On every on_finish we also check marker.owner?(trace_id, span_id) and unmark when the owning rack span finishes (CAF-2). Cleanup runs even for spans we don’t export.
Constant Summary collapse
- SUCCESS =
OpenTelemetry::SDK::Trace::Export::SUCCESS
- FAILURE =
OpenTelemetry::SDK::Trace::Export::FAILURE
- DEFAULT_MAX_QUEUE =
5_000
- DEFAULT_FLUSH_INTERVAL =
seconds
5
- DEFAULT_EXPORT_TIMEOUT =
seconds
30
- DEFAULT_MARKED_TRACE_GRACE_PERIOD =
seconds
1.0
Instance Attribute Summary collapse
-
#buffer_high_watermark ⇒ Object
readonly
Returns the value of attribute buffer_high_watermark.
-
#dropped_count ⇒ Object
readonly
Returns the value of attribute dropped_count.
-
#exception_count ⇒ Object
readonly
Returns the value of attribute exception_count.
-
#failed_export_count ⇒ Object
readonly
Returns the value of attribute failed_export_count.
-
#max_queue ⇒ Object
readonly
Returns the value of attribute max_queue.
Instance Method Summary collapse
- #buffer_size ⇒ Object
- #force_flush(timeout: nil) ⇒ Object
-
#initialize(exporter:, marker:, max_queue: DEFAULT_MAX_QUEUE, flush_interval: DEFAULT_FLUSH_INTERVAL, export_timeout: DEFAULT_EXPORT_TIMEOUT, marked_trace_grace_period: DEFAULT_MARKED_TRACE_GRACE_PERIOD, logger: nil) ⇒ FilteringSpanProcessor
constructor
A new instance of FilteringSpanProcessor.
- #on_finish(span) ⇒ Object
- #on_start(_span, _parent_context) ⇒ Object
- #reset_buffer_high_watermark ⇒ Object
- #shutdown(timeout: nil) ⇒ Object
Constructor Details
#initialize(exporter:, marker:, max_queue: DEFAULT_MAX_QUEUE, flush_interval: DEFAULT_FLUSH_INTERVAL, export_timeout: DEFAULT_EXPORT_TIMEOUT, marked_trace_grace_period: DEFAULT_MARKED_TRACE_GRACE_PERIOD, logger: nil) ⇒ FilteringSpanProcessor
Returns a new instance of FilteringSpanProcessor.
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/flare/filtering_span_processor.rb', line 28
#
# Stores the collaborators and tunables, sets up empty buffering state and
# zeroed counters, then starts the background worker via start_worker.
#
# @param exporter [Object] kept as @exporter; #shutdown forwards to it when
#   it responds to :shutdown
# @param marker [Object] kept as @marker; #on_finish calls #marked? and
#   #owner? on it
# @param max_queue [Integer] exposed through #max_queue (presumably the
#   buffer capacity; enforcement is not visible in this method)
# @param flush_interval [Numeric] seconds
# @param export_timeout [Numeric] seconds
# @param marked_trace_grace_period [Numeric] seconds; coerced with #to_f
# @param logger [Logger, nil] falls back to a WARN-level logger on $stderr
def initialize(
  exporter:,
  marker:,
  max_queue: DEFAULT_MAX_QUEUE,
  flush_interval: DEFAULT_FLUSH_INTERVAL,
  export_timeout: DEFAULT_EXPORT_TIMEOUT,
  marked_trace_grace_period: DEFAULT_MARKED_TRACE_GRACE_PERIOD,
  logger: nil
)
  # Collaborators.
  @exporter = exporter
  @marker = marker
  @logger = logger || Logger.new($stderr, level: Logger::WARN)

  # Tunables.
  @max_queue = max_queue
  @flush_interval = flush_interval
  @export_timeout = export_timeout
  @marked_trace_grace_period = marked_trace_grace_period.to_f

  # Counters are atomics, so the readers hand back the counter object itself.
  @dropped_count = Concurrent::AtomicFixnum.new(0)
  @failed_export_count = Concurrent::AtomicFixnum.new(0)
  @exception_count = Concurrent::AtomicFixnum.new(0)
  @buffer_high_watermark = Concurrent::AtomicFixnum.new(0)

  # Buffering state, all initially empty.
  @pending_by_trace = {}
  @trace_order = []
  @pending_count = 0
  @ready_queue = []
  @delayed_ready_by_trace = {}

  # Synchronisation and lifecycle flags.
  @mutex = Mutex.new
  @cond = ConditionVariable.new
  @stopped = false
  @pid = Process.pid # remembered pid; on_finish calls detect_forking first

  # Must stay last: every piece of state above exists before the worker runs.
  start_worker
end
Instance Attribute Details
#buffer_high_watermark ⇒ Object (readonly)
Returns the value of attribute buffer_high_watermark.
26 27 28 |
# File 'lib/flare/filtering_span_processor.rb', line 26
#
# Reader for @buffer_high_watermark.
#
# @return [Object] whatever is stored in @buffer_high_watermark. The
#   constructor stores a Concurrent::AtomicFixnum there, so callers get the
#   counter object and read the number through its #value.
def buffer_high_watermark
  @buffer_high_watermark
end
#dropped_count ⇒ Object (readonly)
Returns the value of attribute dropped_count.
26 27 28 |
# File 'lib/flare/filtering_span_processor.rb', line 26
#
# Reader for @dropped_count.
#
# @return [Object] whatever is stored in @dropped_count. The constructor
#   stores a Concurrent::AtomicFixnum there, so callers get the counter
#   object and read the number through its #value.
def dropped_count
  @dropped_count
end
#exception_count ⇒ Object (readonly)
Returns the value of attribute exception_count.
26 27 28 |
# File 'lib/flare/filtering_span_processor.rb', line 26
#
# Reader for @exception_count.
#
# @return [Object] whatever is stored in @exception_count. The constructor
#   stores a Concurrent::AtomicFixnum there, so callers get the counter
#   object and read the number through its #value.
def exception_count
  @exception_count
end
#failed_export_count ⇒ Object (readonly)
Returns the value of attribute failed_export_count.
26 27 28 |
# File 'lib/flare/filtering_span_processor.rb', line 26
#
# Reader for @failed_export_count.
#
# @return [Object] whatever is stored in @failed_export_count. The
#   constructor stores a Concurrent::AtomicFixnum there, so callers get the
#   counter object and read the number through its #value.
def failed_export_count
  @failed_export_count
end
#max_queue ⇒ Object (readonly)
Returns the value of attribute max_queue.
26 27 28 |
# File 'lib/flare/filtering_span_processor.rb', line 26
#
# Reader for @max_queue.
#
# @return [Object] the max_queue value handed to the constructor
#   (DEFAULT_MAX_QUEUE when the caller did not pass one).
def max_queue
  @max_queue
end
Instance Method Details
#buffer_size ⇒ Object
96 97 98 |
# File 'lib/flare/filtering_span_processor.rb', line 96
#
# Reports the current number of queued spans.
#
# @return [Object] the result of queued_span_count, evaluated while holding
#   @mutex so the count is read under the same lock as the buffers.
def buffer_size
  @mutex.synchronize do
    queued_span_count
  end
end
#force_flush(timeout: nil) ⇒ Object
80 81 82 83 |
# File 'lib/flare/filtering_span_processor.rb', line 80
#
# Drains the buffers, pending entries included, and exports them on the
# calling thread.
#
# @param timeout [Numeric, nil] accepted as part of the method's signature
#   but not consulted by this method.
# @return [Integer] always SUCCESS, whatever drain_and_export returned.
def force_flush(timeout: nil)
  drain_and_export(include_pending: true)

  SUCCESS
end
#on_finish(span) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/flare/filtering_span_processor.rb', line 62
#
# Span-finished hook: enqueues the span for export when it is sampled or its
# trace is marked; every other span is ignored.
#
# @param span [Object] the finished span. Must respond to #context; it is
#   converted with #to_span_data when it responds to that, otherwise it is
#   enqueued as-is.
# @return [Object, nil] nil when the span is filtered out, otherwise the
#   result of enqueue.
def on_finish(span)
  detect_forking

  context = span.context
  is_sampled = context&.trace_flags&.sampled?
  is_marked = context && @marker.marked?(context.trace_id)
  # Ownership is only asked of the marker for traces it has marked.
  owner_done = is_marked && @marker.owner?(context.trace_id, context.span_id)
  return if !is_sampled && !is_marked

  payload = span.respond_to?(:to_span_data) ? span.to_span_data : span

  if owner_done
    # The owning span finished: the trace is complete, but hand-off is delayed
    # by the grace period.
    enqueue(payload, complete: owner_done, delay: @marked_trace_grace_period)
  else
    enqueue(payload, complete: sampled_completion_span?(payload), delay: 0)
  end
end
#on_start(_span, _parent_context) ⇒ Object
60 |
# File 'lib/flare/filtering_span_processor.rb', line 60
#
# Span-started hook. Intentionally empty: this processor does all of its
# work in on_finish.
#
# @param _span [Object] ignored
# @param _parent_context [Object] ignored
# @return [nil]
def on_start(_span, _parent_context)
end
#reset_buffer_high_watermark ⇒ Object
100 101 102 |
# File 'lib/flare/filtering_span_processor.rb', line 100
#
# Resets the high-watermark counter to the current buffer size.
#
# @return [Object] the buffer size that was written to the counter.
def reset_buffer_high_watermark
  current_size = buffer_size
  @buffer_high_watermark.value = current_size
end
#shutdown(timeout: nil) ⇒ Object
85 86 87 88 89 90 91 92 93 94 |
# File 'lib/flare/filtering_span_processor.rb', line 85
#
# Stops the background worker, exports whatever is still buffered on the
# calling thread, and shuts the exporter down.
#
# @param timeout [Numeric, nil] overall budget in seconds. The worker join
#   waits up to this long (5 seconds when nil); the exporter is then given
#   only the time that is left, never less than 0. When nil, the exporter
#   receives timeout: nil.
# @return [Integer] always SUCCESS; the exporter's own result is not
#   propagated.
def shutdown(timeout: nil)
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  @mutex.synchronize do
    @stopped = true
    @cond.broadcast # wake the worker so it can observe @stopped
  end

  # Safe navigation: shutting down must not raise when no worker thread exists.
  @worker&.join(timeout || 5)

  # Final drain on the calling thread picks up anything the worker left behind.
  drain_and_export(include_pending: true)

  if @exporter.respond_to?(:shutdown)
    # Hand the exporter the remaining budget, not the full timeout again;
    # otherwise the whole call could take roughly twice as long as requested.
    remaining = timeout && begin
      elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
      [timeout - elapsed, 0].max
    end
    @exporter.shutdown(timeout: remaining)
  end

  SUCCESS
end