Class: Langfuse::ScoreClient Private

Inherits:
Object
  • Object
show all
Defined in:
lib/langfuse/score_client.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Client for creating and batching Langfuse scores

Handles thread-safe queuing, batching, and sending of score events to the Langfuse ingestion API. Scores are batched and sent automatically based on batch_size and flush_interval configuration.

rubocop:disable Metrics/ClassLength

Examples:

Basic usage

score_client = ScoreClient.new(api_client: api_client, config: config)
score_client.create(name: "quality", value: 0.85, trace_id: "abc123...")

With OTel integration

Langfuse.observe("operation") do |obs|
  score_client.score_active_observation(name: "accuracy", value: 0.92)
end

Constant Summary collapse

HEX_TRACE_ID_PATTERN =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

/\A[0-9a-f]{32}\z/

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(api_client:, config:) ⇒ ScoreClient

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Initialize a new ScoreClient

Parameters:

  • api_client (ApiClient)

    The API client for sending batches

  • config (Config)

    Configuration object with batch_size and flush_interval



40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/langfuse/score_client.rb', line 40

def initialize(api_client:, config:)
  @api_client = api_client
  @config = config
  @logger = config.logger
  @queue = Queue.new
  @mutex = Mutex.new
  @flush_thread = nil
  @shutdown = false
  # Match the immutable tracing setup contract: once this client exists, later config
  # mutations must not change score sampling without rebuilding the client.
  @score_sampler = Sampling.build_sampler(config.sample_rate)

  start_flush_timer
end

Instance Attribute Details

#api_clientApiClient (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns The API client for sending batches.

Returns:

  • (ApiClient)

    The API client for sending batches



26
27
28
# File 'lib/langfuse/score_client.rb', line 26

def api_client
  @api_client
end

#configConfig (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns Configuration object.

Returns:

  • (Config)

    Configuration object



29
30
31
# File 'lib/langfuse/score_client.rb', line 29

def config
  @config
end

#loggerLogger (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns Logger instance.

Returns:

  • (Logger)

    Logger instance



32
33
34
# File 'lib/langfuse/score_client.rb', line 32

def logger
  @logger
end

Instance Method Details

#create(name:, value:, id: nil, trace_id: nil, session_id: nil, observation_id: nil, comment: nil, metadata: nil, environment: nil, data_type: :numeric, dataset_run_id: nil, config_id: nil) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Create a score event and queue it for batching

rubocop:disable Metrics/ParameterLists

Examples:

Numeric score

create(name: "quality", value: 0.85, trace_id: "abc123", data_type: :numeric)

Boolean score

create(name: "passed", value: true, trace_id: "abc123", data_type: :boolean)

Categorical score

create(name: "category", value: "high", trace_id: "abc123", data_type: :categorical)

Parameters:

  • name (String)

    Score name (required)

  • value (Numeric, Integer, String)

    Score value (type depends on data_type)

  • id (String, nil) (defaults to: nil)

    Score ID

  • trace_id (String, nil) (defaults to: nil)

    Trace ID to associate with the score

  • session_id (String, nil) (defaults to: nil)

    Session ID to associate with the score

  • observation_id (String, nil) (defaults to: nil)

    Observation ID to associate with the score

  • comment (String, nil) (defaults to: nil)

    Optional comment

  • metadata (Hash, nil) (defaults to: nil)

    Optional metadata hash

  • environment (String, nil) (defaults to: nil)

    Optional environment

  • data_type (Symbol) (defaults to: :numeric)

    Data type (:numeric, :boolean, :categorical)

  • dataset_run_id (String, nil) (defaults to: nil)

    Optional dataset run ID to associate with the score

  • config_id (String, nil) (defaults to: nil)

    Optional score config ID

Raises:

  • (ArgumentError)

    if validation fails



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/langfuse/score_client.rb', line 81

def create(name:, value:, id: nil, trace_id: nil, session_id: nil, observation_id: nil, comment: nil,
           metadata: nil, environment: nil, data_type: :numeric, dataset_run_id: nil, config_id: nil)
  validate_name(name)
  normalized_value = normalize_value(value, data_type)
  data_type_str = Types::SCORE_DATA_TYPES[data_type] || raise(ArgumentError, "Invalid data_type: #{data_type}")

  return unless enqueue_trace_linked_score?(trace_id)

  event = build_score_event(
    name: name, value: normalized_value, id: id, trace_id: trace_id,
    session_id: session_id, observation_id: observation_id, comment: comment,
    metadata: , environment: environment, data_type: data_type_str,
    dataset_run_id: dataset_run_id, config_id: config_id
  )

  @queue << event
  flush if @queue.size >= config.batch_size
rescue StandardError => e
  logger.error("Langfuse score creation failed: #{e.message}")
  raise
end

#flushvoid

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Force flush all queued score events

Sends all queued events to the API immediately.



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/langfuse/score_client.rb', line 170

def flush
  return if @queue.empty?

  events = []
  @queue.size.times do
    events << @queue.pop(true)
  rescue StandardError
    nil
  end
  events.compact!

  return if events.empty?

  send_batch(events)
rescue StandardError => e
  logger.error("Langfuse score flush failed: #{e.message}")
  # Don't raise - silent error handling for batch operations
end

#score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Create a score for the currently active observation (from OTel span)

Extracts observation_id and trace_id from the active OpenTelemetry span.

Examples:

Langfuse.observe("operation") do |obs|
  score_client.score_active_observation(name: "accuracy", value: 0.92)
end

Parameters:

  • name (String)

    Score name (required)

  • value (Numeric, Integer, String)

    Score value

  • comment (String, nil) (defaults to: nil)

    Optional comment

  • metadata (Hash, nil) (defaults to: nil)

    Optional metadata hash

  • data_type (Symbol) (defaults to: :numeric)

    Data type (:numeric, :boolean, :categorical)

Raises:

  • (ArgumentError)

    if no active span or validation fails



120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/langfuse/score_client.rb', line 120

def score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric)
  ids = extract_ids_from_active_span
  raise ArgumentError, "No active OpenTelemetry span found" unless ids[:observation_id]

  create(
    name: name,
    value: value,
    trace_id: ids[:trace_id],
    observation_id: ids[:observation_id],
    comment: comment,
    metadata: ,
    data_type: data_type
  )
end

#score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Create a score for the currently active trace (from OTel span)

Extracts trace_id from the active OpenTelemetry span.

Examples:

Langfuse.observe("operation") do |obs|
  score_client.score_active_trace(name: "overall_quality", value: 5)
end

Parameters:

  • name (String)

    Score name (required)

  • value (Numeric, Integer, String)

    Score value

  • comment (String, nil) (defaults to: nil)

    Optional comment

  • metadata (Hash, nil) (defaults to: nil)

    Optional metadata hash

  • data_type (Symbol) (defaults to: :numeric)

    Data type (:numeric, :boolean, :categorical)

Raises:

  • (ArgumentError)

    if no active span or validation fails



151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/langfuse/score_client.rb', line 151

def score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric)
  ids = extract_ids_from_active_span
  raise ArgumentError, "No active OpenTelemetry span found" unless ids[:trace_id]

  create(
    name: name,
    value: value,
    trace_id: ids[:trace_id],
    comment: comment,
    metadata: ,
    data_type: data_type
  )
end

#shutdownvoid

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Shutdown the score client and flush remaining events

Stops the flush timer thread and sends any remaining queued events.



194
195
196
197
198
199
200
201
202
# File 'lib/langfuse/score_client.rb', line 194

def shutdown
  @mutex.synchronize do
    return if @shutdown

    @shutdown = true
    stop_flush_timer
    flush
  end
end