Class: Langfuse::ScoreClient Private
- Inherits:
-
Object
- Object
- Langfuse::ScoreClient
- Defined in:
- lib/langfuse/score_client.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Client for creating and batching Langfuse scores
Handles thread-safe queuing, batching, and sending of score events to the Langfuse ingestion API. Scores are batched and sent automatically based on batch_size and flush_interval configuration.
rubocop:disable Metrics/ClassLength
Instance Attribute Summary collapse
- #api_client ⇒ Object readonly private
- #config ⇒ Object readonly private
- #logger ⇒ Object readonly private
Instance Method Summary collapse
-
#create(name:, value:, trace_id: nil, observation_id: nil, comment: nil, metadata: nil, data_type: :numeric) ⇒ void
private
Create a score event and queue it for batching.
-
#flush ⇒ void
private
Force flush all queued score events.
-
#initialize(api_client:, config:) ⇒ ScoreClient
constructor
private
Initialize a new ScoreClient.
-
#score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void
private
Create a score for the currently active observation (from OTel span).
-
#score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void
private
Create a score for the currently active trace (from OTel span).
-
#shutdown ⇒ void
private
Shutdown the score client and flush remaining events.
Constructor Details
#initialize(api_client:, config:) ⇒ ScoreClient
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Initialize a new ScoreClient
31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/langfuse/score_client.rb', line 31 def initialize(api_client:, config:) @api_client = api_client @config = config @logger = config.logger @queue = Queue.new @mutex = Mutex.new @flush_thread = nil @shutdown = false start_flush_timer end |
Instance Attribute Details
#api_client ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
25 26 27 |
# File 'lib/langfuse/score_client.rb', line 25 def api_client @api_client end |
#config ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
25 26 27 |
# File 'lib/langfuse/score_client.rb', line 25 def config @config end |
#logger ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
25 26 27 |
# File 'lib/langfuse/score_client.rb', line 25 def logger @logger end |
Instance Method Details
#create(name:, value:, trace_id: nil, observation_id: nil, comment: nil, metadata: nil, data_type: :numeric) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Create a score event and queue it for batching
rubocop:disable Metrics/ParameterLists
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/langfuse/score_client.rb', line 64 def create(name:, value:, trace_id: nil, observation_id: nil, comment: nil, metadata: nil, data_type: :numeric) validate_name(name) normalized_value = normalize_value(value, data_type) data_type_str = Types::SCORE_DATA_TYPES[data_type] || raise(ArgumentError, "Invalid data_type: #{data_type}") event = build_score_event( name: name, value: normalized_value, trace_id: trace_id, observation_id: observation_id, comment: comment, metadata: , data_type: data_type_str ) @queue << event # Trigger flush if batch size reached flush if @queue.size >= config.batch_size rescue StandardError => e logger.error("Langfuse score creation failed: #{e.}") raise end |
#flush ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Force flush all queued score events
Sends all queued events to the API immediately.
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/langfuse/score_client.rb', line 156 def flush return if @queue.empty? events = [] @queue.size.times do events << @queue.pop(true) rescue StandardError nil end events.compact! return if events.empty? send_batch(events) rescue StandardError => e logger.error("Langfuse score flush failed: #{e.}") # Don't raise - silent error handling for batch operations end |
#score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Create a score for the currently active observation (from OTel span)
Extracts observation_id and trace_id from the active OpenTelemetry span.
106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/langfuse/score_client.rb', line 106 def score_active_observation(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ids = extract_ids_from_active_span raise ArgumentError, "No active OpenTelemetry span found" unless ids[:observation_id] create( name: name, value: value, trace_id: ids[:trace_id], observation_id: ids[:observation_id], comment: comment, metadata: , data_type: data_type ) end |
#score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Create a score for the currently active trace (from OTel span)
Extracts trace_id from the active OpenTelemetry span.
137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/langfuse/score_client.rb', line 137 def score_active_trace(name:, value:, comment: nil, metadata: nil, data_type: :numeric) ids = extract_ids_from_active_span raise ArgumentError, "No active OpenTelemetry span found" unless ids[:trace_id] create( name: name, value: value, trace_id: ids[:trace_id], comment: comment, metadata: , data_type: data_type ) end |
#shutdown ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Shutdown the score client and flush remaining events
Stops the flush timer thread and sends any remaining queued events.
180 181 182 183 184 185 186 187 188 |
# File 'lib/langfuse/score_client.rb', line 180 def shutdown @mutex.synchronize do return if @shutdown @shutdown = true stop_flush_timer flush end end |