Class: Langfuse::ScoreClient Private
- Inherits:
-
Object
- Object
- Langfuse::ScoreClient
- Defined in:
- lib/langfuse/score_client.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Client for creating and batching Langfuse scores
Handles thread-safe queuing, batching, and sending of score events to the Langfuse ingestion API. Scores are batched and sent automatically based on batch_size and flush_interval configuration.
rubocop:disable Metrics/ClassLength
Constant Summary collapse
- HEX_TRACE_ID_PATTERN =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
/\A[0-9a-f]{32}\z/
Instance Attribute Summary collapse
-
#api_client ⇒ ApiClient
readonly
private
The API client for sending batches.
-
#config ⇒ Config
readonly
private
Configuration object.
-
#logger ⇒ Logger
readonly
private
Logger instance.
Instance Method Summary collapse
-
#create(name:, value:, id: nil, trace_id: nil, session_id: nil, observation_id: nil, comment: nil, metadata: nil, environment: nil, data_type: :numeric, dataset_run_id: nil, config_id: nil) ⇒ void
private
Create a score event and queue it for batching.
-
#flush ⇒ void
private
Force flush all queued score events.
-
#initialize(api_client:, config:) ⇒ ScoreClient
constructor
private
Initialize a new ScoreClient.
-
#score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void
private
Create a score for the currently active observation (from OTel span).
-
#score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void
private
Create a score for the currently active trace (from OTel span).
-
#shutdown ⇒ void
private
Shutdown the score client and flush remaining events.
Constructor Details
#initialize(api_client:, config:) ⇒ ScoreClient
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Initialize a new ScoreClient
40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/langfuse/score_client.rb', line 40 def initialize(api_client:, config:) @api_client = api_client @config = config @logger = config.logger @queue = Queue.new @mutex = Mutex.new @flush_thread = nil @shutdown = false # Match the immutable tracing setup contract: once this client exists, later config # mutations must not change score sampling without rebuilding the client. @score_sampler = Sampling.build_sampler(config.sample_rate) start_flush_timer end |
Instance Attribute Details
#api_client ⇒ ApiClient (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns The API client for sending batches.
26 27 28 |
# File 'lib/langfuse/score_client.rb', line 26 def api_client @api_client end |
#config ⇒ Config (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns Configuration object.
29 30 31 |
# File 'lib/langfuse/score_client.rb', line 29 def config @config end |
#logger ⇒ Logger (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns Logger instance.
32 33 34 |
# File 'lib/langfuse/score_client.rb', line 32 def logger @logger end |
Instance Method Details
#create(name:, value:, id: nil, trace_id: nil, session_id: nil, observation_id: nil, comment: nil, metadata: nil, environment: nil, data_type: :numeric, dataset_run_id: nil, config_id: nil) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Create a score event and queue it for batching
rubocop:disable Metrics/ParameterLists
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/langfuse/score_client.rb', line 81 def create(name:, value:, id: nil, trace_id: nil, session_id: nil, observation_id: nil, comment: nil, metadata: nil, environment: nil, data_type: :numeric, dataset_run_id: nil, config_id: nil) validate_name(name) normalized_value = normalize_value(value, data_type) data_type_str = Types::SCORE_DATA_TYPES[data_type] || raise(ArgumentError, "Invalid data_type: #{data_type}") return unless enqueue_trace_linked_score?(trace_id) event = build_score_event( name: name, value: normalized_value, id: id, trace_id: trace_id, session_id: session_id, observation_id: observation_id, comment: comment, metadata: , environment: environment, data_type: data_type_str, dataset_run_id: dataset_run_id, config_id: config_id ) @queue << event flush if @queue.size >= config.batch_size rescue StandardError => e logger.error("Langfuse score creation failed: #{e.}") raise end |
#flush ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Force flush all queued score events
Sends all queued events to the API immediately.
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/langfuse/score_client.rb', line 170 def flush return if @queue.empty? events = [] @queue.size.times do events << @queue.pop(true) rescue StandardError nil end events.compact! return if events.empty? send_batch(events) rescue StandardError => e logger.error("Langfuse score flush failed: #{e.}") # Don't raise - silent error handling for batch operations end |
#score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Create a score for the currently active observation (from OTel span)
Extracts observation_id and trace_id from the active OpenTelemetry span.
120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/langfuse/score_client.rb', line 120 def score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ids = extract_ids_from_active_span raise ArgumentError, "No active OpenTelemetry span found" unless ids[:observation_id] create( name: name, value: value, trace_id: ids[:trace_id], observation_id: ids[:observation_id], comment: comment, metadata: , data_type: data_type ) end |
#score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Create a score for the currently active trace (from OTel span)
Extracts trace_id from the active OpenTelemetry span.
151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/langfuse/score_client.rb', line 151 def score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ids = extract_ids_from_active_span raise ArgumentError, "No active OpenTelemetry span found" unless ids[:trace_id] create( name: name, value: value, trace_id: ids[:trace_id], comment: comment, metadata: , data_type: data_type ) end |
#shutdown ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Shutdown the score client and flush remaining events
Stops the flush timer thread and sends any remaining queued events.
194 195 196 197 198 199 200 201 202 |
# File 'lib/langfuse/score_client.rb', line 194 def shutdown @mutex.synchronize do return if @shutdown @shutdown = true stop_flush_timer flush end end |