Module: Langfuse::Propagation

Defined in:
lib/langfuse/propagation.rb

Overview

Attribute propagation utilities for Langfuse OpenTelemetry integration.

This module provides the `propagate_attributes` method for setting trace-level attributes (user_id, session_id, metadata, version, tags) that automatically propagate to all child spans within the context.

Examples:

Basic usage

Langfuse.observe("operation") do |span|
  Langfuse.propagate_attributes(user_id: "user_123", session_id: "session_abc") do
    # Current span has user_id and session_id
    span.start_observation("child") do |child|
      # Child span inherits user_id and session_id
    end
  end
end

Constant Summary

# Map of propagated attribute keys to span attribute keys
SPAN_KEY_MAP = {
  "user_id" => OtelAttributes::TRACE_USER_ID,
  "session_id" => OtelAttributes::TRACE_SESSION_ID,
  "version" => OtelAttributes::VERSION,
  "tags" => OtelAttributes::TRACE_TAGS,
  "metadata" => OtelAttributes::TRACE_METADATA
}.freeze

# OpenTelemetry context keys for propagated attributes
CONTEXT_KEYS = {
  "user_id" => OpenTelemetry::Context.create_key("langfuse_user_id"),
  "session_id" => OpenTelemetry::Context.create_key("langfuse_session_id"),
  "metadata" => OpenTelemetry::Context.create_key("langfuse_metadata"),
  "version" => OpenTelemetry::Context.create_key("langfuse_version"),
  "tags" => OpenTelemetry::Context.create_key("langfuse_tags")
}.freeze

# List of propagated attribute keys (derived from CONTEXT_KEYS)
PROPAGATED_ATTRIBUTES = CONTEXT_KEYS.keys.freeze

# Baggage key prefix for cross-service propagation
BAGGAGE_PREFIX = "langfuse_"

Class Method Details

._extract_baggage_attributes(context) ⇒ Hash<String, String, Array<String>>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Extract propagated attributes from baggage

Parameters:

  • context (OpenTelemetry::Context)

    The context to read baggage from

Returns:

  • (Hash<String, String, Array<String>>)

    Hash of span key => value



# File 'lib/langfuse/propagation.rb', line 393

def self._extract_baggage_attributes(context)
  return {} unless baggage_available?

  baggage = OpenTelemetry::Baggage.values(context: context)
  return {} unless baggage.is_a?(Hash)

  attributes = {}
  baggage.each do |baggage_key, baggage_value|
    next unless baggage_key.to_s.start_with?(BAGGAGE_PREFIX)

    span_key = _get_span_key_from_baggage_key(baggage_key.to_s)
    next unless span_key

    attributes[span_key] = _parse_baggage_value(span_key, baggage_value)
  end
  attributes
rescue StandardError => e
  Langfuse.configuration.logger.debug("Langfuse: Baggage extraction failed: #{e.message}")
  {}
end

._get_propagated_baggage_key(key) ⇒ String

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get baggage key for a propagated attribute

Parameters:

  • key (String)

    Attribute key (user_id, session_id, etc.)

Returns:

  • (String)

    Baggage key (snake_case for cross-service compatibility)



# File 'lib/langfuse/propagation.rb', line 354

def self._get_propagated_baggage_key(key)
  "#{BAGGAGE_PREFIX}#{key}"
end

._get_propagated_context_key(key) ⇒ OpenTelemetry::Context::Key

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get context key for a propagated attribute

Parameters:

  • key (String)

    Attribute key (user_id, session_id, etc.)

Returns:

  • (OpenTelemetry::Context::Key)

    Context key object



# File 'lib/langfuse/propagation.rb', line 334

def self._get_propagated_context_key(key)
  CONTEXT_KEYS[key] || raise(ArgumentError, "Unknown propagated attribute key: #{key}")
end

._get_propagated_span_key(key) ⇒ String

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get span attribute key for a propagated attribute

Parameters:

  • key (String)

    Attribute key (user_id, session_id, etc.)

Returns:

  • (String)

    Span attribute key



# File 'lib/langfuse/propagation.rb', line 344

def self._get_propagated_span_key(key)
  SPAN_KEY_MAP[key] || "#{OtelAttributes::TRACE_METADATA}.#{key}"
end

._get_span_key_from_baggage_key(baggage_key) ⇒ String?

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get span key from baggage key

Parameters:

  • baggage_key (String)

    Baggage key

Returns:

  • (String, nil)

    Span key or nil if not a Langfuse baggage key



# File 'lib/langfuse/propagation.rb', line 364

def self._get_span_key_from_baggage_key(baggage_key)
  return nil unless baggage_key.start_with?(BAGGAGE_PREFIX)

  suffix = baggage_key[BAGGAGE_PREFIX.length..]

  # Handle metadata keys (format: langfuse_metadata_{key_name})
  if suffix.start_with?("metadata_")
    metadata_key = suffix[("metadata_".length)..]
    return "#{OtelAttributes::TRACE_METADATA}.#{metadata_key}"
  end

  SPAN_KEY_MAP[suffix]
end
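
The mapping from baggage keys back to span keys can be tried out without OpenTelemetry installed. The following is a minimal sketch, not the library's code: the module name `BaggageKeySketch` and every `example.*` attribute name are hypothetical placeholders, because the real values of the `OtelAttributes` constants are not shown on this page.

```ruby
# Sketch only: placeholder attribute names stand in for the OtelAttributes
# constants, whose real values are not shown on this page.
module BaggageKeySketch
  PREFIX = "langfuse_"
  METADATA_ATTRIBUTE = "example.trace.metadata" # hypothetical name
  SPAN_KEYS = {
    "user_id" => "example.user.id",       # hypothetical name
    "session_id" => "example.session.id", # hypothetical name
    "tags" => "example.trace.tags"        # hypothetical name
  }.freeze

  module_function

  # Returns the span key for a baggage key, or nil when the key is not
  # prefixed with "langfuse_" or names an unknown attribute.
  def span_key_for(baggage_key)
    return nil unless baggage_key.start_with?(PREFIX)

    suffix = baggage_key.delete_prefix(PREFIX)
    if suffix.start_with?("metadata_")
      # Metadata entries keep their own key name after the metadata attribute.
      return "#{METADATA_ATTRIBUTE}.#{suffix.delete_prefix('metadata_')}"
    end

    SPAN_KEYS[suffix]
  end
end

p BaggageKeySketch.span_key_for("langfuse_user_id")         # "example.user.id"
p BaggageKeySketch.span_key_for("langfuse_metadata_region") # "example.trace.metadata.region"
p BaggageKeySketch.span_key_for("langfuse_unknown")         # nil
p BaggageKeySketch.span_key_for("other_user_id")            # nil
```

Foreign baggage entries and unrecognized Langfuse suffixes both map to nil, which is why the caller skips them with `next unless span_key`.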

._merge_metadata(context, context_key, new_metadata) ⇒ Hash<String, String>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Merge metadata with existing context value

Parameters:

  • context (OpenTelemetry::Context)

    Current context

  • context_key (OpenTelemetry::Context::Key)

    Context key for metadata

  • new_metadata (Hash<String, String>)

    New metadata to merge

Returns:

  • (Hash<String, String>)

    Merged metadata



# File 'lib/langfuse/propagation.rb', line 194

def self._merge_metadata(context, context_key, new_metadata)
  existing = context.value(context_key) || {}
  existing = existing.to_h if existing.respond_to?(:to_h)
  existing.merge(new_metadata)
end

._merge_tags(context, context_key, new_tags) ⇒ Array<String>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Merge tags with existing context value

Parameters:

  • context (OpenTelemetry::Context)

    Current context

  • context_key (OpenTelemetry::Context::Key)

    Context key for tags

  • new_tags (Array<String>)

    New tags to merge

Returns:

  • (Array<String>)

    Merged tags (deduplicated)



# File 'lib/langfuse/propagation.rb', line 208

def self._merge_tags(context, context_key, new_tags)
  existing = context.value(context_key) || []
  existing = existing.to_a if existing.respond_to?(:to_a)
  (existing + new_tags).uniq
end
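
Both merge helpers rely on plain Ruby collection semantics, so their effect can be illustrated with ordinary hashes and arrays. The sketch below assumes a nested `propagate_attributes` call whose outer values are already stored in the current context; the variable names are illustrative only.

```ruby
# Values an outer propagate_attributes call is assumed to have stored.
outer_metadata = { "env" => "prod", "region" => "us-east" }
outer_tags = ["api", "v2"]

# Values passed to a nested call.
inner_metadata = { "region" => "eu-west", "team" => "search" }
inner_tags = ["v2", "beta"]

# Hash#merge: keys from the nested call win on conflict.
merged_metadata = outer_metadata.merge(inner_metadata)
p merged_metadata # {"env"=>"prod", "region"=>"eu-west", "team"=>"search"}

# Array concatenation plus uniq: order is preserved, duplicates are dropped.
merged_tags = (outer_tags + inner_tags).uniq
p merged_tags # ["api", "v2", "beta"]
```

Neither operation mutates its inputs, so the outer values remain unchanged for code that runs after the nested block.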

._parse_baggage_value(span_key, baggage_value) ⇒ String+

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Parse a baggage value into the appropriate format

Parameters:

  • span_key (String)

    The span attribute key

  • baggage_value (String, Object)

    The baggage value

Returns:

  • (String, Array<String>)

    Parsed value



# File 'lib/langfuse/propagation.rb', line 421

def self._parse_baggage_value(span_key, baggage_value)
  if span_key == OtelAttributes::TRACE_TAGS && baggage_value.is_a?(String)
    baggage_value.split(",")
  else
    baggage_value.to_s
  end
end
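
Tags travel through baggage as one comma-joined string (see `_set_baggage_attribute`) and are split on commas when read back. A short sketch of that round trip in plain Ruby, including the one case where it is lossy:

```ruby
# Ordinary tags survive the join/split round trip unchanged.
tags = ["api", "v2"]
encoded = tags.join(",")
decoded = encoded.split(",")
p encoded # "api,v2"
p decoded # ["api", "v2"]

# A tag that itself contains a comma is split into two tags on the way back.
tricky = ["a,b", "c"]
round_tripped = tricky.join(",").split(",")
p round_tripped # ["a", "b", "c"]
```

Because nothing in the code shown here escapes commas, tags that need to cross service boundaries intact should avoid them.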

._propagate_attributes(user_id: nil, session_id: nil, metadata: nil, version: nil, tags: nil, as_baggage: false) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Internal implementation of propagate_attributes



# File 'lib/langfuse/propagation.rb', line 96

def self._propagate_attributes(user_id: nil, session_id: nil, metadata: nil, version: nil, tags: nil,
                               as_baggage: false, &)
  current_context = OpenTelemetry::Context.current
  current_span = OpenTelemetry::Trace.current_span

  # Process each propagated attribute using PROPAGATED_ATTRIBUTES constant
  PROPAGATED_ATTRIBUTES.each do |key|
    value = binding.local_variable_get(key.to_sym)
    next if value.nil?
    next if key == "tags" && value.empty?

    validated_value = _validate_attribute_value(key, value)
    next unless validated_value

    current_context = _set_propagated_attribute(
      key: key,
      value: validated_value,
      context: current_context,
      span: current_span,
      as_baggage: as_baggage
    )
  end

  # Execute block in new context
  OpenTelemetry::Context.with_current(current_context, &)
end
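
The loop above reads each keyword argument by name through `binding.local_variable_get`, which avoids one `if` per attribute. The following standalone sketch shows the same technique; the method `collect_present` and the constant `SKETCH_ATTRIBUTE_NAMES` are hypothetical names, not part of the library.

```ruby
SKETCH_ATTRIBUTE_NAMES = %w[user_id session_id version].freeze

# Collects the keyword arguments that were actually supplied, looking each
# one up by name instead of referencing it directly.
def collect_present(user_id: nil, session_id: nil, version: nil)
  SKETCH_ATTRIBUTE_NAMES.each_with_object({}) do |name, found|
    # The block's binding can see the enclosing method's local variables.
    value = binding.local_variable_get(name.to_sym)
    found[name] = value unless value.nil?
  end
end

p collect_present(user_id: "user_123", version: "1.2")
# {"user_id"=>"user_123", "version"=>"1.2"}
```

The trade-off is that the list of names and the method signature must stay in sync: `local_variable_get` raises `NameError` for a name that is not a local variable.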

._set_baggage_attribute(context:, key:, value:, baggage_key:) ⇒ OpenTelemetry::Context

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Set a propagated attribute in baggage

Parameters:

  • context (OpenTelemetry::Context)

    Current context

  • key (String)

    Attribute key (user_id, session_id, version, tags, metadata)

  • value (String, Array<String>, Hash<String, String>)

    Attribute value

  • baggage_key (String)

    Baggage key prefix

Returns:

  • (OpenTelemetry::Context)

    New context with baggage set



# File 'lib/langfuse/propagation.rb', line 439

def self._set_baggage_attribute(context:, key:, value:, baggage_key:)
  return context unless baggage_available?

  if key == "metadata" && value.is_a?(Hash)
    value.each do |k, v|
      entry_key = "#{baggage_key}_#{k}"
      context = OpenTelemetry::Baggage.set_value(entry_key, v.to_s, context: context)
    end
  elsif key == "tags" && value.is_a?(Array)
    context = OpenTelemetry::Baggage.set_value(baggage_key, value.join(","), context: context)
  else
    context = OpenTelemetry::Baggage.set_value(baggage_key, value.to_s, context: context)
  end
  context
rescue StandardError => e
  Langfuse.configuration.logger.warn("Langfuse: Failed to set baggage: #{e.message}")
  context
end
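
To see which baggage entries each attribute type produces, the branching above can be mirrored with a plain hash in place of an OpenTelemetry context. This is a sketch under that assumption; `baggage_entries` is a hypothetical helper and does not call the real `OpenTelemetry::Baggage` API.

```ruby
# Returns the baggage entries (as a plain Hash) that one attribute would
# produce, mirroring the metadata / tags / scalar branches shown above.
def baggage_entries(key, value, prefix: "langfuse_")
  baggage_key = "#{prefix}#{key}"

  if key == "metadata" && value.is_a?(Hash)
    # One entry per metadata key: langfuse_metadata_<name>
    value.to_h { |k, v| ["#{baggage_key}_#{k}", v.to_s] }
  elsif key == "tags" && value.is_a?(Array)
    # All tags share a single comma-joined entry.
    { baggage_key => value.join(",") }
  else
    { baggage_key => value.to_s }
  end
end

p baggage_entries("metadata", { "region" => "us-east", "env" => "prod" })
# {"langfuse_metadata_region"=>"us-east", "langfuse_metadata_env"=>"prod"}
p baggage_entries("tags", ["api", "v2"])
# {"langfuse_tags"=>"api,v2"}
p baggage_entries("user_id", "user_123")
# {"langfuse_user_id"=>"user_123"}
```

The per-key metadata entries match the `langfuse_metadata_{key_name}` format that `_get_span_key_from_baggage_key` parses on the receiving side.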

._set_propagated_attribute(key:, value:, context:, span:, as_baggage:) ⇒ OpenTelemetry::Context

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Set a propagated attribute in context and on current span

Parameters:

  • key (String)

    Attribute key (user_id, session_id, version, tags, metadata)

  • value (String, Array<String>, Hash<String, String>)

    Attribute value

  • context (OpenTelemetry::Context)

    Current context

  • span (OpenTelemetry::Trace::Span, nil)

    Current span (may be nil)

  • as_baggage (Boolean)

    Whether to set in baggage

Returns:

  • (OpenTelemetry::Context)

    New context with attribute set



# File 'lib/langfuse/propagation.rb', line 225

def self._set_propagated_attribute(key:, value:, context:, span:, as_baggage:)
  context_key = _get_propagated_context_key(key)
  span_key = _get_propagated_span_key(key)
  baggage_key = _get_propagated_baggage_key(key)

  # Merge metadata/tags with existing context values
  value = if key == "metadata" && value.is_a?(Hash)
            _merge_metadata(context, context_key, value)
          elsif key == "tags" && value.is_a?(Array)
            _merge_tags(context, context_key, value)
          else
            value
          end

  # Set in context
  context = context.set_value(context_key, value)

  # Set on current span (if recording)
  if span&.recording?
    if key == "metadata" && value.is_a?(Hash)
      # Handle metadata - flatten into individual attributes
      value.each do |k, v|
        metadata_key = "#{OtelAttributes::TRACE_METADATA}.#{k}"
        span.set_attribute(metadata_key, v.to_s)
      end
    elsif key == "tags" && value.is_a?(Array)
      # Handle tags - serialize as JSON array
      serialized_tags = OtelAttributes.serialize(value)
      span.set_attribute(span_key, serialized_tags) if serialized_tags
    else
      span.set_attribute(span_key, value.to_s)
    end
  end

  # Set in baggage (if requested and available)
  # Note: Baggage support requires opentelemetry-baggage gem
  if as_baggage
    unless baggage_available?
      Langfuse.configuration.logger.warn(
        "Langfuse: Baggage propagation requested but opentelemetry-baggage gem not available. " \
        "Install opentelemetry-baggage for cross-service propagation."
      )
    end

    context = _set_baggage_attribute(
      context: context,
      key: key,
      value: value,
      baggage_key: baggage_key
    )
  end

  context
end

._validate_attribute_value(key, value) ⇒ Object?

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Validate an attribute value based on its type

Parameters:

  • key (String)

    Attribute key

  • value (Object)

    Attribute value

Returns:

  • (Object, nil)

    Validated value or nil if invalid



# File 'lib/langfuse/propagation.rb', line 131

def self._validate_attribute_value(key, value)
  case key
  when "tags"
    validated_tags = value.filter_map { |tag| _validate_propagated_value(tag, "tag") }
    validated_tags.any? ? validated_tags : nil
  when "metadata"
    validated_metadata = {}
    value.each do |k, v|
      validated_metadata[k.to_s] = v.to_s if _validate_string_value(v, "metadata.#{k}")
    end
    validated_metadata.any? ? validated_metadata : nil
  else
    _validate_propagated_value(value, key)
  end
end

._validate_propagated_value(value, key) ⇒ String, ...

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Validate a propagated value (string or array of strings)

Parameters:

  • value (String, Array<String>)

    Value to validate

  • key (String)

    Attribute key for error messages

Returns:

  • (String, Array<String>, nil)

    Validated value or nil if invalid



# File 'lib/langfuse/propagation.rb', line 288

def self._validate_propagated_value(value, key)
  if value.is_a?(Array)
    validated = value.filter_map { |v| _validate_string_value(v, key) ? v : nil }
    return validated.any? ? validated : nil
  end

  # Validate string value (will log warning if not a string)
  return nil unless _validate_string_value(value, key)

  value
end

._validate_string_value(value, key) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Validate a string value

Parameters:

  • value (String)

    Value to validate

  • key (String)

    Attribute key for error messages

Returns:

  • (Boolean)

    True if valid, false otherwise



# File 'lib/langfuse/propagation.rb', line 308

def self._validate_string_value(value, key)
  unless value.is_a?(String)
    Langfuse.configuration.logger.warn(
      "Langfuse: Propagated attribute '#{key}' value is not a string. Dropping value."
    )
    return false
  end

  if value.length > 200
    Langfuse.configuration.logger.warn(
      "Langfuse: Propagated attribute '#{key}' value is over 200 characters " \
      "(#{value.length} chars). Dropping value."
    )
    return false
  end

  true
end
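
Taken together, the validation helpers drop any value that is not a String or is longer than 200 characters, and they return nil when nothing survives. The sketch below reproduces that filtering for metadata without the logging; `valid_propagated_string?`, `filter_metadata`, and `MAX_PROPAGATED_LENGTH` are hypothetical names used only for illustration.

```ruby
MAX_PROPAGATED_LENGTH = 200

# True only for strings within the length limit (logging omitted here).
def valid_propagated_string?(value)
  value.is_a?(String) && value.length <= MAX_PROPAGATED_LENGTH
end

# Keeps valid entries, stringifies keys, and returns nil if none remain.
def filter_metadata(metadata)
  kept = metadata.each_with_object({}) do |(k, v), result|
    result[k.to_s] = v if valid_propagated_string?(v)
  end
  kept.empty? ? nil : kept
end

p filter_metadata({ region: "us-east", retries: 3, blob: "x" * 201 })
# {"region"=>"us-east"}
p filter_metadata({ retries: 3 })
# nil
```

Note that non-string values such as the integer `3` are dropped rather than converted, so callers that want numbers propagated need to pass them as strings.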

.baggage_available? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Check if baggage API is available

Returns:

  • (Boolean)

    True if OpenTelemetry::Baggage is defined



# File 'lib/langfuse/propagation.rb', line 383

def self.baggage_available?
  defined?(OpenTelemetry::Baggage)
end

.get_propagated_attributes_from_context(context) ⇒ Hash<String, String, Array<String>>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get propagated attributes from context for span processor

Parameters:

  • context (OpenTelemetry::Context)

    The context to read from

Returns:

  • (Hash<String, String, Array<String>>)

    Hash of span key => value



# File 'lib/langfuse/propagation.rb', line 155

def self.get_propagated_attributes_from_context(context)
  propagated_attributes = _extract_baggage_attributes(context)

  # Handle OTEL context values
  PROPAGATED_ATTRIBUTES.each do |key|
    context_key = _get_propagated_context_key(key)
    value = context.value(context_key)

    next if value.nil?

    span_key = _get_propagated_span_key(key)

    if key == "metadata" && value.is_a?(Hash)
      # Handle metadata - flatten into individual attributes
      value.each do |k, v|
        metadata_key = "#{OtelAttributes::TRACE_METADATA}.#{k}"
        propagated_attributes[metadata_key] = v.to_s
      end
    elsif key == "tags" && value.is_a?(Array)
      # Handle tags - serialize as JSON array for span attributes
      serialized_tags = OtelAttributes.serialize(value)
      propagated_attributes[span_key] = serialized_tags if serialized_tags
    else
      propagated_attributes[span_key] = value.to_s
    end
  end
  # rubocop:enable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity

  propagated_attributes
end
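
The flattening performed for the span processor can be sketched with plain hashes. This is an approximation under stated assumptions: the `example.*` attribute names are hypothetical placeholders for the `OtelAttributes` constants, and `JSON.generate` stands in for `OtelAttributes.serialize`, which the comments above describe only as producing a JSON array.

```ruby
require "json"

# Flattens propagated values into a span-attribute hash: metadata becomes one
# attribute per key, tags become a JSON string, everything else is stringified.
def flatten_for_span(values)
  values.each_with_object({}) do |(key, value), attrs|
    if key == "metadata" && value.is_a?(Hash)
      value.each { |k, v| attrs["example.trace.metadata.#{k}"] = v.to_s }
    elsif key == "tags" && value.is_a?(Array)
      attrs["example.trace.tags"] = JSON.generate(value) # assumed serializer
    else
      attrs["example.#{key}"] = value.to_s
    end
  end
end

flattened = flatten_for_span(
  "user_id" => "user_123",
  "tags" => ["api", "v2"],
  "metadata" => { "region" => "us-east" }
)
p flattened["example.user_id"]               # "user_123"
p flattened["example.trace.tags"]            # "[\"api\",\"v2\"]"
p flattened["example.trace.metadata.region"] # "us-east"
```

In the method above, baggage-derived attributes are collected first and context values are written afterwards, so a context value replaces a baggage value that maps to the same span key.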

.propagate_attributes(user_id: nil, session_id: nil, metadata: nil, version: nil, tags: nil, as_baggage: false) { ... } ⇒ Object

Propagate trace-level attributes to all spans created within this context.

This method sets attributes on the currently active span and automatically propagates them to all child spans created within the block. It is the recommended way to set trace-level attributes such as user_id, session_id, and metadata that should apply consistently to every observation in a trace. Values that are not strings, or that exceed 200 characters, are dropped with a logged warning instead of raising an error.

Examples:

Basic usage

Langfuse.propagate_attributes(user_id: "user_123", session_id: "session_abc") do
  # All spans created here inherit attributes
end

With metadata and tags

Langfuse.propagate_attributes(
  user_id: "user_123",
  metadata: { environment: "production", region: "us-east" },
  tags: ["api", "v2"]
) do
  # All spans inherit these attributes
end

Parameters:

  • user_id (String, nil) (defaults to: nil)

    User identifier (≤200 characters)

  • session_id (String, nil) (defaults to: nil)

    Session identifier (≤200 characters)

  • metadata (Hash<String, String>, nil) (defaults to: nil)

    Additional metadata (all values ≤200 characters)

  • version (String, nil) (defaults to: nil)

    Version identifier (≤200 characters)

  • tags (Array<String>, nil) (defaults to: nil)

    List of tags (each ≤200 characters)

  • as_baggage (Boolean) (defaults to: false)

    If true, also sets the attributes in OpenTelemetry baggage so they can cross service boundaries

Yields:

  • Block within which attributes are propagated

Returns:

  • (Object)

    The result of the block

Raises:

  • (ArgumentError)

    If no block is given


# File 'lib/langfuse/propagation.rb', line 78

def self.propagate_attributes(user_id: nil, session_id: nil, metadata: nil, version: nil, tags: nil,
                              as_baggage: false, &block)
  raise ArgumentError, "Block required" unless block

  _propagate_attributes(
    user_id: user_id,
    session_id: session_id,
    metadata: metadata,
    version: version,
    tags: tags,
    as_baggage: as_baggage,
    &block
  )
end