Class: Langfuse::ScoreClient Private

Inherits:
Object
  • Object
show all
Defined in:
lib/langfuse/score_client.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Client for creating and batching Langfuse scores

Handles thread-safe queuing, batching, and sending of score events to the Langfuse ingestion API. Scores are batched and sent automatically based on batch_size and flush_interval configuration.

rubocop:disable Metrics/ClassLength

Examples:

Basic usage

score_client = ScoreClient.new(api_client: api_client, config: config)
score_client.create(name: "quality", value: 0.85, trace_id: "abc123...")

With OTel integration

Langfuse.observe("operation") do |obs|
  score_client.score_active_observation(name: "accuracy", value: 0.92)
end

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(api_client:, config:) ⇒ ScoreClient

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Initialize a new ScoreClient

Parameters:

  • api_client (ApiClient)

    The API client for sending batches

  • config (Config)

    Configuration object with batch_size and flush_interval



38
39
40
41
42
43
44
45
46
47
48
# File 'lib/langfuse/score_client.rb', line 38

def initialize(api_client:, config:)
  @api_client = api_client
  @config = config
  @logger = config.logger
  @queue = Queue.new
  @mutex = Mutex.new
  @flush_thread = nil
  @shutdown = false

  start_flush_timer
end

Instance Attribute Details

#api_clientApiClient (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns The API client for sending batches.

Returns:

  • (ApiClient)

    The API client for sending batches



26
27
28
# File 'lib/langfuse/score_client.rb', line 26

def api_client
  @api_client
end

#configConfig (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns Configuration object.

Returns:

  • (Config)

    Configuration object



29
30
31
# File 'lib/langfuse/score_client.rb', line 29

def config
  @config
end

#loggerLogger (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns Logger instance.

Returns:

  • (Logger)

    Logger instance



32
33
34
# File 'lib/langfuse/score_client.rb', line 32

def logger
  @logger
end

Instance Method Details

#create(name:, value:, id: nil, trace_id: nil, session_id: nil, observation_id: nil, comment: nil, metadata: nil, environment: nil, data_type: :numeric, dataset_run_id: nil, config_id: nil) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Create a score event and queue it for batching

rubocop:disable Metrics/ParameterLists

Examples:

Numeric score

create(name: "quality", value: 0.85, trace_id: "abc123", data_type: :numeric)

Boolean score

create(name: "passed", value: true, trace_id: "abc123", data_type: :boolean)

Categorical score

create(name: "category", value: "high", trace_id: "abc123", data_type: :categorical)

Parameters:

  • name (String)

    Score name (required)

  • value (Numeric, Integer, String)

    Score value (type depends on data_type)

  • id (String, nil) (defaults to: nil)

    Score ID

  • trace_id (String, nil) (defaults to: nil)

    Trace ID to associate with the score

  • session_id (String, nil) (defaults to: nil)

    Session ID to associate with the score

  • observation_id (String, nil) (defaults to: nil)

    Observation ID to associate with the score

  • comment (String, nil) (defaults to: nil)

    Optional comment

  • metadata (Hash, nil) (defaults to: nil)

    Optional metadata hash

  • environment (String, nil) (defaults to: nil)

    Optional environment

  • data_type (Symbol) (defaults to: :numeric)

    Data type (:numeric, :boolean, :categorical)

  • dataset_run_id (String, nil) (defaults to: nil)

    Optional dataset run ID to associate with the score

  • config_id (String, nil) (defaults to: nil)

    Optional score config ID

Raises:

  • (ArgumentError)

    if validation fails



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/langfuse/score_client.rb', line 76

def create(name:, value:, id: nil, trace_id: nil, session_id: nil, observation_id: nil, comment: nil,
           metadata: nil, environment: nil, data_type: :numeric, dataset_run_id: nil, config_id: nil)
  validate_name(name)
  # Keep identifier policy server-side to preserve cross-SDK parity and avoid blocking valid future payloads.
  normalized_value = normalize_value(value, data_type)
  data_type_str = Types::SCORE_DATA_TYPES[data_type] || raise(ArgumentError, "Invalid data_type: #{data_type}")

  event = build_score_event(
    name: name,
    value: normalized_value,
    id: id,
    trace_id: trace_id,
    session_id: session_id,
    observation_id: observation_id,
    comment: comment,
    metadata: ,
    environment: environment,
    data_type: data_type_str,
    dataset_run_id: dataset_run_id,
    config_id: config_id
  )

  @queue << event

  # Trigger flush if batch size reached
  flush if @queue.size >= config.batch_size
rescue StandardError => e
  logger.error("Langfuse score creation failed: #{e.message}")
  raise
end

#flushvoid

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Force flush all queued score events

Sends all queued events to the API immediately.



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/langfuse/score_client.rb', line 174

def flush
  return if @queue.empty?

  events = []
  @queue.size.times do
    events << @queue.pop(true)
  rescue StandardError
    nil
  end
  events.compact!

  return if events.empty?

  send_batch(events)
rescue StandardError => e
  logger.error("Langfuse score flush failed: #{e.message}")
  # Don't raise - silent error handling for batch operations
end

#score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Create a score for the currently active observation (from OTel span)

Extracts observation_id and trace_id from the active OpenTelemetry span.

Examples:

Langfuse.observe("operation") do |obs|
  score_client.score_active_observation(name: "accuracy", value: 0.92)
end

Parameters:

  • name (String)

    Score name (required)

  • value (Numeric, Integer, String)

    Score value

  • comment (String, nil) (defaults to: nil)

    Optional comment

  • metadata (Hash, nil) (defaults to: nil)

    Optional metadata hash

  • data_type (Symbol) (defaults to: :numeric)

    Data type (:numeric, :boolean, :categorical)

Raises:

  • (ArgumentError)

    if no active span or validation fails



124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/langfuse/score_client.rb', line 124

def score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric)
  ids = extract_ids_from_active_span
  raise ArgumentError, "No active OpenTelemetry span found" unless ids[:observation_id]

  create(
    name: name,
    value: value,
    trace_id: ids[:trace_id],
    observation_id: ids[:observation_id],
    comment: comment,
    metadata: ,
    data_type: data_type
  )
end

#score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Create a score for the currently active trace (from OTel span)

Extracts trace_id from the active OpenTelemetry span.

Examples:

Langfuse.observe("operation") do |obs|
  score_client.score_active_trace(name: "overall_quality", value: 5)
end

Parameters:

  • name (String)

    Score name (required)

  • value (Numeric, Integer, String)

    Score value

  • comment (String, nil) (defaults to: nil)

    Optional comment

  • metadata (Hash, nil) (defaults to: nil)

    Optional metadata hash

  • data_type (Symbol) (defaults to: :numeric)

    Data type (:numeric, :boolean, :categorical)

Raises:

  • (ArgumentError)

    if no active span or validation fails



155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/langfuse/score_client.rb', line 155

def score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric)
  ids = extract_ids_from_active_span
  raise ArgumentError, "No active OpenTelemetry span found" unless ids[:trace_id]

  create(
    name: name,
    value: value,
    trace_id: ids[:trace_id],
    comment: comment,
    metadata: ,
    data_type: data_type
  )
end

#shutdownvoid

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Shutdown the score client and flush remaining events

Stops the flush timer thread and sends any remaining queued events.



198
199
200
201
202
203
204
205
206
# File 'lib/langfuse/score_client.rb', line 198

def shutdown
  @mutex.synchronize do
    return if @shutdown

    @shutdown = true
    stop_flush_timer
    flush
  end
end