Class: Langfuse::ScoreClient Private
- Inherits:
-
Object
- Object
- Langfuse::ScoreClient
- Defined in:
- lib/langfuse/score_client.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Client for creating and batching Langfuse scores
Handles thread-safe queuing, batching, and sending of score events to the Langfuse ingestion API. Scores are batched and sent automatically based on batch_size and flush_interval configuration.
rubocop:disable Metrics/ClassLength
Instance Attribute Summary collapse
-
#api_client ⇒ ApiClient
readonly
private
The API client for sending batches.
-
#config ⇒ Config
readonly
private
Configuration object.
-
#logger ⇒ Logger
readonly
private
Logger instance.
Instance Method Summary collapse
-
#create(name:, value:, trace_id: nil, observation_id: nil, comment: nil, metadata: nil, data_type: :numeric, dataset_run_id: nil, config_id: nil) ⇒ void
private
Create a score event and queue it for batching.
-
#flush ⇒ void
private
Force flush all queued score events.
-
#initialize(api_client:, config:) ⇒ ScoreClient
constructor
private
Initialize a new ScoreClient.
-
#score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void
private
Create a score for the currently active observation (from OTel span).
-
#score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void
private
Create a score for the currently active trace (from OTel span).
-
#shutdown ⇒ void
private
Shutdown the score client and flush remaining events.
Constructor Details
#initialize(api_client:, config:) ⇒ ScoreClient
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Initialize a new ScoreClient
38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/langfuse/score_client.rb', line 38 def initialize(api_client:, config:) @api_client = api_client @config = config @logger = config.logger @queue = Queue.new @mutex = Mutex.new @flush_thread = nil @shutdown = false start_flush_timer end |
Instance Attribute Details
#api_client ⇒ ApiClient (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns The API client for sending batches.
26 27 28 |
# File 'lib/langfuse/score_client.rb', line 26 def api_client @api_client end |
#config ⇒ Config (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns Configuration object.
29 30 31 |
# File 'lib/langfuse/score_client.rb', line 29 def config @config end |
#logger ⇒ Logger (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns Logger instance.
32 33 34 |
# File 'lib/langfuse/score_client.rb', line 32 def logger @logger end |
Instance Method Details
#create(name:, value:, trace_id: nil, observation_id: nil, comment: nil, metadata: nil, data_type: :numeric, dataset_run_id: nil, config_id: nil) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Create a score event and queue it for batching
rubocop:disable Metrics/ParameterLists
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/langfuse/score_client.rb', line 73 def create(name:, value:, trace_id: nil, observation_id: nil, comment: nil, metadata: nil, data_type: :numeric, dataset_run_id: nil, config_id: nil) validate_name(name) normalized_value = normalize_value(value, data_type) data_type_str = Types::SCORE_DATA_TYPES[data_type] || raise(ArgumentError, "Invalid data_type: #{data_type}") event = build_score_event( name: name, value: normalized_value, trace_id: trace_id, observation_id: observation_id, comment: comment, metadata: , data_type: data_type_str, dataset_run_id: dataset_run_id, config_id: config_id ) @queue << event # Trigger flush if batch size reached flush if @queue.size >= config.batch_size rescue StandardError => e logger.error("Langfuse score creation failed: #{e.}") raise end |
#flush ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Force flush all queued score events
Sends all queued events to the API immediately.
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
# File 'lib/langfuse/score_client.rb', line 167 def flush return if @queue.empty? events = [] @queue.size.times do events << @queue.pop(true) rescue StandardError nil end events.compact! return if events.empty? send_batch(events) rescue StandardError => e logger.error("Langfuse score flush failed: #{e.}") # Don't raise - silent error handling for batch operations end |
#score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Create a score for the currently active observation (from OTel span)
Extracts observation_id and trace_id from the active OpenTelemetry span.
117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/langfuse/score_client.rb', line 117 def score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ids = extract_ids_from_active_span raise ArgumentError, "No active OpenTelemetry span found" unless ids[:observation_id] create( name: name, value: value, trace_id: ids[:trace_id], observation_id: ids[:observation_id], comment: comment, metadata: , data_type: data_type ) end |
#score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Create a score for the currently active trace (from OTel span)
Extracts trace_id from the active OpenTelemetry span.
148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/langfuse/score_client.rb', line 148 def score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ids = extract_ids_from_active_span raise ArgumentError, "No active OpenTelemetry span found" unless ids[:trace_id] create( name: name, value: value, trace_id: ids[:trace_id], comment: comment, metadata: , data_type: data_type ) end |
#shutdown ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Shutdown the score client and flush remaining events
Stops the flush timer thread and sends any remaining queued events.
191 192 193 194 195 196 197 198 199 |
# File 'lib/langfuse/score_client.rb', line 191 def shutdown @mutex.synchronize do return if @shutdown @shutdown = true stop_flush_timer flush end end |