Module: Langfuse::Propagation

Defined in:
lib/langfuse/propagation.rb

Overview

Attribute propagation utilities for Langfuse OpenTelemetry integration.

This module provides the `propagate_attributes` method for setting trace-level attributes (user_id, session_id, metadata) that automatically propagate to all child spans within the context.

Examples:

Basic usage

Langfuse.observe("operation") do |span|
  Langfuse.propagate_attributes(user_id: "user_123", session_id: "session_abc") do
    # Current span has user_id and session_id
    span.start_observation("child") do |child|
      # Child span inherits user_id and session_id
    end
  end
end

Constant Summary

SPAN_KEY_MAP =

Map of propagated attribute keys to span attribute keys

{
  "user_id" => OtelAttributes::TRACE_USER_ID,
  "session_id" => OtelAttributes::TRACE_SESSION_ID,
  "version" => OtelAttributes::VERSION,
  "tags" => OtelAttributes::TRACE_TAGS,
  "metadata" => OtelAttributes::TRACE_METADATA
}.freeze
CONTEXT_KEYS =

OpenTelemetry context keys for propagated attributes

{
  "user_id" => OpenTelemetry::Context.create_key("langfuse_user_id"),
  "session_id" => OpenTelemetry::Context.create_key("langfuse_session_id"),
  "metadata" => OpenTelemetry::Context.create_key("langfuse_metadata"),
  "version" => OpenTelemetry::Context.create_key("langfuse_version"),
  "tags" => OpenTelemetry::Context.create_key("langfuse_tags")
}.freeze
PROPAGATED_ATTRIBUTES =

List of propagated attribute keys (derived from CONTEXT_KEYS)

CONTEXT_KEYS.keys.freeze
BAGGAGE_PREFIX =

Baggage key prefix for cross-service propagation

"langfuse_"

Class Method Details

._extract_baggage_attributes(context) ⇒ Hash<String, String, Array<String>>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Extract propagated attributes from baggage

Parameters:

  • context (OpenTelemetry::Context)

    The context to read baggage from

Returns:

  • (Hash<String, String, Array<String>>)

    Hash of span key => value



# File 'lib/langfuse/propagation.rb', line 387

def self._extract_baggage_attributes(context)
  return {} unless baggage_available?

  baggage = OpenTelemetry::Baggage.values(context: context)
  return {} unless baggage.is_a?(Hash)

  attributes = {}
  baggage.each do |baggage_key, baggage_value|
    next unless baggage_key.to_s.start_with?(BAGGAGE_PREFIX)

    span_key = _get_span_key_from_baggage_key(baggage_key.to_s)
    next unless span_key

    attributes[span_key] = _parse_baggage_value(span_key, baggage_value)
  end
  attributes
rescue StandardError => e
  Langfuse.configuration.logger.debug("Langfuse: Baggage extraction failed: #{e.message}")
  {}
end

._get_propagated_baggage_key(key) ⇒ String

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get baggage key for a propagated attribute

Parameters:

  • key (String)

    Attribute key (user_id, session_id, etc.)

Returns:

  • (String)

    Baggage key (snake_case for cross-service compatibility)



# File 'lib/langfuse/propagation.rb', line 348

def self._get_propagated_baggage_key(key)
  "#{BAGGAGE_PREFIX}#{key}"
end

._get_propagated_context_key(key) ⇒ OpenTelemetry::Context::Key

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get context key for a propagated attribute

Parameters:

  • key (String)

    Attribute key (user_id, session_id, etc.)

Returns:

  • (OpenTelemetry::Context::Key)

    Context key object

Raises:

  • (ArgumentError)

    if key is not a known propagated attribute



# File 'lib/langfuse/propagation.rb', line 328

def self._get_propagated_context_key(key)
  CONTEXT_KEYS[key] || raise(ArgumentError, "Unknown propagated attribute key: #{key}")
end

._get_propagated_span_key(key) ⇒ String

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get span attribute key for a propagated attribute

Parameters:

  • key (String)

    Attribute key (user_id, session_id, etc.)

Returns:

  • (String)

    Span attribute key



# File 'lib/langfuse/propagation.rb', line 338

def self._get_propagated_span_key(key)
  SPAN_KEY_MAP[key] || "#{OtelAttributes::TRACE_METADATA}.#{key}"
end

._get_span_key_from_baggage_key(baggage_key) ⇒ String?

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get span key from baggage key

Parameters:

  • baggage_key (String)

    Baggage key

Returns:

  • (String, nil)

    Span key or nil if not a Langfuse baggage key



# File 'lib/langfuse/propagation.rb', line 358

def self._get_span_key_from_baggage_key(baggage_key)
  return nil unless baggage_key.start_with?(BAGGAGE_PREFIX)

  suffix = baggage_key[BAGGAGE_PREFIX.length..]

  # Handle metadata keys (format: langfuse_metadata_{key_name})
  if suffix.start_with?("metadata_")
    metadata_key = suffix[("metadata_".length)..]
    return "#{OtelAttributes::TRACE_METADATA}.#{metadata_key}"
  end

  SPAN_KEY_MAP[suffix]
end
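
The reverse mapping above can be sketched without OpenTelemetry loaded. This is a minimal illustration, not the library's code: the attribute-name strings are placeholders standing in for the `OtelAttributes` constants (whose real values are not shown on this page), and `span_key_for_sketch` is a hypothetical helper name.

```ruby
# Placeholder span attribute names (assumptions, not the real OtelAttributes values).
METADATA_KEY_SKETCH = "example.trace.metadata"
SPAN_KEYS_BY_SUFFIX_SKETCH = {
  "user_id" => "example.user_id",
  "session_id" => "example.session_id",
  "tags" => "example.trace.tags"
}.freeze
PREFIX_SKETCH = "langfuse_"

# Hypothetical stand-in for _get_span_key_from_baggage_key.
def span_key_for_sketch(baggage_key)
  # Keys without the Langfuse prefix belong to someone else and are ignored.
  return nil unless baggage_key.start_with?(PREFIX_SKETCH)

  suffix = baggage_key[PREFIX_SKETCH.length..]
  metadata_marker = "metadata_"
  if suffix.start_with?(metadata_marker)
    # Each metadata entry becomes its own dotted span attribute.
    return "#{METADATA_KEY_SKETCH}.#{suffix[metadata_marker.length..]}"
  end

  # Unknown suffixes fall through to nil.
  SPAN_KEYS_BY_SUFFIX_SKETCH[suffix]
end

p span_key_for_sketch("langfuse_user_id")
p span_key_for_sketch("langfuse_metadata_region")
p span_key_for_sketch("langfuse_unknown")
p span_key_for_sketch("traceparent")
```

Prefixed keys with an unrecognised suffix and keys without the prefix both yield nil, which is why the caller skips them.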

._merge_metadata(context, context_key, new_metadata) ⇒ Hash<String, String>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Merge metadata with existing context value

Parameters:

  • context (OpenTelemetry::Context)

    Current context

  • context_key (OpenTelemetry::Context::Key)

    Context key for metadata

  • new_metadata (Hash<String, String>)

    New metadata to merge

Returns:

  • (Hash<String, String>)

    Merged metadata



# File 'lib/langfuse/propagation.rb', line 192

def self._merge_metadata(context, context_key, new_metadata)
  existing = context.value(context_key) || {}
  existing = existing.to_h if existing.respond_to?(:to_h)
  existing.merge(new_metadata)
end

._merge_tags(context, context_key, new_tags) ⇒ Array<String>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Merge tags with existing context value

Parameters:

  • context (OpenTelemetry::Context)

    Current context

  • context_key (OpenTelemetry::Context::Key)

    Context key for tags

  • new_tags (Array<String>)

    New tags to merge

Returns:

  • (Array<String>)

    Merged tags (deduplicated)



# File 'lib/langfuse/propagation.rb', line 206

def self._merge_tags(context, context_key, new_tags)
  existing = context.value(context_key) || []
  existing = existing.to_a if existing.respond_to?(:to_a)
  (existing + new_tags).uniq.freeze
end
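
The effect of the two merge helpers on nested calls can be sketched with plain values in place of an OpenTelemetry context lookup. The method names below are hypothetical stand-ins, and the sample data is illustrative only.

```ruby
# Hypothetical stand-in for _merge_metadata: later values win per key.
def merge_metadata_sketch(existing, new_metadata)
  (existing || {}).to_h.merge(new_metadata)
end

# Hypothetical stand-in for _merge_tags: concatenate, dedupe, freeze.
def merge_tags_sketch(existing, new_tags)
  ((existing || []).to_a + new_tags).uniq.freeze
end

outer_metadata = merge_metadata_sketch(nil, { "env" => "prod", "region" => "us-east" })
inner_metadata = merge_metadata_sketch(outer_metadata, { "region" => "eu-west" })

outer_tags = merge_tags_sketch(nil, ["api", "v2"])
inner_tags = merge_tags_sketch(outer_tags, ["v2", "beta"])

p inner_metadata # the inner "region" replaces the outer one; "env" is kept
p inner_tags     # duplicates are dropped, first-seen order is preserved
```

Because `Hash#merge` returns a new Hash, the outer value is left untouched in this sketch, which matches the idea that an inner block should not alter what the outer block sees.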

._parse_baggage_value(span_key, baggage_value) ⇒ String, Array<String>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Parse a baggage value into the appropriate format

Parameters:

  • span_key (String)

    The span attribute key

  • baggage_value (String, Object)

    The baggage value

Returns:

  • (String, Array<String>)

    Parsed value



# File 'lib/langfuse/propagation.rb', line 415

def self._parse_baggage_value(span_key, baggage_value)
  if span_key == OtelAttributes::TRACE_TAGS && baggage_value.is_a?(String)
    baggage_value.split(",")
  else
    baggage_value.to_s
  end
end
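
Tags are written to baggage as a comma-joined string (see `_set_baggage_attribute`) and split on commas here. A small sketch of that round trip, using hypothetical helper names, shows one consequence worth knowing: a tag that itself contains a comma does not survive intact.

```ruby
# Hypothetical helper mirroring how tags are encoded for baggage.
def encode_tags_sketch(tags)
  tags.join(",")
end

# Hypothetical helper mirroring how the tags baggage value is parsed.
def decode_tags_sketch(baggage_value)
  baggage_value.split(",")
end

p decode_tags_sketch(encode_tags_sketch(["api", "v2"]))
p decode_tags_sketch(encode_tags_sketch(["a,b", "c"]))
```

The first call returns the original two tags; the second returns three tags, because the comma inside `"a,b"` is indistinguishable from a separator.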

._propagate_attributes(user_id: nil, session_id: nil, metadata: nil, version: nil, tags: nil, as_baggage: false) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Internal implementation of propagate_attributes



# File 'lib/langfuse/propagation.rb', line 97

def self._propagate_attributes(user_id: nil, session_id: nil, metadata: nil, version: nil, tags: nil,
                               as_baggage: false, &)
  current_context = OpenTelemetry::Context.current
  current_span = OpenTelemetry::Trace.current_span

  # Process each propagated attribute using PROPAGATED_ATTRIBUTES constant
  PROPAGATED_ATTRIBUTES.each do |key|
    value = binding.local_variable_get(key.to_sym)
    next if value.nil?
    next if key == "tags" && value.empty?

    validated_value = _validate_attribute_value(key, value)
    next unless validated_value

    current_context = _set_propagated_attribute(
      key: key,
      value: validated_value,
      context: current_context,
      span: current_span,
      as_baggage: as_baggage
    )
  end

  # Execute block in new context
  OpenTelemetry::Context.with_current(current_context, &)
end
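
The loop above reads each keyword argument by name through `binding.local_variable_get`. A minimal sketch of that technique in isolation follows; the method and constant names are hypothetical, and only three of the attributes are included to keep it short.

```ruby
# Hypothetical list of attribute names, playing the role of PROPAGATED_ATTRIBUTES.
ATTRIBUTE_NAMES_SKETCH = %w[user_id session_id version].freeze

# Hypothetical method: collect only the keyword arguments that were supplied.
def collect_given_attributes_sketch(user_id: nil, session_id: nil, version: nil)
  ATTRIBUTE_NAMES_SKETCH.each_with_object({}) do |name, given|
    # The block's binding still sees the enclosing method's keyword arguments.
    value = binding.local_variable_get(name.to_sym)
    given[name] = value unless value.nil?
  end
end

p collect_given_attributes_sketch(user_id: "user_123", version: "1.0")
```

Driving the loop from a single list of names means adding a new propagated attribute only requires a new keyword argument and a new list entry, at the cost of a lookup that is less obvious to read than explicit variable references.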

._set_baggage_attribute(context:, key:, value:, baggage_key:) ⇒ OpenTelemetry::Context

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Set a propagated attribute in baggage

Parameters:

  • context (OpenTelemetry::Context)

    Current context

  • key (String)

    Attribute key (user_id, session_id, version, tags, metadata)

  • value (String, Array<String>, Hash<String, String>)

    Attribute value

  • baggage_key (String)

    Baggage key prefix

Returns:

  • (OpenTelemetry::Context)

    New context with baggage set



# File 'lib/langfuse/propagation.rb', line 433

def self._set_baggage_attribute(context:, key:, value:, baggage_key:)
  return context unless baggage_available?

  if key == "metadata" && value.is_a?(Hash)
    value.each do |k, v|
      entry_key = "#{baggage_key}_#{k}"
      context = OpenTelemetry::Baggage.set_value(entry_key, v.to_s, context: context)
    end
  elsif key == "tags" && value.is_a?(Array)
    context = OpenTelemetry::Baggage.set_value(baggage_key, value.join(","), context: context)
  else
    context = OpenTelemetry::Baggage.set_value(baggage_key, value.to_s, context: context)
  end
  context
rescue StandardError => e
  Langfuse.configuration.logger.warn("Langfuse: Failed to set baggage: #{e.message}")
  context
end
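
To make the three branches concrete, here is a sketch that returns the baggage entries as a plain Hash instead of calling `OpenTelemetry::Baggage.set_value`. The method name is hypothetical and the Hash is only a stand-in for the baggage store.

```ruby
# Hypothetical stand-in: compute the baggage entries one attribute would produce.
def baggage_entries_sketch(key, value, prefix: "langfuse_")
  baggage_key = "#{prefix}#{key}"
  if key == "metadata" && value.is_a?(Hash)
    # One baggage entry per metadata key: langfuse_metadata_<name>.
    value.to_h { |k, v| ["#{baggage_key}_#{k}", v.to_s] }
  elsif key == "tags" && value.is_a?(Array)
    # All tags share a single comma-joined entry.
    { baggage_key => value.join(",") }
  else
    { baggage_key => value.to_s }
  end
end

p baggage_entries_sketch("metadata", { "region" => "us-east", "tier" => "gold" })
p baggage_entries_sketch("tags", ["api", "v2"])
p baggage_entries_sketch("user_id", "user_123")
```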

._set_propagated_attribute(key:, value:, context:, span:, as_baggage:) ⇒ OpenTelemetry::Context

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Set a propagated attribute in context and on current span

Parameters:

  • key (String)

    Attribute key (user_id, session_id, version, tags, metadata)

  • value (String, Array<String>, Hash<String, String>)

    Attribute value

  • context (OpenTelemetry::Context)

    Current context

  • span (OpenTelemetry::Trace::Span, nil)

    Current span (may be nil)

  • as_baggage (Boolean)

    Whether to set in baggage

Returns:

  • (OpenTelemetry::Context)

    New context with attribute set



# File 'lib/langfuse/propagation.rb', line 223

def self._set_propagated_attribute(key:, value:, context:, span:, as_baggage:)
  context_key = _get_propagated_context_key(key)
  span_key = _get_propagated_span_key(key)
  baggage_key = _get_propagated_baggage_key(key)

  # Merge metadata/tags with existing context values
  merged = if key == "metadata" && value.is_a?(Hash)
             _merge_metadata(context, context_key, value)
           elsif key == "tags" && value.is_a?(Array)
             _merge_tags(context, context_key, value)
           else
             value
           end

  context = context.set_value(context_key, merged)

  # Set on current span (if recording)
  if span&.recording?
    if key == "metadata" && merged.is_a?(Hash)
      merged.each do |k, v|
        metadata_key = "#{OtelAttributes::TRACE_METADATA}.#{k}"
        span.set_attribute(metadata_key, v.to_s)
      end
    elsif key == "tags" && merged.is_a?(Array)
      span.set_attribute(span_key, merged) unless merged.empty?
    else
      span.set_attribute(span_key, merged.to_s)
    end
  end

  # Set in baggage (if requested and available)
  if as_baggage
    unless baggage_available?
      Langfuse.configuration.logger.warn(
        "Langfuse: Baggage propagation requested but opentelemetry-baggage gem not available. " \
        "Install opentelemetry-baggage for cross-service propagation."
      )
    end

    context = _set_baggage_attribute(
      context: context,
      key: key,
      value: merged,
      baggage_key: baggage_key
    )
  end

  context
end

._validate_attribute_value(key, value) ⇒ Object?

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Validate an attribute value based on its type

Parameters:

  • key (String)

    Attribute key

  • value (Object)

    Attribute value

Returns:

  • (Object, nil)

    Validated value or nil if invalid



# File 'lib/langfuse/propagation.rb', line 132

def self._validate_attribute_value(key, value)
  case key
  when "tags"
    validated_tags = value.filter_map { |tag| _validate_propagated_value(tag, "tag") }
    validated_tags.any? ? validated_tags : nil
  when "metadata"
    validated_metadata = {}
    value.each do |k, v|
      validated_metadata[k.to_s] = v.to_s if _validate_string_value(v, "metadata.#{k}")
    end
    validated_metadata.any? ? validated_metadata : nil
  else
    _validate_propagated_value(value, key)
  end
end

._validate_propagated_value(value, key) ⇒ String, ...

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Validate a propagated value (string or array of strings)

Parameters:

  • value (String, Array<String>)

    Value to validate

  • key (String)

    Attribute key for error messages

Returns:

  • (String, Array<String>, nil)

    Validated value or nil if invalid



# File 'lib/langfuse/propagation.rb', line 281

def self._validate_propagated_value(value, key)
  if value.is_a?(Array)
    validated = value.filter_map { |v| _validate_string_value(v, key) ? v : nil }
    return validated.any? ? validated : nil
  end

  # Validate string value (will log warning if not a string)
  return nil unless _validate_string_value(value, key)

  value
end

._validate_string_value(value, key) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Validate a string value

Parameters:

  • value (String)

    Value to validate

  • key (String)

    Attribute key for error messages

Returns:

  • (Boolean)

    True if valid, false otherwise



# File 'lib/langfuse/propagation.rb', line 301

def self._validate_string_value(value, key)
  unless value.is_a?(String)
    Langfuse.configuration.logger.warn(
      "Langfuse: Propagated attribute '#{key}' value is not a string. Dropping value."
    )
    return false
  end

  if value.length > 200
    Langfuse.configuration.logger.warn(
      "Langfuse: Propagated attribute '#{key}' value is over 200 characters " \
      "(#{value.length} chars). Dropping value."
    )
    return false
  end

  true
end
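
The validation rules can be sketched without the logger. One detail follows from the source of `_validate_attribute_value`: metadata values are checked with `is_a?(String)` before any conversion, so non-string values such as integers are dropped rather than coerced. The names below are hypothetical, and logging is omitted.

```ruby
# The 200-character limit comes from the method above; the constant name is hypothetical.
MAX_PROPAGATED_LENGTH_SKETCH = 200

# Hypothetical predicate: accepts only Strings of at most 200 characters.
def valid_propagated_string_sketch?(value)
  value.is_a?(String) && value.length <= MAX_PROPAGATED_LENGTH_SKETCH
end

# Hypothetical stand-in for the metadata branch: keep valid entries, stringify keys.
def validate_metadata_sketch(metadata)
  kept = metadata.each_with_object({}) do |(k, v), acc|
    acc[k.to_s] = v if valid_propagated_string_sketch?(v)
  end
  kept.empty? ? nil : kept
end

p valid_propagated_string_sketch?("a" * 200)
p valid_propagated_string_sketch?("a" * 201)
p validate_metadata_sketch({ region: "us-east", retries: 3 })
p validate_metadata_sketch({ retries: 3 })
```

Callers that want numeric or boolean metadata propagated would need to convert those values to strings before passing them in.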

.baggage_available? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Check if baggage API is available

Returns:

  • (Boolean)

    True if OpenTelemetry::Baggage is defined



# File 'lib/langfuse/propagation.rb', line 377

def self.baggage_available?
  defined?(OpenTelemetry::Baggage)
end
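
Ruby's `defined?` yields a description String or nil rather than `true` or `false`, so the method as written returns a truthy String or nil. That is sufficient for guards such as `return {} unless baggage_available?`. The sketch below illustrates the behaviour with hypothetical probe methods and shows one way to coerce to a strict Boolean if a caller needs it.

```ruby
# Hypothetical probe: String is always defined, so this returns a description String.
def defined_constant_probe_sketch
  defined?(String)
end

# Hypothetical probe: an undefined namespace yields nil and does not raise.
def missing_constant_probe_sketch
  defined?(NoSuchNamespaceSketch::Baggage)
end

# Double negation turns the String-or-nil result into true or false.
def strict_boolean_probe_sketch
  !!defined?(String)
end

p defined_constant_probe_sketch
p missing_constant_probe_sketch
p strict_boolean_probe_sketch
```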

.get_propagated_attributes_from_context(context) ⇒ Hash<String, String, Array<String>>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Get propagated attributes from context for span processor

Parameters:

  • context (OpenTelemetry::Context)

    The context to read from

Returns:

  • (Hash<String, String, Array<String>>)

    Hash of span key => value



# File 'lib/langfuse/propagation.rb', line 156

def self.get_propagated_attributes_from_context(context)
  propagated_attributes = _extract_baggage_attributes(context)

  # Handle OTEL context values
  PROPAGATED_ATTRIBUTES.each do |key|
    context_key = _get_propagated_context_key(key)
    value = context.value(context_key)

    next if value.nil?

    span_key = _get_propagated_span_key(key)

    if key == "metadata" && value.is_a?(Hash)
      value.each do |k, v|
        metadata_key = "#{OtelAttributes::TRACE_METADATA}.#{k}"
        propagated_attributes[metadata_key] = v.to_s
      end
    elsif key == "tags" && value.is_a?(Array)
      propagated_attributes[span_key] = value unless value.empty?
    else
      propagated_attributes[span_key] = value.to_s
    end
  end
  # rubocop:enable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity

  propagated_attributes
end
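
Two properties of the method above are easy to miss: baggage-derived attributes are collected first and then overwritten by in-process context values for the same span key, and metadata is flattened into one dotted attribute per entry. A sketch with plain Hashes in place of the context and baggage follows; the span attribute names are placeholders and the method name is hypothetical.

```ruby
# Placeholder span attribute names (assumptions, not the real OtelAttributes values).
FLATTEN_METADATA_KEY_SKETCH = "example.trace.metadata"
FLATTEN_SPAN_KEYS_SKETCH = {
  "user_id" => "example.user_id",
  "tags" => "example.trace.tags"
}.freeze

# Hypothetical stand-in: merge context values over baggage-derived attributes.
def flatten_for_span_sketch(from_baggage, from_context)
  attributes = from_baggage.dup
  from_context.each do |key, value|
    next if value.nil?

    if key == "metadata" && value.is_a?(Hash)
      value.each { |k, v| attributes["#{FLATTEN_METADATA_KEY_SKETCH}.#{k}"] = v.to_s }
    elsif key == "tags" && value.is_a?(Array)
      # Empty tag lists are skipped rather than written as an empty attribute.
      attributes[FLATTEN_SPAN_KEYS_SKETCH.fetch(key)] = value unless value.empty?
    else
      attributes[FLATTEN_SPAN_KEYS_SKETCH.fetch(key)] = value.to_s
    end
  end
  attributes
end

p flatten_for_span_sketch(
  { "example.user_id" => "from_baggage" },
  { "user_id" => "from_context", "metadata" => { "region" => "us-east" }, "tags" => [] }
)
```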

.propagate_attributes(user_id: nil, session_id: nil, metadata: nil, version: nil, tags: nil, as_baggage: false) { ... } ⇒ Object

Propagate trace-level attributes to all spans created within this context.

This method sets attributes on the currently active span AND automatically propagates them to all new child spans created within the block. This is the recommended way to set trace-level attributes like user_id, session_id, and metadata dimensions that should be consistently applied across all observations in a trace.

Examples:

Basic usage

Langfuse.propagate_attributes(user_id: "user_123", session_id: "session_abc") do
  # All spans created here inherit attributes
end

With metadata and tags

Langfuse.propagate_attributes(
  user_id: "user_123",
  metadata: { environment: "production", region: "us-east" },
  tags: ["api", "v2"]
) do
  # All spans inherit these attributes
end

Parameters:

  • user_id (String, nil) (defaults to: nil)

    User identifier (≤200 characters)

  • session_id (String, nil) (defaults to: nil)

    Session identifier (≤200 characters)

  • metadata (Hash<String, String>, nil) (defaults to: nil)

    Additional metadata (all values ≤200 characters)

  • version (String, nil) (defaults to: nil)

    Version identifier (≤200 characters)

  • tags (Array<String>, nil) (defaults to: nil)

    List of tags (each ≤200 characters)

  • as_baggage (Boolean) (defaults to: false)

    If true, propagates via OpenTelemetry baggage for cross-service propagation

Yields:

  • Block within which attributes are propagated

Returns:

  • (Object)

    The result of the block

Raises:

  • (ArgumentError)

    if no block is given



# File 'lib/langfuse/propagation.rb', line 79

def self.propagate_attributes(user_id: nil, session_id: nil, metadata: nil, version: nil, tags: nil,
                              as_baggage: false, &block)
  raise ArgumentError, "Block required" unless block

  _propagate_attributes(
    user_id: user_id,
    session_id: session_id,
    metadata: metadata,
    version: version,
    tags: tags,
    as_baggage: as_baggage,
    &block
  )
end