Class: Langfuse::ScoreClient Private

Inherits:
Object
  • Object
Defined in:
lib/langfuse/score_client.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Client for creating and batching Langfuse scores

Handles thread-safe queuing, batching, and sending of score events to the Langfuse ingestion API. Scores are batched and sent automatically based on batch_size and flush_interval configuration.
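
As a rough illustration of size-triggered batching, the sketch below uses a hypothetical TinyBatcher class. It is not the ScoreClient implementation: it records batches in memory instead of calling the ingestion API, and it leaves out the flush_interval timer.

```ruby
# Hypothetical stand-in for the real client; all names here are illustrative.
class TinyBatcher
  attr_reader :sent_batches

  def initialize(batch_size:)
    @batch_size = batch_size
    @queue = Queue.new   # thread-safe FIFO from Ruby's core library
    @sent_batches = []   # records what would have been sent to the API
  end

  # Enqueue an event and flush once the size threshold is reached.
  def add(event)
    @queue << event
    flush if @queue.size >= @batch_size
  end

  # Drain everything currently queued into a single batch.
  # (Single-consumer sketch: with several consumers, pop(true) could raise.)
  def flush
    batch = []
    batch << @queue.pop(true) until @queue.empty?
    @sent_batches << batch unless batch.empty?
  end
end

batcher = TinyBatcher.new(batch_size: 2)
batcher.add({ name: "quality", value: 0.85 })
batcher.add({ name: "accuracy", value: 0.92 }) # second add reaches batch_size
batcher.add({ name: "latency", value: 1.2 })   # stays queued until the next flush
```

In the real client the remaining event would be picked up by the timer or by an explicit flush.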

Examples:

Basic usage

score_client = ScoreClient.new(api_client: api_client, config: config)
score_client.create(name: "quality", value: 0.85, trace_id: "abc123...")

With OTel integration

Langfuse.observe("operation") do |obs|
  score_client.score_active_observation(name: "accuracy", value: 0.92)
end

Constructor Details

#initialize(api_client:, config:) ⇒ ScoreClient

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Initialize a new ScoreClient

Parameters:

  • api_client (ApiClient)

    The API client for sending batches

  • config (Config)

    Configuration object with batch_size and flush_interval



# File 'lib/langfuse/score_client.rb', line 31

def initialize(api_client:, config:)
  @api_client = api_client
  @config = config
  @logger = config.logger
  @queue = Queue.new
  @mutex = Mutex.new
  @flush_thread = nil
  @shutdown = false

  start_flush_timer
end
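
The constructor calls start_flush_timer, whose body does not appear on this page. A periodic timer of this kind could look roughly like the sketch below. FlushTimer and its methods are hypothetical names, and the real method may be structured differently.

```ruby
# Hypothetical periodic timer; not the library's start_flush_timer.
class FlushTimer
  def initialize(interval:, &on_tick)
    @interval = interval
    @on_tick = on_tick
    @stopped = false
    @thread = nil
  end

  # Start a background thread that invokes the block every `interval` seconds.
  def start
    @thread = Thread.new do
      until @stopped
        sleep(@interval)
        @on_tick.call unless @stopped
      end
    end
    self
  end

  # Signal the loop to exit and wait for the thread to finish its current sleep.
  def stop
    @stopped = true
    @thread&.join
  end
end

ticks = Queue.new
timer = FlushTimer.new(interval: 0.01) { ticks << :flush }.start
sleep(0.2) # give the timer time to fire at least once
timer.stop
```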

Instance Attribute Details

#api_client ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/langfuse/score_client.rb', line 25

def api_client
  @api_client
end

#config ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/langfuse/score_client.rb', line 25

def config
  @config
end

#logger ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/langfuse/score_client.rb', line 25

def logger
  @logger
end

Instance Method Details

#create(name:, value:, trace_id: nil, observation_id: nil, comment: nil, metadata: nil, data_type: :numeric) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Create a score event and queue it for batching

Examples:

Numeric score

create(name: "quality", value: 0.85, trace_id: "abc123", data_type: :numeric)

Boolean score

create(name: "passed", value: true, trace_id: "abc123", data_type: :boolean)

Categorical score

create(name: "category", value: "high", trace_id: "abc123", data_type: :categorical)

Parameters:

  • name (String)

    Score name (required)

  • value (Numeric, Integer, String)

    Score value. The expected type depends on data_type: a number for :numeric, true or false for :boolean, a string for :categorical.

  • trace_id (String, nil) (defaults to: nil)

    Trace ID to associate with the score

  • observation_id (String, nil) (defaults to: nil)

    Observation ID to associate with the score

  • comment (String, nil) (defaults to: nil)

    Optional comment

  • metadata (Hash, nil) (defaults to: nil)

    Optional metadata hash

  • data_type (Symbol) (defaults to: :numeric)

    Data type (:numeric, :boolean, :categorical)

Raises:

  • (ArgumentError)

    if validation fails



# File 'lib/langfuse/score_client.rb', line 64

def create(name:, value:, trace_id: nil, observation_id: nil, comment: nil, metadata: nil,
           data_type: :numeric)
  validate_name(name)
  normalized_value = normalize_value(value, data_type)
  data_type_str = Types::SCORE_DATA_TYPES[data_type] || raise(ArgumentError, "Invalid data_type: #{data_type}")

  event = build_score_event(
    name: name,
    value: normalized_value,
    trace_id: trace_id,
    observation_id: observation_id,
    comment: comment,
    metadata: metadata,
    data_type: data_type_str
  )

  @queue << event

  # Trigger flush if batch size reached
  flush if @queue.size >= config.batch_size
rescue StandardError => e
  logger.error("Langfuse score creation failed: #{e.message}")
  raise
end
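
The listing relies on normalize_value and Types::SCORE_DATA_TYPES, neither of which is shown on this page. The sketch below illustrates the kind of per-type check such a step could perform. The mapping values and the check_score_value helper are assumptions made for illustration, not the library's definitions.

```ruby
# Assumed mapping; the real Types::SCORE_DATA_TYPES is not shown on this page.
SCORE_DATA_TYPES = {
  numeric: "NUMERIC",
  boolean: "BOOLEAN",
  categorical: "CATEGORICAL"
}.freeze

# Hypothetical validation helper, not the library's normalize_value.
# Returns the value unchanged when it matches the declared data type.
def check_score_value(value, data_type)
  case data_type
  when :numeric
    raise ArgumentError, "Numeric score requires a Numeric, got #{value.class}" unless value.is_a?(Numeric)
  when :boolean
    raise ArgumentError, "Boolean score requires true or false" unless [true, false].include?(value)
  when :categorical
    raise ArgumentError, "Categorical score requires a String" unless value.is_a?(String)
  else
    raise ArgumentError, "Invalid data_type: #{data_type}"
  end
  value
end

check_score_value(0.85, :numeric)
check_score_value(true, :boolean)
check_score_value("high", :categorical)
```

Because create re-raises after logging, a caller sees the ArgumentError directly and can rescue it at the call site.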

#flush ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Force flush all queued score events

Sends all queued events to the API immediately.



# File 'lib/langfuse/score_client.rb', line 156

def flush
  return if @queue.empty?

  events = []
  @queue.size.times do
    events << @queue.pop(true)
  rescue StandardError
    nil
  end
  events.compact!

  return if events.empty?

  send_batch(events)
rescue StandardError => e
  logger.error("Langfuse score flush failed: #{e.message}")
  # Don't raise - silent error handling for batch operations
end
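
The drain loop above depends on Queue#pop(true), which raises ThreadError instead of blocking when the queue is empty. The standalone sketch below isolates that pattern. The drain helper is a hypothetical name and is not part of ScoreClient.

```ruby
# Drain at most the number of items observed at the start, without blocking.
def drain(queue)
  items = []
  queue.size.times do
    items << queue.pop(true) # non_block = true: raises ThreadError when empty
  rescue ThreadError
    break                    # another consumer emptied the queue first
  end
  items
end

queue = Queue.new
3.times { |i| queue << { name: "score_#{i}" } }
drained = drain(queue)
```

Bounding the loop by the size seen at the start means events enqueued during the drain wait for the next flush rather than extending the current one.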

#score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Create a score for the currently active observation (from OTel span)

Extracts observation_id and trace_id from the active OpenTelemetry span.

Examples:

Langfuse.observe("operation") do |obs|
  score_client.score_active_observation(name: "accuracy", value: 0.92)
end

Parameters:

  • name (String)

    Score name (required)

  • value (Numeric, Integer, String)

    Score value

  • comment (String, nil) (defaults to: nil)

    Optional comment

  • metadata (Hash, nil) (defaults to: nil)

    Optional metadata hash

  • data_type (Symbol) (defaults to: :numeric)

    Data type (:numeric, :boolean, :categorical)

Raises:

  • (ArgumentError)

    if no active span or validation fails



# File 'lib/langfuse/score_client.rb', line 106

def score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric)
  ids = extract_ids_from_active_span
  raise ArgumentError, "No active OpenTelemetry span found" unless ids[:observation_id]

  create(
    name: name,
    value: value,
    trace_id: ids[:trace_id],
    observation_id: ids[:observation_id],
    comment: comment,
    metadata: metadata,
    data_type: data_type
  )
end
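
The helper extract_ids_from_active_span is not shown on this page. As a hedged illustration of what it might do, the sketch below hex-encodes binary trace and span IDs taken from a stand-in context object. FakeSpanContext and ids_from_context are hypothetical names, and the real helper reads the context from the active OpenTelemetry span instead.

```ruby
# Stand-in for an OpenTelemetry span context, so the sketch runs without the
# opentelemetry-api gem. The binary ID layout is an assumption.
FakeSpanContext = Struct.new(:trace_id, :span_id)

# Hypothetical helper: returns hex-encoded IDs, or nils when no context exists.
def ids_from_context(context)
  return { trace_id: nil, observation_id: nil } if context.nil?

  {
    trace_id: context.trace_id.unpack1("H*"),      # 16 bytes -> 32 hex chars
    observation_id: context.span_id.unpack1("H*")  # 8 bytes  -> 16 hex chars
  }
end

context = FakeSpanContext.new(("\x01" * 16).b, ("\x02" * 8).b)
ids = ids_from_context(context)
missing = ids_from_context(nil)
```

With a nil observation_id, the guard in score_active_observation raises ArgumentError before create is called.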

#score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Create a score for the currently active trace (from OTel span)

Extracts trace_id from the active OpenTelemetry span.

Examples:

Langfuse.observe("operation") do |obs|
  score_client.score_active_trace(name: "overall_quality", value: 5)
end

Parameters:

  • name (String)

    Score name (required)

  • value (Numeric, Integer, String)

    Score value

  • comment (String, nil) (defaults to: nil)

    Optional comment

  • metadata (Hash, nil) (defaults to: nil)

    Optional metadata hash

  • data_type (Symbol) (defaults to: :numeric)

    Data type (:numeric, :boolean, :categorical)

Raises:

  • (ArgumentError)

    if no active span or validation fails



# File 'lib/langfuse/score_client.rb', line 137

def score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric)
  ids = extract_ids_from_active_span
  raise ArgumentError, "No active OpenTelemetry span found" unless ids[:trace_id]

  create(
    name: name,
    value: value,
    trace_id: ids[:trace_id],
    comment: comment,
    metadata: metadata,
    data_type: data_type
  )
end

#shutdown ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Shutdown the score client and flush remaining events

Stops the flush timer thread and sends any remaining queued events.



# File 'lib/langfuse/score_client.rb', line 180

def shutdown
  @mutex.synchronize do
    return if @shutdown

    @shutdown = true
    stop_flush_timer
    flush
  end
end
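
The mutex and the @shutdown flag together make shutdown idempotent: only the first caller performs the final flush. The sketch below reproduces that guard with a hypothetical Stoppable class, where a counter stands in for stop_flush_timer and flush.

```ruby
# Hypothetical class that mirrors only the shutdown guard shown above.
class Stoppable
  attr_reader :flush_count

  def initialize
    @mutex = Mutex.new
    @shutdown = false
    @flush_count = 0
  end

  def shutdown
    @mutex.synchronize do
      # `return` exits the method; synchronize still releases the lock.
      return if @shutdown

      @shutdown = true
      @flush_count += 1 # stands in for the final flush
    end
  end
end

client = Stoppable.new
threads = 4.times.map { Thread.new { client.shutdown } }
threads.each(&:join)
```

Calling shutdown again afterwards returns immediately and does not deadlock, since the early return leaves the mutex unlocked.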