Module: Langfuse
- Defined in:
- lib/langfuse.rb,
lib/langfuse.rb,
lib/langfuse/types.rb,
lib/langfuse/client.rb,
lib/langfuse/config.rb,
lib/langfuse/version.rb,
lib/langfuse/api_client.rb,
lib/langfuse/evaluation.rb,
lib/langfuse/otel_setup.rb,
lib/langfuse/item_result.rb,
lib/langfuse/propagation.rb,
lib/langfuse/cache_warmer.rb,
lib/langfuse/observations.rb,
lib/langfuse/prompt_cache.rb,
lib/langfuse/score_client.rb,
lib/langfuse/dataset_client.rb,
lib/langfuse/span_processor.rb,
lib/langfuse/experiment_item.rb,
lib/langfuse/otel_attributes.rb,
lib/langfuse/timestamp_parser.rb,
lib/langfuse/traced_execution.rb,
lib/langfuse/experiment_result.rb,
lib/langfuse/experiment_runner.rb,
lib/langfuse/chat_prompt_client.rb,
lib/langfuse/text_prompt_client.rb,
lib/langfuse/dataset_item_client.rb,
lib/langfuse/rails_cache_adapter.rb,
lib/langfuse/stale_while_revalidate.rb
Overview
rubocop:disable Metrics/ModuleLength
Defined Under Namespace
Modules: OtelAttributes, OtelSetup, Propagation, StaleWhileRevalidate, TimestampParser, TracedExecution, Types Classes: Agent, ApiClient, ApiError, BaseObservation, CacheWarmer, CacheWarmingError, Chain, ChatPromptClient, Client, Config, ConfigurationError, DatasetClient, DatasetItemClient, Embedding, Error, Evaluation, Evaluator, Event, ExperimentItem, ExperimentResult, ExperimentRunner, Generation, Guardrail, ItemResult, NotFoundError, PromptCache, RailsCacheAdapter, Retriever, ScoreClient, Span, SpanProcessor, TextPromptClient, Tool, UnauthorizedError
Constant Summary collapse
- FLUSH_TIMEOUT =
Default timeout (in seconds) for flushing traces during experiment runs.
5- OBSERVATION_TYPE_REGISTRY =
Registry mapping observation type strings to their wrapper classes
{ OBSERVATION_TYPES[:generation] => Generation, OBSERVATION_TYPES[:embedding] => Embedding, OBSERVATION_TYPES[:event] => Event, OBSERVATION_TYPES[:agent] => Agent, OBSERVATION_TYPES[:tool] => Tool, OBSERVATION_TYPES[:chain] => Chain, OBSERVATION_TYPES[:retriever] => Retriever, OBSERVATION_TYPES[:evaluator] => Evaluator, OBSERVATION_TYPES[:guardrail] => Guardrail, OBSERVATION_TYPES[:span] => Span }.freeze
- VERSION =
"0.4.0"- OBSERVATION_TYPES =
Observation type constants
{ span: "span", generation: "generation", embedding: "embedding", event: "event", agent: "agent", tool: "tool", chain: "chain", retriever: "retriever", evaluator: "evaluator", guardrail: "guardrail" }.freeze
Class Attribute Summary collapse
-
.configuration ⇒ Config
Returns the global configuration object.
Class Method Summary collapse
-
.client ⇒ Client
Returns the global singleton client.
-
.configure {|Config| ... } ⇒ Config
Configure Langfuse globally.
-
.create_score(name:, value:, trace_id: nil, observation_id: nil, comment: nil, metadata: nil, data_type: :numeric) ⇒ void
Create a score event and queue it for batching.
-
.flush_scores ⇒ void
Force flush all queued score events.
-
.force_flush(timeout: 30) ⇒ void
Force flush all pending traces.
-
.observe(name, attrs = {}, as_type: :span, **kwargs) {|observation| ... } ⇒ BaseObservation, Object
User-facing convenience method for creating root observations.
-
.propagate_attributes(user_id: nil, session_id: nil, metadata: nil, version: nil, tags: nil, as_baggage: false) { ... } ⇒ Object
Propagate trace-level attributes to all spans created within this context.
-
.reset! ⇒ void
Reset global configuration and client (useful for testing).
-
.score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void
Create a score for the currently active observation (from OTel span).
-
.score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void
Create a score for the currently active trace (from OTel span).
-
.shutdown(timeout: 30) ⇒ void
Shutdown Langfuse and flush any pending traces and scores.
-
.start_observation(name, attrs = {}, as_type: :span, parent_span_context: nil, start_time: nil, skip_validation: false) ⇒ BaseObservation
Creates a new observation (root or child).
Class Attribute Details
Class Method Details
.client ⇒ Client
Returns the global singleton client
102 103 104 |
# File 'lib/langfuse.rb', line 102 def client @client ||= Client.new(configuration) end |
.configure {|Config| ... } ⇒ Config
Configure Langfuse globally
90 91 92 93 94 95 96 97 |
# File 'lib/langfuse.rb', line 90 def configure yield(configuration) # Auto-initialize OpenTelemetry OtelSetup.setup(configuration) configuration end |
.create_score(name:, value:, trace_id: nil, observation_id: nil, comment: nil, metadata: nil, data_type: :numeric) ⇒ void
This method returns an undefined value.
Create a score event and queue it for batching
rubocop:disable Metrics/ParameterLists
210 211 212 213 214 215 216 217 218 219 220 221 |
# File 'lib/langfuse.rb', line 210 def create_score(name:, value:, trace_id: nil, observation_id: nil, comment: nil, metadata: nil, data_type: :numeric) client.create_score( name: name, value: value, trace_id: trace_id, observation_id: observation_id, comment: comment, metadata: , data_type: data_type ) end |
.flush_scores ⇒ void
This method returns an undefined value.
Force flush all queued score events
Sends all queued score events to the API immediately.
284 285 286 |
# File 'lib/langfuse.rb', line 284 def flush_scores client.flush_scores if @client end |
.force_flush(timeout: 30) ⇒ void
This method returns an undefined value.
Force flush all pending traces
126 127 128 |
# File 'lib/langfuse.rb', line 126 def force_flush(timeout: 30) OtelSetup.force_flush(timeout: timeout) end |
.observe(name, attrs = {}, as_type: :span, **kwargs) {|observation| ... } ⇒ BaseObservation, Object
User-facing convenience method for creating root observations
377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 |
# File 'lib/langfuse.rb', line 377 def observe(name, attrs = {}, as_type: :span, **kwargs, &block) # Merge positional attrs and keyword kwargs merged_attrs = attrs.to_h.merge(kwargs) observation = start_observation(name, merged_attrs, as_type: as_type) if block # Block-based API: auto-ends when block completes # Set context and execute block current_context = OpenTelemetry::Context.current result = OpenTelemetry::Context.with_current( OpenTelemetry::Trace.context_with_span(observation.otel_span, parent_context: current_context) ) do block.call(observation) end # Only end if not already ended (events auto-end in start_observation) observation.end unless as_type.to_s == OBSERVATION_TYPES[:event] result else # Stateful API - return observation # Events already auto-ended in start_observation observation end end |
.propagate_attributes(user_id: nil, session_id: nil, metadata: nil, version: nil, tags: nil, as_baggage: false) { ... } ⇒ Object
Propagate trace-level attributes to all spans created within this context.
This method sets attributes on the currently active span AND automatically propagates them to all new child spans created within the block. This is the recommended way to set trace-level attributes like user_id, session_id, and metadata dimensions that should be consistently applied across all observations in a trace.
IMPORTANT: Call this as early as possible within your trace/workflow. Only the currently active span and spans created after entering this context will have these attributes. Pre-existing spans will NOT be retroactively updated.
176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/langfuse.rb', line 176 def propagate_attributes(user_id: nil, session_id: nil, metadata: nil, version: nil, tags: nil, as_baggage: false, &) Propagation.propagate_attributes( user_id: user_id, session_id: session_id, metadata: , version: version, tags: , as_baggage: as_baggage, & ) end |
.reset! ⇒ void
This method returns an undefined value.
Reset global configuration and client (useful for testing)
291 292 293 294 295 296 297 298 299 300 |
# File 'lib/langfuse.rb', line 291 def reset! client.shutdown if @client OtelSetup.shutdown(timeout: 5) if OtelSetup.initialized? @configuration = nil @client = nil rescue StandardError # Ignore shutdown errors during reset (e.g., in tests) @configuration = nil @client = nil end |
.score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void
This method returns an undefined value.
Create a score for the currently active observation (from OTel span)
Extracts observation_id and trace_id from the active OpenTelemetry span.
240 241 242 243 244 245 246 247 248 |
# File 'lib/langfuse.rb', line 240 def score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric) client.score_active_observation( name: name, value: value, comment: comment, metadata: , data_type: data_type ) end |
.score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void
This method returns an undefined value.
Create a score for the currently active trace (from OTel span)
Extracts trace_id from the active OpenTelemetry span.
266 267 268 269 270 271 272 273 274 |
# File 'lib/langfuse.rb', line 266 def score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric) client.score_active_trace( name: name, value: value, comment: comment, metadata: , data_type: data_type ) end |
.shutdown(timeout: 30) ⇒ void
This method returns an undefined value.
Shutdown Langfuse and flush any pending traces and scores
Call this when shutting down your application to ensure all traces and scores are sent to Langfuse.
117 118 119 120 |
# File 'lib/langfuse.rb', line 117 def shutdown(timeout: 30) client.shutdown if @client OtelSetup.shutdown(timeout: timeout) end |
.start_observation(name, attrs = {}, as_type: :span, parent_span_context: nil, start_time: nil, skip_validation: false) ⇒ BaseObservation
Creates a new observation (root or child)
This is the module-level factory method that creates observations of any type. It can create root observations (when parent_span_context is nil) or child observations (when parent_span_context is provided).
324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 |
# File 'lib/langfuse.rb', line 324 def start_observation(name, attrs = {}, as_type: :span, parent_span_context: nil, start_time: nil, skip_validation: false) type_str = as_type.to_s unless skip_validation || valid_observation_type?(as_type) valid_types = OBSERVATION_TYPES.values.sort.join(", ") raise ArgumentError, "Invalid observation type: #{type_str}. Valid types: #{valid_types}" end otel_tracer = otel_tracer() otel_span = create_otel_span( name: name, start_time: start_time, parent_span_context: parent_span_context, otel_tracer: otel_tracer ) # Serialize attributes # Only set attributes if span is still recording (should always be true here, but guard for safety) if otel_span.recording? otel_attrs = OtelAttributes.create_observation_attributes(type_str, attrs.to_h) otel_attrs.each { |key, value| otel_span.set_attribute(key, value) } end # Wrap in appropriate class observation = wrap_otel_span(otel_span, type_str, otel_tracer, attributes: attrs) # Events auto-end immediately when created observation.end if type_str == OBSERVATION_TYPES[:event] observation end |