Class: Langfuse::ScoreClient Private

Inherits:
Object
Defined in:
lib/langfuse/score_client.rb

Overview

This class is part of a private API. Avoid using it where possible, as it may be removed or changed in the future.

Client for creating and batching Langfuse scores

Handles thread-safe queuing, batching, and sending of score events to the Langfuse ingestion API. Scores are batched and sent automatically based on batch_size and flush_interval configuration.
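The size-based half of that batching behavior can be sketched with Ruby's built-in Queue. This is a minimal illustration, not the ScoreClient implementation: SizeBatcher and its batches array are hypothetical names, and the array stands in for the call to the ingestion API.

```ruby
# Hypothetical stand-in for illustration; not Langfuse::ScoreClient itself.
class SizeBatcher
  attr_reader :batches

  def initialize(batch_size:)
    @batch_size = batch_size
    @queue = Queue.new   # Ruby's Queue is safe to share between threads
    @batches = []        # stands in for "sent to the ingestion API"
  end

  # Enqueue an event and flush as soon as the queue reaches batch_size.
  def push(event)
    @queue << event
    flush if @queue.size >= @batch_size
  end

  # Drain everything currently queued into one batch.
  def flush
    events = []
    begin
      events << @queue.pop(true) until @queue.empty?
    rescue ThreadError
      # pop(true) raises when another thread emptied the queue first
    end
    @batches << events unless events.empty?
  end
end

batcher = SizeBatcher.new(batch_size: 2)
%w[a b c].each { |event| batcher.push(event) }
batcher.flush   # drains the leftover event
```

With a batch size of 2, the first two events go out together and the third waits until something else triggers a flush, which is the gap the flush_interval timer is there to close.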


Examples:

Basic usage

score_client = ScoreClient.new(api_client: api_client, config: config)
score_client.create(name: "quality", value: 0.85, trace_id: "abc123...")

With OTel integration

Langfuse.observe("operation") do |obs|
  score_client.score_active_observation(name: "accuracy", value: 0.92)
end


Constructor Details

#initialize(api_client:, config:) ⇒ ScoreClient

Initialize a new ScoreClient

Parameters:

  • api_client (ApiClient)

    The API client for sending batches

  • config (Config)

    Configuration object providing batch_size, flush_interval, and logger



# File 'lib/langfuse/score_client.rb', line 38

def initialize(api_client:, config:)
  @api_client = api_client
  @config = config
  @logger = config.logger
  @queue = Queue.new
  @mutex = Mutex.new
  @flush_thread = nil
  @shutdown = false

  start_flush_timer
end
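The body of start_flush_timer is not shown on this page. As a rough sketch of what an interval-driven flush thread could look like, the following uses a Mutex and ConditionVariable so the thread can be stopped without waiting out a full interval. FlushTimer and its methods are hypothetical names, not part of the library.

```ruby
# Hypothetical periodic timer; the real start_flush_timer may work differently.
class FlushTimer
  def initialize(interval, &on_tick)
    @interval = interval
    @on_tick = on_tick
    @mutex = Mutex.new
    @wake = ConditionVariable.new
    @stopped = false
    @thread = nil
  end

  def start
    @thread = Thread.new do
      @mutex.synchronize do
        until @stopped
          # Sleeps up to @interval seconds, or returns early when signalled.
          @wake.wait(@mutex, @interval)
          @on_tick.call unless @stopped
        end
      end
    end
    self
  end

  def stop
    @mutex.synchronize do
      @stopped = true
      @wake.signal
    end
    @thread&.join
  end
end

ticks = 0
timer = FlushTimer.new(0.05) { ticks += 1 }
timer.start
sleep 0.3
timer.stop
ticks_at_stop = ticks
```

A condition variable can wake early, so the tick callback should be safe to run more often than the interval suggests; a flush that returns immediately on an empty queue fits that requirement.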

Instance Attribute Details

#api_client ⇒ ApiClient (readonly)


Returns:

  • (ApiClient)

    The API client for sending batches



# File 'lib/langfuse/score_client.rb', line 26

def api_client
  @api_client
end

#config ⇒ Config (readonly)


Returns:

  • (Config)

    Configuration object



# File 'lib/langfuse/score_client.rb', line 29

def config
  @config
end

#logger ⇒ Logger (readonly)


Returns:

  • (Logger)

    Logger instance



# File 'lib/langfuse/score_client.rb', line 32

def logger
  @logger
end

Instance Method Details

#create(name:, value:, trace_id: nil, observation_id: nil, comment: nil, metadata: nil, data_type: :numeric, dataset_run_id: nil, config_id: nil) ⇒ void

This method returns an undefined value.

Create a score event and queue it for batching

Examples:

Numeric score

create(name: "quality", value: 0.85, trace_id: "abc123", data_type: :numeric)

Boolean score

create(name: "passed", value: true, trace_id: "abc123", data_type: :boolean)

Categorical score

create(name: "category", value: "high", trace_id: "abc123", data_type: :categorical)

Parameters:

  • name (String)

    Score name (required)

  • value (Numeric, Boolean, String)

    Score value (type depends on data_type)

  • trace_id (String, nil) (defaults to: nil)

    Trace ID to associate with the score

  • observation_id (String, nil) (defaults to: nil)

    Observation ID to associate with the score

  • comment (String, nil) (defaults to: nil)

    Optional comment

  • metadata (Hash, nil) (defaults to: nil)

    Optional metadata hash

  • data_type (Symbol) (defaults to: :numeric)

    Data type (:numeric, :boolean, :categorical)

  • dataset_run_id (String, nil) (defaults to: nil)

    Optional dataset run ID to associate with the score

  • config_id (String, nil) (defaults to: nil)

    Optional score config ID

Raises:

  • (ArgumentError)

    if validation fails or data_type is not a recognized type



# File 'lib/langfuse/score_client.rb', line 73

def create(name:, value:, trace_id: nil, observation_id: nil, comment: nil, metadata: nil,
           data_type: :numeric, dataset_run_id: nil, config_id: nil)
  validate_name(name)
  normalized_value = normalize_value(value, data_type)
  data_type_str = Types::SCORE_DATA_TYPES[data_type] || raise(ArgumentError, "Invalid data_type: #{data_type}")

  event = build_score_event(
    name: name,
    value: normalized_value,
    trace_id: trace_id,
    observation_id: observation_id,
    comment: comment,
    metadata: metadata,
    data_type: data_type_str,
    dataset_run_id: dataset_run_id,
    config_id: config_id
  )

  @queue << event

  # Trigger flush if batch size reached
  flush if @queue.size >= config.batch_size
rescue StandardError => e
  logger.error("Langfuse score creation failed: #{e.message}")
  raise
end
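Neither normalize_value nor Types::SCORE_DATA_TYPES is shown on this page. The sketch below is one plausible shape for them, assuming that the data type symbols map to upper-case strings and that boolean scores are sent as 1 or 0. Both are assumptions, and the names SCORE_DATA_TYPES_SKETCH and normalize_value_sketch are hypothetical.

```ruby
# Assumed mapping; the real Types::SCORE_DATA_TYPES may differ.
SCORE_DATA_TYPES_SKETCH = {
  numeric: "NUMERIC",
  boolean: "BOOLEAN",
  categorical: "CATEGORICAL"
}.freeze

# Assumed validation rules; the real normalize_value may differ.
def normalize_value_sketch(value, data_type)
  case data_type
  when :numeric
    raise ArgumentError, "Expected Numeric, got #{value.class}" unless value.is_a?(Numeric)

    value
  when :boolean
    raise ArgumentError, "Expected true or false, got #{value.inspect}" unless [true, false].include?(value)

    value ? 1 : 0
  when :categorical
    raise ArgumentError, "Expected String, got #{value.class}" unless value.is_a?(String)

    value
  else
    raise ArgumentError, "Invalid data_type: #{data_type}"
  end
end

normalize_value_sketch(0.85, :numeric)       # => 0.85
normalize_value_sketch(true, :boolean)       # => 1
normalize_value_sketch("high", :categorical) # => "high"
```

Because #create logs and then re-raises, a caller that passes a mismatched value and data_type would see the ArgumentError in addition to the log entry.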

#flush ⇒ void

This method returns an undefined value.

Force flush all queued score events

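Sends all queued events to the API immediately. Errors raised while sending are logged and not re-raised, so callers are not notified of a failed flush.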



# File 'lib/langfuse/score_client.rb', line 167

def flush
  return if @queue.empty?

  events = []
  @queue.size.times do
    events << @queue.pop(true)
  rescue StandardError
    nil
  end
  events.compact!

  return if events.empty?

  send_batch(events)
rescue StandardError => e
  logger.error("Langfuse score flush failed: #{e.message}")
  # Don't raise - silent error handling for batch operations
end
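One consequence of the rescue-and-log shape above is worth seeing in isolation: the events are popped before the send is attempted, so if the send raises, they are no longer in the queue. The sketch below reproduces that shape with a sender that always fails. flush_quietly and failing_sender are hypothetical names, and whether the real send_batch retries internally is not shown on this page.

```ruby
require "logger"
require "stringio"

# Hypothetical helper mirroring the rescue-and-log shape of #flush.
def flush_quietly(queue, logger, sender)
  events = []
  events << queue.pop(true) until queue.empty?
  sender.call(events) unless events.empty?
rescue StandardError => e
  logger.error("Langfuse score flush failed: #{e.message}")
end

log_output = StringIO.new
logger = Logger.new(log_output)
queue = Queue.new
queue << { name: "quality", value: 0.85 }

# A sender that always fails, standing in for send_batch.
failing_sender = ->(_events) { raise IOError, "connection refused" }

flush_quietly(queue, logger, failing_sender) # does not raise
```

After the call the queue is empty and the only trace of the failure is the log line, so monitoring the logger output is the way to notice dropped batches in a design like this.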

#score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void

This method returns an undefined value.

Create a score for the currently active observation (from OTel span)

Extracts observation_id and trace_id from the active OpenTelemetry span.

Examples:

Langfuse.observe("operation") do |obs|
  score_client.score_active_observation(name: "accuracy", value: 0.92)
end

Parameters:

  • name (String)

    Score name (required)

  • value (Numeric, Boolean, String)

    Score value

  • comment (String, nil) (defaults to: nil)

    Optional comment

  • metadata (Hash, nil) (defaults to: nil)

    Optional metadata hash

  • data_type (Symbol) (defaults to: :numeric)

    Data type (:numeric, :boolean, :categorical)

Raises:

  • (ArgumentError)

    if there is no active span or validation fails



# File 'lib/langfuse/score_client.rb', line 117

def score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric)
  ids = extract_ids_from_active_span
  raise ArgumentError, "No active OpenTelemetry span found" unless ids[:observation_id]

  create(
    name: name,
    value: value,
    trace_id: ids[:trace_id],
    observation_id: ids[:observation_id],
    comment: comment,
    metadata: metadata,
    data_type: data_type
  )
end
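extract_ids_from_active_span is not shown on this page. A minimal sketch of the idea follows, assuming the observation ID is the span ID and the trace ID is the span's trace ID, both in hex form. In the opentelemetry-api gem, OpenTelemetry::Trace.current_span.context responds to hex_trace_id, hex_span_id, and valid?; SpanContextStub below is a hypothetical stand-in with the same three methods so the sketch runs without the gem, and ids_from_span_context is likewise a hypothetical helper.

```ruby
# Stand-in for the span context returned by
# OpenTelemetry::Trace.current_span.context (assumption: same interface).
class SpanContextStub
  attr_reader :hex_trace_id, :hex_span_id

  def initialize(hex_trace_id:, hex_span_id:, valid: true)
    @hex_trace_id = hex_trace_id
    @hex_span_id = hex_span_id
    @valid = valid
  end

  def valid?
    @valid
  end
end

# Hypothetical helper: returns an empty hash when there is no usable span.
def ids_from_span_context(context)
  return {} unless context.valid?

  { trace_id: context.hex_trace_id, observation_id: context.hex_span_id }
end

active = SpanContextStub.new(hex_trace_id: "0af7651916cd43dd8448eb211c80319c",
                             hex_span_id: "b7ad6b7169203331")
inactive = SpanContextStub.new(hex_trace_id: "0" * 32, hex_span_id: "0" * 16, valid: false)

active_ids = ids_from_span_context(active)
inactive_ids = ids_from_span_context(inactive)
```

With an empty hash for the inactive case, the `unless ids[:observation_id]` guard in the method above would raise the ArgumentError described under Raises.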

#score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void

This method returns an undefined value.

Create a score for the currently active trace (from OTel span)

Extracts trace_id from the active OpenTelemetry span.

Examples:

Langfuse.observe("operation") do |obs|
  score_client.score_active_trace(name: "overall_quality", value: 5)
end

Parameters:

  • name (String)

    Score name (required)

  • value (Numeric, Boolean, String)

    Score value

  • comment (String, nil) (defaults to: nil)

    Optional comment

  • metadata (Hash, nil) (defaults to: nil)

    Optional metadata hash

  • data_type (Symbol) (defaults to: :numeric)

    Data type (:numeric, :boolean, :categorical)

Raises:

  • (ArgumentError)

    if there is no active span or validation fails



# File 'lib/langfuse/score_client.rb', line 148

def score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric)
  ids = extract_ids_from_active_span
  raise ArgumentError, "No active OpenTelemetry span found" unless ids[:trace_id]

  create(
    name: name,
    value: value,
    trace_id: ids[:trace_id],
    comment: comment,
    metadata: metadata,
    data_type: data_type
  )
end

#shutdown ⇒ void

This method returns an undefined value.

Shutdown the score client and flush remaining events

Stops the flush timer thread and sends any remaining queued events. Calling shutdown again after the first call is a no-op.



# File 'lib/langfuse/score_client.rb', line 191

def shutdown
  @mutex.synchronize do
    return if @shutdown

    @shutdown = true
    stop_flush_timer
    flush
  end
end
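The mutex-guarded flag above makes shutdown safe to call from several threads at once. The following sketch isolates that pattern; OnceShutdown is a hypothetical class, and its flush_count counter stands in for the timer stop and final flush.

```ruby
# Hypothetical class isolating the run-once shutdown pattern.
class OnceShutdown
  attr_reader :flush_count

  def initialize
    @mutex = Mutex.new
    @shutdown = false
    @flush_count = 0
  end

  def shutdown
    @mutex.synchronize do
      return if @shutdown # later callers leave without doing any work

      @shutdown = true
      @flush_count += 1 # stands in for stop_flush_timer and flush
    end
  end
end

client = OnceShutdown.new
threads = Array.new(5) { Thread.new { client.shutdown } }
threads.each(&:join)
```

However many threads race into shutdown, the guarded section runs once. A caller that owns a ScoreClient instance could use this property to register the call in an at_exit hook without worrying about an earlier explicit shutdown.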