Module: Langfuse::Propagation

Defined in:
lib/langfuse/propagation.rb

Overview

Attribute propagation utilities for Langfuse OpenTelemetry integration.

This module provides the `propagate_attributes` method for setting trace-level attributes (user_id, session_id, metadata, version, tags) that automatically propagate to all child spans created within the context.

Examples:

Basic usage

Langfuse.observe("operation") do |span|
  Langfuse.propagate_attributes(user_id: "user_123", session_id: "session_abc") do
    # Current span has user_id and session_id
    span.start_observation("child") do |child|
      # Child span inherits user_id and session_id
    end
  end
end

Constant Summary

SPAN_KEY_MAP =

Map of propagated attribute keys to span attribute keys

{
  "user_id" => OtelAttributes::TRACE_USER_ID,
  "session_id" => OtelAttributes::TRACE_SESSION_ID,
  "version" => OtelAttributes::VERSION,
  "tags" => OtelAttributes::TRACE_TAGS,
  "metadata" => OtelAttributes::TRACE_METADATA
}.freeze
CONTEXT_KEYS =

OpenTelemetry context keys for propagated attributes

{
  "user_id" => OpenTelemetry::Context.create_key("langfuse_user_id"),
  "session_id" => OpenTelemetry::Context.create_key("langfuse_session_id"),
  "metadata" => OpenTelemetry::Context.create_key("langfuse_metadata"),
  "version" => OpenTelemetry::Context.create_key("langfuse_version"),
  "tags" => OpenTelemetry::Context.create_key("langfuse_tags")
}.freeze
PROPAGATED_ATTRIBUTES =

List of propagated attribute keys (derived from CONTEXT_KEYS)

CONTEXT_KEYS.keys.freeze
BAGGAGE_PREFIX =

Baggage key prefix for cross-service propagation

"langfuse_"

Class Method Details

._extract_baggage_attributes(context) ⇒ Hash<String, String, Array<String>>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Extract propagated attributes from baggage

Parameters:

  • context (OpenTelemetry::Context)

    The context to read baggage from

Returns:

  • (Hash<String, String, Array<String>>)

    Hash of span key => value



# File 'lib/langfuse/propagation.rb', line 395

def self._extract_baggage_attributes(context)
  return {} unless baggage_available?

  baggage = OpenTelemetry::Baggage.values(context: context)
  return {} unless baggage.is_a?(Hash)

  attributes = {}
  baggage.each do |baggage_key, baggage_value|
    next unless baggage_key.to_s.start_with?(BAGGAGE_PREFIX)

    span_key = _get_span_key_from_baggage_key(baggage_key.to_s)
    next unless span_key

    attributes[span_key] = _parse_baggage_value(span_key, baggage_value)
  end
  attributes
rescue StandardError => e
  Langfuse.configuration.logger.debug("Langfuse: Baggage extraction failed: #{e.message}")
  {}
end
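
The extraction loop above can be tried without OpenTelemetry by running the same filtering on a plain Hash. The following is a minimal sketch, not the library's code: the module name `BaggageExtractionSketch` and the `example.*` span key strings are assumptions made for illustration, and the real values come from `OtelAttributes`.

```ruby
module BaggageExtractionSketch
  BAGGAGE_PREFIX = "langfuse_"
  # Placeholder span keys (assumed); the real ones are defined in OtelAttributes.
  METADATA_SPAN_KEY = "example.trace.metadata"
  TAGS_SPAN_KEY = "example.trace.tags"
  SPAN_KEY_MAP = {
    "user_id" => "example.user.id",
    "session_id" => "example.session.id",
    "tags" => TAGS_SPAN_KEY
  }.freeze

  # Maps a baggage key back to a span key, or nil for keys we do not own.
  def self.span_key_for(baggage_key)
    return nil unless baggage_key.start_with?(BAGGAGE_PREFIX)

    suffix = baggage_key.delete_prefix(BAGGAGE_PREFIX)
    if suffix.start_with?("metadata_")
      return "#{METADATA_SPAN_KEY}.#{suffix.delete_prefix('metadata_')}"
    end

    SPAN_KEY_MAP[suffix]
  end

  # Keeps only recognised entries and splits the comma-joined tags value.
  def self.extract(baggage)
    baggage.each_with_object({}) do |(baggage_key, baggage_value), attributes|
      span_key = span_key_for(baggage_key.to_s)
      next unless span_key

      attributes[span_key] =
        if span_key == TAGS_SPAN_KEY && baggage_value.is_a?(String)
          baggage_value.split(",")
        else
          baggage_value.to_s
        end
    end
  end
end

attributes = BaggageExtractionSketch.extract(
  {
    "langfuse_user_id" => "user_123",
    "langfuse_metadata_region" => "us-east",
    "langfuse_tags" => "api,v2",
    "other_vendor_key" => "skipped"
  }
)
attributes["example.trace.tags"] # => ["api", "v2"]
```

Entries without the `langfuse_` prefix, and prefixed entries with an unknown suffix, are skipped rather than raising.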

._get_propagated_baggage_key(key) ⇒ String

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get baggage key for a propagated attribute

Parameters:

  • key (String)

    Attribute key (user_id, session_id, etc.)

Returns:

  • (String)

    Baggage key (snake_case for cross-service compatibility)



# File 'lib/langfuse/propagation.rb', line 356

def self._get_propagated_baggage_key(key)
  "#{BAGGAGE_PREFIX}#{key}"
end

._get_propagated_context_key(key) ⇒ OpenTelemetry::Context::Key

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get context key for a propagated attribute

Parameters:

  • key (String)

    Attribute key (user_id, session_id, etc.)

Returns:

  • (OpenTelemetry::Context::Key)

    Context key object

Raises:

  • (ArgumentError)

    if key is not a known propagated attribute



# File 'lib/langfuse/propagation.rb', line 336

def self._get_propagated_context_key(key)
  CONTEXT_KEYS[key] || raise(ArgumentError, "Unknown propagated attribute key: #{key}")
end

._get_propagated_span_key(key) ⇒ String

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get span attribute key for a propagated attribute

Parameters:

  • key (String)

    Attribute key (user_id, session_id, etc.)

Returns:

  • (String)

    Span attribute key



# File 'lib/langfuse/propagation.rb', line 346

def self._get_propagated_span_key(key)
  SPAN_KEY_MAP[key] || "#{OtelAttributes::TRACE_METADATA}.#{key}"
end
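
The `||` fallback in this method means that any key missing from `SPAN_KEY_MAP` is treated as a metadata sub-key. A small sketch of that lookup follows; the module name and the `example.*` strings are placeholders assumed for illustration, not the values defined in `OtelAttributes`.

```ruby
module SpanKeySketch
  # Placeholder strings (assumed); the real values come from OtelAttributes.
  TRACE_METADATA = "example.trace.metadata"
  SPAN_KEY_MAP = {
    "user_id" => "example.user.id",
    "metadata" => TRACE_METADATA
  }.freeze

  # Same shape as the lookup above: mapped key first, metadata sub-key otherwise.
  def self.span_key_for(key)
    SPAN_KEY_MAP[key] || "#{TRACE_METADATA}.#{key}"
  end
end

SpanKeySketch.span_key_for("user_id") # => "example.user.id"
SpanKeySketch.span_key_for("region")  # => "example.trace.metadata.region"
```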

._get_span_key_from_baggage_key(baggage_key) ⇒ String?

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get span key from baggage key

Parameters:

  • baggage_key (String)

    Baggage key

Returns:

  • (String, nil)

    Span key or nil if not a Langfuse baggage key



# File 'lib/langfuse/propagation.rb', line 366

def self._get_span_key_from_baggage_key(baggage_key)
  return nil unless baggage_key.start_with?(BAGGAGE_PREFIX)

  suffix = baggage_key[BAGGAGE_PREFIX.length..]

  # Handle metadata keys (format: langfuse_metadata_{key_name})
  if suffix.start_with?("metadata_")
    metadata_key = suffix[("metadata_".length)..]
    return "#{OtelAttributes::TRACE_METADATA}.#{metadata_key}"
  end

  SPAN_KEY_MAP[suffix]
end

._merge_metadata(context, context_key, new_metadata) ⇒ Hash<String, String>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Merge metadata with existing context value

Parameters:

  • context (OpenTelemetry::Context)

    Current context

  • context_key (OpenTelemetry::Context::Key)

    Context key for metadata

  • new_metadata (Hash<String, String>)

    New metadata to merge

Returns:

  • (Hash<String, String>)

    Merged metadata



# File 'lib/langfuse/propagation.rb', line 195

def self._merge_metadata(context, context_key, new_metadata)
  existing = context.value(context_key) || {}
  existing = existing.to_h if existing.respond_to?(:to_h)
  existing.merge(new_metadata)
end
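
Because the merge relies on `Hash#merge`, a nested call overrides keys that already exist in the context and leaves the outer Hash untouched. The sketch below illustrates this with a hypothetical `FakeContext` class that stands in for `OpenTelemetry::Context`; both it and the module name are assumptions for illustration.

```ruby
module MetadataMergeSketch
  # Hash-backed stand-in for a context object (hypothetical, illustration only).
  class FakeContext
    def initialize(values)
      @values = values
    end

    def value(key)
      @values[key]
    end
  end

  # Same steps as the method above: read, normalise to a Hash, merge.
  def self.merge_metadata(context, context_key, new_metadata)
    existing = context.value(context_key) || {}
    existing = existing.to_h if existing.respond_to?(:to_h)
    existing.merge(new_metadata)
  end
end

outer_metadata = { "env" => "prod", "region" => "us-east" }.freeze
context = MetadataMergeSketch::FakeContext.new({ meta: outer_metadata })

merged = MetadataMergeSketch.merge_metadata(context, :meta, { "region" => "eu-west", "team" => "search" })
merged["region"]         # => "eu-west" (the newer value wins)
outer_metadata["region"] # => "us-east" (the outer Hash is not mutated)
```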

._merge_tags(context, context_key, new_tags) ⇒ Array<String>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Merge tags with existing context value

Parameters:

  • context (OpenTelemetry::Context)

    Current context

  • context_key (OpenTelemetry::Context::Key)

    Context key for tags

  • new_tags (Array<String>)

    New tags to merge

Returns:

  • (Array<String>)

    Merged tags (deduplicated)



# File 'lib/langfuse/propagation.rb', line 209

def self._merge_tags(context, context_key, new_tags)
  existing = context.value(context_key) || []
  existing = existing.to_a if existing.respond_to?(:to_a)
  (existing + new_tags).uniq
end
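
As a quick illustration of the deduplication, `Array#uniq` keeps the first occurrence of each tag, so tags already in the context stay ahead of newly added ones. The helper below is a sketch that takes the existing tags directly instead of reading them from a context; its name is an assumption.

```ruby
module TagMergeSketch
  # existing may be nil when no tags were propagated yet; nil.to_a is [].
  def self.merge_tags(existing, new_tags)
    (existing.to_a + new_tags).uniq
  end
end

TagMergeSketch.merge_tags(["api", "v2"], ["v2", "beta"]) # => ["api", "v2", "beta"]
TagMergeSketch.merge_tags(nil, ["api"])                  # => ["api"]
```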

._parse_baggage_value(span_key, baggage_value) ⇒ String, Array<String>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Parse a baggage value into the appropriate format

Parameters:

  • span_key (String)

    The span attribute key

  • baggage_value (String, Object)

    The baggage value

Returns:

  • (String, Array<String>)

    Parsed value



# File 'lib/langfuse/propagation.rb', line 423

def self._parse_baggage_value(span_key, baggage_value)
  if span_key == OtelAttributes::TRACE_TAGS && baggage_value.is_a?(String)
    baggage_value.split(",")
  else
    baggage_value.to_s
  end
end
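
Tags travel through baggage as one comma-joined string (see `_set_baggage_attribute`) and are split again here, so a tag that itself contains a comma does not survive the round trip. The sketch below shows this with plain `join` and `split`; the module and method names are assumptions for illustration.

```ruby
module TagBaggageSketch
  # What the baggage writer does with a tags Array.
  def self.encode(tags)
    tags.join(",")
  end

  # What the baggage reader does with the tags string.
  def self.decode(value)
    value.split(",")
  end
end

TagBaggageSketch.decode(TagBaggageSketch.encode(["api", "v2"]))  # => ["api", "v2"]
TagBaggageSketch.decode(TagBaggageSketch.encode(["a,b", "c"]))   # => ["a", "b", "c"]
```

If tags are propagated with `as_baggage: true`, keeping them comma-free avoids this splitting.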

._propagate_attributes(user_id: nil, session_id: nil, metadata: nil, version: nil, tags: nil, as_baggage: false) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Internal implementation of propagate_attributes



# File 'lib/langfuse/propagation.rb', line 97

def self._propagate_attributes(user_id: nil, session_id: nil, metadata: nil, version: nil, tags: nil,
                               as_baggage: false, &)
  current_context = OpenTelemetry::Context.current
  current_span = OpenTelemetry::Trace.current_span

  # Process each propagated attribute using PROPAGATED_ATTRIBUTES constant
  PROPAGATED_ATTRIBUTES.each do |key|
    value = binding.local_variable_get(key.to_sym)
    next if value.nil?
    next if key == "tags" && value.empty?

    validated_value = _validate_attribute_value(key, value)
    next unless validated_value

    current_context = _set_propagated_attribute(
      key: key,
      value: validated_value,
      context: current_context,
      span: current_span,
      as_baggage: as_baggage
    )
  end

  # Execute block in new context
  OpenTelemetry::Context.with_current(current_context, &)
end
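
The loop above reads each keyword argument by name through `binding.local_variable_get`, which avoids one branch per attribute. A stripped-down sketch of that pattern follows; the module, the `KEYS` list, and the `collect` method are assumptions for illustration and only gather the values instead of writing them to a context.

```ruby
module KeywordLookupSketch
  KEYS = %w[user_id session_id tags].freeze

  # Returns the keyword arguments that would be propagated: non-nil values,
  # with an empty tags list skipped, mirroring the guards in the loop above.
  def self.collect(user_id: nil, session_id: nil, tags: nil)
    KEYS.each_with_object({}) do |key, present|
      # The block's binding can see the enclosing method's local variables.
      value = binding.local_variable_get(key.to_sym)
      next if value.nil?
      next if key == "tags" && value.empty?

      present[key] = value
    end
  end
end

KeywordLookupSketch.collect(user_id: "user_123", tags: []).keys # => ["user_id"]
```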

._set_baggage_attribute(context:, key:, value:, baggage_key:) ⇒ OpenTelemetry::Context

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Set a propagated attribute in baggage

Parameters:

  • context (OpenTelemetry::Context)

    Current context

  • key (String)

    Attribute key (user_id, session_id, version, tags, metadata)

  • value (String, Array<String>, Hash<String, String>)

    Attribute value

  • baggage_key (String)

    Baggage key prefix

Returns:

  • (OpenTelemetry::Context)

    New context with baggage set



# File 'lib/langfuse/propagation.rb', line 441

def self._set_baggage_attribute(context:, key:, value:, baggage_key:)
  return context unless baggage_available?

  if key == "metadata" && value.is_a?(Hash)
    value.each do |k, v|
      entry_key = "#{baggage_key}_#{k}"
      context = OpenTelemetry::Baggage.set_value(entry_key, v.to_s, context: context)
    end
  elsif key == "tags" && value.is_a?(Array)
    context = OpenTelemetry::Baggage.set_value(baggage_key, value.join(","), context: context)
  else
    context = OpenTelemetry::Baggage.set_value(baggage_key, value.to_s, context: context)
  end
  context
rescue StandardError => e
  Langfuse.configuration.logger.warn("Langfuse: Failed to set baggage: #{e.message}")
  context
end
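
To see which baggage entries one attribute produces, the branching above can be replayed on plain values. This is a sketch under the assumption that a Hash is an acceptable stand-in for the baggage store; `BaggageEntriesSketch` and `entries_for` are hypothetical names.

```ruby
module BaggageEntriesSketch
  PREFIX = "langfuse_"

  # Returns the entries a single attribute would add, as a plain Hash.
  def self.entries_for(key, value)
    baggage_key = "#{PREFIX}#{key}"
    if key == "metadata" && value.is_a?(Hash)
      # One entry per metadata key: langfuse_metadata_<name>
      value.to_h { |k, v| ["#{baggage_key}_#{k}", v.to_s] }
    elsif key == "tags" && value.is_a?(Array)
      # All tags share one comma-joined entry
      { baggage_key => value.join(",") }
    else
      { baggage_key => value.to_s }
    end
  end
end

BaggageEntriesSketch.entries_for("metadata", { "region" => "us-east" }).keys # => ["langfuse_metadata_region"]
BaggageEntriesSketch.entries_for("tags", ["api", "v2"]).values               # => ["api,v2"]
```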

._set_propagated_attribute(key:, value:, context:, span:, as_baggage:) ⇒ OpenTelemetry::Context

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Set a propagated attribute in context and on current span

Parameters:

  • key (String)

    Attribute key (user_id, session_id, version, tags, metadata)

  • value (String, Array<String>, Hash<String, String>)

    Attribute value

  • context (OpenTelemetry::Context)

    Current context

  • span (OpenTelemetry::Trace::Span, nil)

    Current span (may be nil)

  • as_baggage (Boolean)

    Whether to set in baggage

Returns:

  • (OpenTelemetry::Context)

    New context with attribute set



# File 'lib/langfuse/propagation.rb', line 226

def self._set_propagated_attribute(key:, value:, context:, span:, as_baggage:)
  context_key = _get_propagated_context_key(key)
  span_key = _get_propagated_span_key(key)
  baggage_key = _get_propagated_baggage_key(key)

  # Merge metadata/tags with existing context values
  value = if key == "metadata" && value.is_a?(Hash)
            _merge_metadata(context, context_key, value)
          elsif key == "tags" && value.is_a?(Array)
            _merge_tags(context, context_key, value)
          else
            value
          end

  # Set in context
  context = context.set_value(context_key, value)

  # Set on current span (if recording)
  if span&.recording?
    if key == "metadata" && value.is_a?(Hash)
      # Handle metadata - flatten into individual attributes
      value.each do |k, v|
        metadata_key = "#{OtelAttributes::TRACE_METADATA}.#{k}"
        span.set_attribute(metadata_key, v.to_s)
      end
    elsif key == "tags" && value.is_a?(Array)
      # Handle tags - serialize as JSON array
      serialized_tags = OtelAttributes.serialize(value)
      span.set_attribute(span_key, serialized_tags) if serialized_tags
    else
      span.set_attribute(span_key, value.to_s)
    end
  end

  # Set in baggage (if requested and available)
  # Note: Baggage support requires opentelemetry-baggage gem
  if as_baggage
    unless baggage_available?
      Langfuse.configuration.logger.warn(
        "Langfuse: Baggage propagation requested but opentelemetry-baggage gem not available. " \
        "Install opentelemetry-baggage for cross-service propagation."
      )
    end

    context = _set_baggage_attribute(
      context: context,
      key: key,
      value: value,
      baggage_key: baggage_key
    )
  end

  context
end

._validate_attribute_value(key, value) ⇒ Object?

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Validate an attribute value based on its type

Parameters:

  • key (String)

    Attribute key

  • value (Object)

    Attribute value

Returns:

  • (Object, nil)

    Validated value or nil if invalid



# File 'lib/langfuse/propagation.rb', line 132

def self._validate_attribute_value(key, value)
  case key
  when "tags"
    validated_tags = value.filter_map { |tag| _validate_propagated_value(tag, "tag") }
    validated_tags.any? ? validated_tags : nil
  when "metadata"
    validated_metadata = {}
    value.each do |k, v|
      validated_metadata[k.to_s] = v.to_s if _validate_string_value(v, "metadata.#{k}")
    end
    validated_metadata.any? ? validated_metadata : nil
  else
    _validate_propagated_value(value, key)
  end
end

._validate_propagated_value(value, key) ⇒ String, ...

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Validate a propagated value (string or array of strings)

Parameters:

  • value (String, Array<String>)

    Value to validate

  • key (String)

    Attribute key for error messages

Returns:

  • (String, Array<String>, nil)

    Validated value or nil if invalid



# File 'lib/langfuse/propagation.rb', line 289

def self._validate_propagated_value(value, key)
  if value.is_a?(Array)
    validated = value.filter_map { |v| _validate_string_value(v, key) ? v : nil }
    return validated.any? ? validated : nil
  end

  # Validate string value (will log warning if not a string)
  return nil unless _validate_string_value(value, key)

  value
end

._validate_string_value(value, key) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Validate a string value

Parameters:

  • value (String)

    Value to validate

  • key (String)

    Attribute key for error messages

Returns:

  • (Boolean)

    True if valid, false otherwise



# File 'lib/langfuse/propagation.rb', line 309

def self._validate_string_value(value, key)
  unless value.is_a?(String)
    Langfuse.configuration.logger.warn(
      "Langfuse: Propagated attribute '#{key}' value is not a string. Dropping value."
    )
    return false
  end

  if value.length > 200
    Langfuse.configuration.logger.warn(
      "Langfuse: Propagated attribute '#{key}' value is over 200 characters " \
      "(#{value.length} chars). Dropping value."
    )
    return false
  end

  true
end
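
Taken together, the validation helpers drop any value that is not a String or that exceeds 200 characters, and return nil when nothing is left. The sketch below condenses those rules without the logging; the module and method names are assumptions for illustration.

```ruby
module ValidationSketch
  MAX_LENGTH = 200

  def self.valid_string?(value)
    value.is_a?(String) && value.length <= MAX_LENGTH
  end

  # Keeps valid tags; nil when every tag was dropped.
  def self.validate_tags(tags)
    kept = tags.select { |tag| valid_string?(tag) }
    kept.empty? ? nil : kept
  end

  # Keeps valid metadata values and stringifies the keys; nil when empty.
  def self.validate_metadata(metadata)
    kept = metadata.each_with_object({}) do |(k, v), out|
      out[k.to_s] = v if valid_string?(v)
    end
    kept.empty? ? nil : kept
  end
end

ValidationSketch.validate_tags(["api", :internal, "x" * 201]) # => ["api"]
ValidationSketch.validate_metadata({ region: "us-east", retries: 3 }).keys # => ["region"]
```

Note that non-String metadata values such as the Integer `3` are dropped rather than converted, so callers may want to call `to_s` on them before passing them in.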

.baggage_available? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Check if baggage API is available

Returns:

  • (Boolean)

    True if OpenTelemetry::Baggage is defined



# File 'lib/langfuse/propagation.rb', line 385

def self.baggage_available?
  defined?(OpenTelemetry::Baggage)
end

.get_propagated_attributes_from_context(context) ⇒ Hash<String, String, Array<String>>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get propagated attributes from context for span processor

Parameters:

  • context (OpenTelemetry::Context)

    The context to read from

Returns:

  • (Hash<String, String, Array<String>>)

    Hash of span key => value



# File 'lib/langfuse/propagation.rb', line 156

def self.get_propagated_attributes_from_context(context)
  propagated_attributes = _extract_baggage_attributes(context)

  # Handle OTEL context values
  PROPAGATED_ATTRIBUTES.each do |key|
    context_key = _get_propagated_context_key(key)
    value = context.value(context_key)

    next if value.nil?

    span_key = _get_propagated_span_key(key)

    if key == "metadata" && value.is_a?(Hash)
      # Handle metadata - flatten into individual attributes
      value.each do |k, v|
        metadata_key = "#{OtelAttributes::TRACE_METADATA}.#{k}"
        propagated_attributes[metadata_key] = v.to_s
      end
    elsif key == "tags" && value.is_a?(Array)
      # Handle tags - serialize as JSON array for span attributes
      serialized_tags = OtelAttributes.serialize(value)
      propagated_attributes[span_key] = serialized_tags if serialized_tags
    else
      propagated_attributes[span_key] = value.to_s
    end
  end
  # rubocop:enable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity

  propagated_attributes
end
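
Since baggage attributes are collected first and context values are written afterwards, a value set in the local context overwrites a baggage value for the same span key. The sketch below shows that ordering on two plain Hashes; the module name, `combine`, and the `example.*` keys are assumptions for illustration.

```ruby
module PrecedenceSketch
  # Start from the baggage-derived attributes, then let context values overwrite.
  def self.combine(from_baggage, from_context)
    attributes = from_baggage.dup
    from_context.each do |span_key, value|
      next if value.nil?

      attributes[span_key] = value
    end
    attributes
  end
end

combined = PrecedenceSketch.combine(
  { "example.user.id" => "from-baggage", "example.session.id" => "session_abc" },
  { "example.user.id" => "from-context" }
)
combined["example.user.id"]    # => "from-context"
combined["example.session.id"] # => "session_abc"
```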

.propagate_attributes(user_id: nil, session_id: nil, metadata: nil, version: nil, tags: nil, as_baggage: false) { ... } ⇒ Object

Propagate trace-level attributes to all spans created within this context.

This method sets attributes on the currently active span AND automatically propagates them to all new child spans created within the block. This is the recommended way to set trace-level attributes like user_id, session_id, and metadata dimensions that should be consistently applied across all observations in a trace.

Examples:

Basic usage

Langfuse.propagate_attributes(user_id: "user_123", session_id: "session_abc") do
  # All spans created here inherit attributes
end

With metadata and tags

Langfuse.propagate_attributes(
  user_id: "user_123",
  metadata: { environment: "production", region: "us-east" },
  tags: ["api", "v2"]
) do
  # All spans inherit these attributes
end

Parameters:

  • user_id (String, nil) (defaults to: nil)

    User identifier (≤200 characters)

  • session_id (String, nil) (defaults to: nil)

    Session identifier (≤200 characters)

  • metadata (Hash<String, String>, nil) (defaults to: nil)

    Additional metadata (all values ≤200 characters)

  • version (String, nil) (defaults to: nil)

    Version identifier (≤200 characters)

  • tags (Array<String>, nil) (defaults to: nil)

    List of tags (each ≤200 characters)

  • as_baggage (Boolean) (defaults to: false)

    If true, propagates via OpenTelemetry baggage for cross-service propagation

Yields:

  • Block within which attributes are propagated

Returns:

  • (Object)

    The result of the block

Raises:

  • (ArgumentError)

    if no block is given



# File 'lib/langfuse/propagation.rb', line 79

def self.propagate_attributes(user_id: nil, session_id: nil, metadata: nil, version: nil, tags: nil,
                              as_baggage: false, &block)
  raise ArgumentError, "Block required" unless block

  _propagate_attributes(
    user_id: user_id,
    session_id: session_id,
    metadata: metadata,
    version: version,
    tags: tags,
    as_baggage: as_baggage,
    &block
  )
end