Module: Langfuse::OtelSetup

Defined in:
lib/langfuse/otel_setup.rb

Overview

OpenTelemetry initialization and setup

Configures the OTel SDK with the Langfuse OTLP exporter when tracing is enabled.

Class Attribute Details

.tracer_provider ⇒ OpenTelemetry::SDK::Trace::TracerProvider? (readonly)

Returns the configured tracer provider.

Returns:

  • (OpenTelemetry::SDK::Trace::TracerProvider, nil)

    The configured tracer provider, or nil if setup has not been called or shutdown has already run



# File 'lib/langfuse/otel_setup.rb', line 17

def tracer_provider
  @tracer_provider
end

Class Method Details

.force_flush(timeout: 30) ⇒ void

This method returns an undefined value.

Force flush all pending spans

Parameters:

  • timeout (Integer) (defaults to: 30)

    Timeout in seconds



# File 'lib/langfuse/otel_setup.rb', line 96

def force_flush(timeout: 30)
  return unless @tracer_provider

  @tracer_provider.force_flush(timeout: timeout)
end

.initialized? ⇒ Boolean

Check whether OTel is initialized, that is, whether a tracer provider is currently set

Returns:

  • (Boolean)


# File 'lib/langfuse/otel_setup.rb', line 105

def initialized?
  !@tracer_provider.nil?
end
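
The guard pattern shared by .tracer_provider, .force_flush, and .initialized? can be tried out without the OpenTelemetry gems. The following is a minimal sketch, not the library's code: RecordingProvider and GuardSketch are hypothetical stand-ins, and the writable tracer_provider accessor exists only so the example can assign a provider (the real attribute is readonly and is set by .setup).

```ruby
# Hypothetical stand-in for a tracer provider; it only records the timeouts
# it was asked to flush with.
class RecordingProvider
  attr_reader :flush_timeouts

  def initialize
    @flush_timeouts = []
  end

  def force_flush(timeout: nil)
    @flush_timeouts << timeout
    :flushed
  end
end

# Hypothetical module that copies only the nil-guard logic shown above.
module GuardSketch
  class << self
    # Assumption for the sketch: writable so a provider can be injected.
    attr_accessor :tracer_provider

    # True once a provider has been assigned.
    def initialized?
      !@tracer_provider.nil?
    end

    # No-op (returns nil) until a provider exists; otherwise forwards the timeout.
    def force_flush(timeout: 30)
      return unless @tracer_provider

      @tracer_provider.force_flush(timeout: timeout)
    end
  end
end
```

Because of the early return, calling force_flush before any provider is assigned is safe and does nothing, so callers do not need to check initialized? first.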

.setup(config) ⇒ void

This method returns an undefined value.

Initialize OpenTelemetry with the Langfuse OTLP exporter

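Parameters:

  • config

    The Langfuse configuration; setup reads base_url, public_key, secret_key, tracing_async, batch_size, flush_interval, and logger from it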



# File 'lib/langfuse/otel_setup.rb', line 24

def setup(config)
  # Create OTLP exporter configured for Langfuse
  exporter = OpenTelemetry::Exporter::OTLP::Exporter.new(
    endpoint: "#{config.base_url}/api/public/otel/v1/traces",
    headers: build_headers(config.public_key, config.secret_key),
    compression: "gzip"
  )

  # Create processor based on async configuration
  # IMPORTANT: Always use BatchSpanProcessor (even in sync mode) to ensure spans
  # are exported together, which allows proper parent-child relationship detection
  processor = if config.tracing_async
                # Async: BatchSpanProcessor batches and sends in background
                OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor.new(
                  exporter,
                  max_queue_size: config.batch_size * 2, # Buffer more than batch_size
                  schedule_delay: config.flush_interval * 1000, # Convert seconds to milliseconds
                  max_export_batch_size: config.batch_size
                )
              else
                # Sync: BatchSpanProcessor with a long schedule delay (export relies on force_flush)
                # This collects spans from the same trace and exports them together,
                # which is critical for correct parent_observation_id calculation
                OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor.new(
                  exporter,
                  max_queue_size: config.batch_size * 2,
                  schedule_delay: 60_000, # 60 seconds (relies on explicit force_flush)
                  max_export_batch_size: config.batch_size
                )
              end

  # Create TracerProvider with processor
  @tracer_provider = OpenTelemetry::SDK::Trace::TracerProvider.new
  @tracer_provider.add_span_processor(processor)

  # Add span processor for propagated attributes and env/release defaults
  # This must be added AFTER the BatchSpanProcessor so it runs before export and can
  # apply all attributes (propagated IDs, environment, release) to the spans being sent
  span_processor = SpanProcessor.new(config: config)
  @tracer_provider.add_span_processor(span_processor)

  # Set as global tracer provider
  OpenTelemetry.tracer_provider = @tracer_provider

  # Configure W3C TraceContext propagator if not already set
  if OpenTelemetry.propagation.is_a?(OpenTelemetry::Context::Propagation::NoopTextMapPropagator)
    OpenTelemetry.propagation = OpenTelemetry::Trace::Propagation::TraceContext::TextMapPropagator.new
    config.logger.debug("Langfuse: Configured W3C TraceContext propagator")
  else
    config.logger.debug("Langfuse: Using existing propagator: #{OpenTelemetry.propagation.class}")
  end

  mode = config.tracing_async ? "async" : "sync"
  config.logger.info("Langfuse tracing initialized with OpenTelemetry (#{mode} mode)")
end
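
To make the arithmetic in .setup easier to check, the sketch below reproduces only the endpoint string and the BatchSpanProcessor keyword arguments as plain Ruby values. It does not load the OpenTelemetry SDK. SketchConfig, otlp_traces_endpoint, and batch_processor_options are hypothetical names introduced for illustration, and the base URL is a placeholder.

```ruby
# Hypothetical stand-in for the Langfuse config; only fields read by .setup
# that matter for this calculation are modelled.
SketchConfig = Struct.new(
  :base_url, :tracing_async, :batch_size, :flush_interval,
  keyword_init: true
)

# Same string interpolation as the exporter endpoint in .setup.
def otlp_traces_endpoint(config)
  "#{config.base_url}/api/public/otel/v1/traces"
end

# Same keyword arguments that .setup passes to BatchSpanProcessor.
def batch_processor_options(config)
  {
    # Queue holds twice the export batch size.
    max_queue_size: config.batch_size * 2,
    # Async: flush_interval is in seconds, schedule_delay in milliseconds.
    # Sync: fixed 60 s delay, so export relies on an explicit force_flush.
    schedule_delay: config.tracing_async ? config.flush_interval * 1000 : 60_000,
    max_export_batch_size: config.batch_size
  }
end

async_config = SketchConfig.new(
  base_url: "https://langfuse.example.com", # placeholder URL
  tracing_async: true,
  batch_size: 50,
  flush_interval: 5
)

options = batch_processor_options(async_config)
puts otlp_traces_endpoint(async_config)
puts options[:schedule_delay] # 5 seconds expressed in milliseconds
```

Both modes share the queue and batch sizes; only schedule_delay differs, which is why sync mode depends on callers invoking force_flush (or shutdown) to get spans exported promptly.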

.shutdown(timeout: 30) ⇒ void

This method returns an undefined value.

Shut down the tracer provider and flush any pending spans

Parameters:

  • timeout (Integer) (defaults to: 30)

    Timeout in seconds



# File 'lib/langfuse/otel_setup.rb', line 85

def shutdown(timeout: 30)
  return unless @tracer_provider

  @tracer_provider.shutdown(timeout: timeout)
  @tracer_provider = nil
end
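
Since .shutdown clears @tracer_provider after delegating, a second call returns early and the module reports itself as uninitialized again. The sketch below illustrates that behaviour with hypothetical stand-ins (CountingProvider and ShutdownSketch); it copies the guard logic only and assumes a writable accessor so a provider can be injected without calling .setup.

```ruby
# Hypothetical stand-in for a tracer provider; counts shutdown calls and
# remembers the last timeout it received.
class CountingProvider
  attr_reader :shutdown_calls, :last_timeout

  def initialize
    @shutdown_calls = 0
    @last_timeout = nil
  end

  def shutdown(timeout: nil)
    @shutdown_calls += 1
    @last_timeout = timeout
  end
end

# Hypothetical module mirroring the shutdown guard shown above.
module ShutdownSketch
  class << self
    # Assumption for the sketch: writable so a provider can be injected.
    attr_accessor :tracer_provider

    def initialized?
      !@tracer_provider.nil?
    end

    # Delegates once, then drops the reference so later calls are no-ops.
    def shutdown(timeout: 30)
      return unless @tracer_provider

      @tracer_provider.shutdown(timeout: timeout)
      @tracer_provider = nil
    end
  end
end
```

One consequence of this design is that shutdown can be called from several places (for example an exit hook and a test teardown) without shutting the same provider down twice.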