Class: Langfuse::ScoreClient Private
- Inherits: Object
  - Object
    - Langfuse::ScoreClient
- Defined in: lib/langfuse/score_client.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Client for creating and batching Langfuse scores
Handles thread-safe queuing, batching, and sending of score events to the Langfuse ingestion API. Scores are batched and sent automatically based on batch_size and flush_interval configuration.
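The size-triggered half of this behavior can be sketched in a few lines. This is a minimal illustration, not the Langfuse implementation: `TinyBatcher` and `sent_batches` are hypothetical names, and an in-memory array stands in for the call to the ingestion API. The interval-based trigger is left out here.

```ruby
# Hypothetical stand-in for illustration; not the Langfuse implementation.
class TinyBatcher
  attr_reader :sent_batches

  def initialize(batch_size:)
    @batch_size = batch_size
    @queue = Queue.new    # thread-safe FIFO from Ruby core
    @sent_batches = []    # stands in for requests to the ingestion API
  end

  # Enqueue an event and flush once the batch size is reached.
  def push(event)
    @queue << event
    flush if @queue.size >= @batch_size
  end

  # Drain whatever is queued right now and record it as one batch.
  def flush
    events = []
    loop do
      begin
        events << @queue.pop(true) # non-blocking pop raises ThreadError when empty
      rescue ThreadError
        break
      end
    end
    @sent_batches << events unless events.empty?
  end
end

batcher = TinyBatcher.new(batch_size: 2)
batcher.push({ name: "quality", value: 0.9 })
batcher.push({ name: "quality", value: 0.7 }) # reaches batch_size and triggers a flush
batcher.push({ name: "quality", value: 0.4 }) # stays queued until the next flush
batcher.flush
```

After this runs, `batcher.sent_batches` holds two batches: the first with two events, the second with the one event that was flushed manually.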
Instance Attribute Summary
- #api_client ⇒ ApiClient (readonly, private): The API client for sending batches.
- #config ⇒ Config (readonly, private): Configuration object.
- #logger ⇒ Logger (readonly, private): Logger instance.
Instance Method Summary
- #create(name:, value:, id: nil, trace_id: nil, session_id: nil, observation_id: nil, comment: nil, metadata: nil, environment: nil, data_type: :numeric, dataset_run_id: nil, config_id: nil) ⇒ void (private): Create a score event and queue it for batching.
- #flush ⇒ void (private): Force flush all queued score events.
- #initialize(api_client:, config:) ⇒ ScoreClient (constructor, private): Initialize a new ScoreClient.
- #score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void (private): Create a score for the currently active observation (from OTel span).
- #score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void (private): Create a score for the currently active trace (from OTel span).
- #shutdown ⇒ void (private): Shutdown the score client and flush remaining events.
Constructor Details
#initialize(api_client:, config:) ⇒ ScoreClient
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Initialize a new ScoreClient
    # File 'lib/langfuse/score_client.rb', line 38
    def initialize(api_client:, config:)
      @api_client = api_client
      @config = config
      @logger = config.logger
      @queue = Queue.new
      @mutex = Mutex.new
      @flush_thread = nil
      @shutdown = false

      start_flush_timer
    end
Instance Attribute Details
#api_client ⇒ ApiClient (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns: The API client for sending batches.

    # File 'lib/langfuse/score_client.rb', line 26
    def api_client
      @api_client
    end
#config ⇒ Config (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns: Configuration object.

    # File 'lib/langfuse/score_client.rb', line 29
    def config
      @config
    end
#logger ⇒ Logger (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns: Logger instance.

    # File 'lib/langfuse/score_client.rb', line 32
    def logger
      @logger
    end
Instance Method Details
#create(name:, value:, id: nil, trace_id: nil, session_id: nil, observation_id: nil, comment: nil, metadata: nil, environment: nil, data_type: :numeric, dataset_run_id: nil, config_id: nil) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Create a score event and queue it for batching
    # File 'lib/langfuse/score_client.rb', line 76
    def create(name:, value:, id: nil, trace_id: nil, session_id: nil, observation_id: nil, comment: nil, metadata: nil,
               environment: nil, data_type: :numeric, dataset_run_id: nil, config_id: nil)
      validate_name(name)
      # Keep identifier policy server-side to preserve cross-SDK parity and avoid blocking valid future payloads.
      normalized_value = normalize_value(value, data_type)
      data_type_str = Types::SCORE_DATA_TYPES[data_type] || raise(ArgumentError, "Invalid data_type: #{data_type}")

      event = build_score_event(
        name: name,
        value: normalized_value,
        id: id,
        trace_id: trace_id,
        session_id: session_id,
        observation_id: observation_id,
        comment: comment,
        metadata: metadata,
        environment: environment,
        data_type: data_type_str,
        dataset_run_id: dataset_run_id,
        config_id: config_id
      )

      @queue << event

      # Trigger flush if batch size reached
      flush if @queue.size >= config.batch_size
    rescue StandardError => e
      logger.error("Langfuse score creation failed: #{e.message}")
      raise
    end
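The data type lookup and the log-then-re-raise error handling in this method can be tried in isolation. The sketch below is illustrative only: `ScoreTypeSketch` is a hypothetical module, and its `DATA_TYPES` table is an assumed stand-in, since the contents of `Types::SCORE_DATA_TYPES` are not shown on this page.

```ruby
require "logger"
require "stringio"

# Hypothetical module for illustration; not part of the Langfuse gem.
module ScoreTypeSketch
  # Assumed mapping; the real Types::SCORE_DATA_TYPES may differ.
  DATA_TYPES = {
    numeric: "NUMERIC",
    boolean: "BOOLEAN",
    categorical: "CATEGORICAL"
  }.freeze

  # Look up the wire-format string, logging and re-raising on unknown types.
  def self.resolve(data_type, logger:)
    DATA_TYPES[data_type] || raise(ArgumentError, "Invalid data_type: #{data_type}")
  rescue StandardError => e
    logger.error("Langfuse score creation failed: #{e.message}")
    raise
  end
end

log_output = StringIO.new
logger = Logger.new(log_output)

resolved = ScoreTypeSketch.resolve(:numeric, logger: logger)

rejected = begin
  ScoreTypeSketch.resolve(:percent, logger: logger)
  false
rescue ArgumentError
  true
end
```

Unlike #flush, which swallows errors, this path surfaces the failure to the caller after logging it, so an invalid `data_type` is never queued silently.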
#flush ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Force flush all queued score events
Sends all queued events to the API immediately.
    # File 'lib/langfuse/score_client.rb', line 174
    def flush
      return if @queue.empty?

      events = []
      @queue.size.times do
        events << @queue.pop(true)
      rescue StandardError
        nil
      end
      events.compact!

      return if events.empty?

      send_batch(events)
    rescue StandardError => e
      logger.error("Langfuse score flush failed: #{e.message}")
      # Don't raise - silent error handling for batch operations
    end
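The drain loop relies on `Queue#pop(true)`, the non-blocking form that raises `ThreadError` on an empty queue instead of waiting. The following sketch shows that pattern with several producer threads. `drain` is a hypothetical helper, and it rescues `ThreadError` specifically rather than `StandardError` as the method above does.

```ruby
# Hypothetical helper mirroring the drain loop in #flush.
# Takes at most as many events as were queued when the call started.
def drain(queue)
  events = []
  queue.size.times do
    begin
      events << queue.pop(true) # non-blocking: raises ThreadError if empty
    rescue ThreadError
      break # another consumer emptied the queue first
    end
  end
  events
end

queue = Queue.new

# Four producers push concurrently; Queue handles the synchronization.
producers = 4.times.map do |i|
  Thread.new { 25.times { |j| queue << "score-#{i}-#{j}" } }
end
producers.each(&:join)

first = drain(queue)
second = drain(queue)
```

Here `first` contains all 100 events exactly once and `second` is empty. Because the loop is bounded by the size observed at the start, events pushed while a drain is running are left for the next flush.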
#score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Create a score for the currently active observation (from OTel span)
Extracts observation_id and trace_id from the active OpenTelemetry span.
    # File 'lib/langfuse/score_client.rb', line 124
    def score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric)
      ids = extract_ids_from_active_span
      raise ArgumentError, "No active OpenTelemetry span found" unless ids[:observation_id]

      create(
        name: name,
        value: value,
        trace_id: ids[:trace_id],
        observation_id: ids[:observation_id],
        comment: comment,
        metadata: metadata,
        data_type: data_type
      )
    end
#score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Create a score for the currently active trace (from OTel span)
Extracts trace_id from the active OpenTelemetry span.
    # File 'lib/langfuse/score_client.rb', line 155
    def score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric)
      ids = extract_ids_from_active_span
      raise ArgumentError, "No active OpenTelemetry span found" unless ids[:trace_id]

      create(
        name: name,
        value: value,
        trace_id: ids[:trace_id],
        comment: comment,
        metadata: metadata,
        data_type: data_type
      )
    end
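Both active-span helpers depend on `extract_ids_from_active_span`, whose body is not shown on this page. The sketch below is one plausible shape for it, under stated assumptions: `FakeSpanContext` is a stand-in so the example runs without the OpenTelemetry gem, and it mimics the `hex_trace_id`, `hex_span_id`, and `valid?` methods of `OpenTelemetry::Trace::SpanContext`. The `extract_ids` function is hypothetical.

```ruby
# Stand-in for OpenTelemetry::Trace::SpanContext so the sketch needs no gems.
FakeSpanContext = Struct.new(:hex_trace_id, :hex_span_id, :valid) do
  def valid?
    valid
  end
end

# Hypothetical version of the private helper: map the span ID to the
# observation ID, and return nils when there is no valid span context.
def extract_ids(span_context)
  return { trace_id: nil, observation_id: nil } unless span_context&.valid?

  { trace_id: span_context.hex_trace_id, observation_id: span_context.hex_span_id }
end

active = FakeSpanContext.new("4bf92f3577b34da6a3ce929d0e0e4736", "00f067aa0ba902b7", true)
inactive = FakeSpanContext.new("0" * 32, "0" * 16, false)

active_ids = extract_ids(active)
inactive_ids = extract_ids(inactive)
```

With a helper like this, the `unless ids[:trace_id]` and `unless ids[:observation_id]` guards above raise `ArgumentError` whenever no valid span is active.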
#shutdown ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Shutdown the score client and flush remaining events
Stops the flush timer thread and sends any remaining queued events.
    # File 'lib/langfuse/score_client.rb', line 198
    def shutdown
      @mutex.synchronize do
        return if @shutdown

        @shutdown = true
        stop_flush_timer
        flush
      end
    end
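The guard on `@shutdown` makes repeated calls harmless: only the first call stops the timer and performs the final flush. The sketch below illustrates that idempotency. `TimerSketch` is hypothetical, and the way it starts and stops its timer thread is an assumption, since `start_flush_timer` and `stop_flush_timer` are not shown on this page.

```ruby
# Hypothetical class for illustration; not the Langfuse implementation.
class TimerSketch
  attr_reader :flush_count

  def initialize(interval:)
    @interval = interval
    @mutex = Mutex.new
    @shutdown = false
    @flush_count = 0 # counts flushes instead of sending anything
    # Assumed timer shape: a background thread that flushes periodically.
    @flush_thread = Thread.new do
      loop do
        sleep(@interval)
        flush
      end
    end
  end

  def flush
    @flush_count += 1
  end

  # Safe to call more than once; later calls return immediately.
  def shutdown
    @mutex.synchronize do
      return if @shutdown

      @shutdown = true
      @flush_thread.kill # assumed way of stopping the timer
      @flush_thread.join
      flush              # final flush of anything still queued
    end
  end
end

# The long interval keeps the timer from firing during the example.
sketch = TimerSketch.new(interval: 60)
sketch.shutdown
sketch.shutdown # no-op: the guard returns before flushing again
```

After both calls, `sketch.flush_count` is 1, showing that the final flush ran exactly once.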