Class: Langfuse::ApiClient
- Inherits:
-
Object
- Object
- Langfuse::ApiClient
- Defined in:
- lib/langfuse/api_client.rb
Overview
HTTP client for Langfuse API
Handles authentication, connection management, and HTTP requests to the Langfuse REST API.
Instance Attribute Summary collapse
-
#base_url ⇒ String
readonly
Base URL for Langfuse API.
-
#cache ⇒ PromptCache, ...
readonly
Optional cache for prompt responses.
-
#logger ⇒ Logger
readonly
Logger instance for debugging.
-
#public_key ⇒ String
readonly
Langfuse public API key.
-
#secret_key ⇒ String
readonly
Langfuse secret API key.
-
#timeout ⇒ Integer
readonly
HTTP request timeout in seconds.
Instance Method Summary collapse
-
#connection(timeout: nil) ⇒ Faraday::Connection
Get a Faraday connection.
-
#create_dataset(name:, description: nil, metadata: nil) ⇒ Hash
Create a new dataset.
-
#create_dataset_item(dataset_name:, input: nil, expected_output: nil, metadata: nil, id: nil, source_trace_id: nil, source_observation_id: nil, status: nil) ⇒ Hash
Create a new dataset item (or upsert if id is provided).
-
#create_dataset_run_item(dataset_item_id:, run_name:, trace_id: nil, observation_id: nil, metadata: nil, run_description: nil) ⇒ Hash
Create a dataset run item (link a trace to a dataset item within a run).
-
#create_prompt(name:, prompt:, type:, config: {}, labels: [], tags: [], commit_message: nil) ⇒ Hash
Create a new prompt (or new version if prompt with same name exists).
-
#delete_dataset_item(id) ⇒ Hash
Delete a dataset item by ID.
-
#get_dataset(name) ⇒ Hash
Fetch a dataset by name.
-
#get_dataset_item(id) ⇒ Hash
Fetch a dataset item by ID.
-
#get_projects ⇒ Hash
Fetch projects accessible with the current API keys.
-
#get_prompt(name, version: nil, label: nil) ⇒ Hash
Fetch a prompt from the Langfuse API.
-
#initialize(public_key:, secret_key:, base_url:, timeout: 5, logger: nil, cache: nil) ⇒ ApiClient
constructor
Initialize a new API client.
-
#list_dataset_items(dataset_name:, page: nil, limit: nil, source_trace_id: nil, source_observation_id: nil) ⇒ Array<Hash>
List items in a dataset with optional filters.
-
#list_dataset_items_paginated(dataset_name:, page: nil, limit: nil, source_trace_id: nil, source_observation_id: nil) ⇒ Hash
private
Full paginated response including “meta” for internal pagination use.
-
#list_datasets(page: nil, limit: nil) ⇒ Array<Hash>
List all datasets in the project.
-
#list_prompts(page: nil, limit: nil) ⇒ Array<Hash>
List all prompts in the Langfuse project.
-
#send_batch(events) ⇒ void
Send a batch of events to the Langfuse ingestion API.
-
#shutdown ⇒ void
Shut down the API client and release resources.
-
#update_prompt(name:, version:, labels:) ⇒ Hash
Update labels for an existing prompt version.
Constructor Details
#initialize(public_key:, secret_key:, base_url:, timeout: 5, logger: nil, cache: nil) ⇒ ApiClient
Initialize a new API client
52 53 54 55 56 57 58 59 |
# File 'lib/langfuse/api_client.rb', line 52
#
# Build a client from credentials and connection settings.
#
# @param public_key [String] Langfuse public API key
# @param secret_key [String] Langfuse secret API key
# @param base_url [String] base URL for the Langfuse API
# @param timeout [Integer] HTTP request timeout in seconds
# @param logger [Logger, nil] logger to use; when omitted, a WARN-level logger on $stdout is created
# @param cache [Object, nil] optional cache for prompt responses
def initialize(public_key:, secret_key:, base_url:, timeout: 5, logger: nil, cache: nil)
  @public_key, @secret_key = public_key, secret_key
  @base_url, @timeout = base_url, timeout
  @cache = cache
  @logger = logger
  # Only build the fallback logger when the caller did not supply one.
  @logger ||= Logger.new($stdout, level: Logger::WARN)
end
Instance Attribute Details
#base_url ⇒ String (readonly)
Returns Base URL for Langfuse API.
32 33 34 |
# File 'lib/langfuse/api_client.rb', line 32
#
# @return [String] base URL for the Langfuse API
def base_url = @base_url
#cache ⇒ PromptCache, ... (readonly)
Returns Optional cache for prompt responses.
41 42 43 |
# File 'lib/langfuse/api_client.rb', line 41
#
# @return [Object, nil] the optional cache for prompt responses (nil when none was configured)
def cache = @cache
#logger ⇒ Logger (readonly)
Returns Logger instance for debugging.
38 39 40 |
# File 'lib/langfuse/api_client.rb', line 38
#
# @return [Logger] logger instance used for debugging output
def logger = @logger
#public_key ⇒ String (readonly)
Returns Langfuse public API key.
26 27 28 |
# File 'lib/langfuse/api_client.rb', line 26
#
# @return [String] Langfuse public API key
def public_key = @public_key
#secret_key ⇒ String (readonly)
Returns Langfuse secret API key.
29 30 31 |
# File 'lib/langfuse/api_client.rb', line 29
#
# @return [String] Langfuse secret API key
def secret_key = @secret_key
#timeout ⇒ Integer (readonly)
Returns HTTP request timeout in seconds.
35 36 37 |
# File 'lib/langfuse/api_client.rb', line 35
#
# @return [Integer] HTTP request timeout in seconds
def timeout = @timeout
Instance Method Details
#connection(timeout: nil) ⇒ Faraday::Connection
Get a Faraday connection
65 66 67 68 69 70 71 72 73 |
# File 'lib/langfuse/api_client.rb', line 65
#
# Get a Faraday connection.
#
# @param timeout [Integer, nil] custom timeout; when given, a fresh connection is built for it
# @return [Faraday::Connection] a dedicated connection (custom timeout) or the memoized shared one
def connection(timeout: nil)
  # A custom timeout gets its own connection so the shared, memoized one is never mutated.
  return build_connection(timeout: timeout) if timeout

  @connection ||= build_connection
end
#create_dataset(name:, description: nil, metadata: nil) ⇒ Hash
Create a new dataset
337 338 339 340 341 342 343 344 |
# File 'lib/langfuse/api_client.rb', line 337
#
# Create a new dataset.
#
# @param name [String] dataset name
# @param description [String, nil] optional description; omitted from the request when nil
# @param metadata [Object, nil] optional metadata; omitted from the request when nil
# @return [Hash] whatever handle_response returns for the API response
def create_dataset(name:, description: nil, metadata: nil)
  with_faraday_error_handling do
    # compact drops the optional keys that were left nil so they are not sent at all.
    payload = { name: name, description: description, metadata: metadata }.compact
    response = connection.post("/api/public/v2/datasets", payload)
    handle_response(response)
  end
end
#create_dataset_item(dataset_name:, input: nil, expected_output: nil, metadata: nil, id: nil, source_trace_id: nil, source_observation_id: nil, status: nil) ⇒ Hash
Create a new dataset item (or upsert if id is provided)
rubocop:disable Metrics/ParameterLists
367 368 369 370 371 372 373 374 375 376 377 378 379 380 |
# File 'lib/langfuse/api_client.rb', line 367
#
# Create a new dataset item (or upsert if id is provided).
#
# All keyword arguments are forwarded unchanged to build_dataset_item_payload
# (defined elsewhere in the class), and its result is POSTed as the request body.
#
# @param dataset_name [String] name of the dataset the item belongs to
# @param input [Object, nil] item input
# @param expected_output [Object, nil] expected output for the item
# @param metadata [Object, nil] optional metadata
# @param id [String, nil] item ID; supplying one makes the call an upsert
# @param source_trace_id [String, nil] trace the item was derived from
# @param source_observation_id [String, nil] observation the item was derived from
# @param status [Object, nil] item status
# @return [Hash] whatever handle_response returns for the API response
def create_dataset_item(dataset_name:, input: nil, expected_output: nil, metadata: nil, id: nil,
                        source_trace_id: nil, source_observation_id: nil, status: nil)
  with_faraday_error_handling do
    payload = build_dataset_item_payload(
      dataset_name: dataset_name, input: input, expected_output: expected_output,
      metadata: metadata, id: id, source_trace_id: source_trace_id,
      source_observation_id: source_observation_id, status: status
    )
    response = connection.post("/api/public/dataset-items", payload)
    handle_response(response)
  end
end
#create_dataset_run_item(dataset_item_id:, run_name:, trace_id: nil, observation_id: nil, metadata: nil, run_description: nil) ⇒ Hash
Create a dataset run item (link a trace to a dataset item within a run)
249 250 251 252 253 254 255 256 257 258 259 260 261 |
# File 'lib/langfuse/api_client.rb', line 249
#
# Create a dataset run item (link a trace to a dataset item within a run).
#
# @param dataset_item_id [String] ID of the dataset item
# @param run_name [String] name of the run
# @param trace_id [String, nil] trace to link; sent only when given
# @param observation_id [String, nil] observation to link; sent only when given
# @param metadata [Object, nil] optional metadata; sent only when given
# @param run_description [String, nil] optional run description; sent only when given
# @return [Hash] whatever handle_response returns for the API response
def create_dataset_run_item(dataset_item_id:, run_name:, trace_id: nil,
                            observation_id: nil, metadata: nil, run_description: nil)
  with_faraday_error_handling do
    payload = { datasetItemId: dataset_item_id, runName: run_name }
    # Optional fields are added one by one so absent values never appear in the body.
    payload[:traceId] = trace_id if trace_id
    payload[:observationId] = observation_id if observation_id
    payload[:metadata] = metadata if metadata
    payload[:runDescription] = run_description if run_description

    response = connection.post("/api/public/dataset-run-items", payload)
    handle_response(response)
  end
end
#create_prompt(name:, prompt:, type:, config: {}, labels: [], tags: [], commit_message: nil) ⇒ Hash
Create a new prompt (or new version if prompt with same name exists)
rubocop:disable Metrics/ParameterLists
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/langfuse/api_client.rb', line 147
#
# Create a new prompt (or new version if prompt with same name exists).
#
# @param name [String] prompt name
# @param prompt [Object] prompt content
# @param type [Object] prompt type
# @param config [Hash] prompt configuration
# @param labels [Array] labels to attach
# @param tags [Array] tags to attach
# @param commit_message [String, nil] optional commit message; sent only when given
# @return [Hash] whatever handle_response returns for the API response
def create_prompt(name:, prompt:, type:, config: {}, labels: [], tags: [], commit_message: nil)
  with_faraday_error_handling do
    path = "/api/public/v2/prompts"
    payload = {
      name: name,
      prompt: prompt,
      type: type,
      config: config,
      labels: labels,
      tags: tags
    }
    payload[:commitMessage] = commit_message if commit_message

    response = connection.post(path, payload)
    handle_response(response)
  end
end
#delete_dataset_item(id) ⇒ Hash
404 responses are treated as success to keep DELETE idempotent across retries
Delete a dataset item by ID
450 451 452 453 454 455 456 457 458 459 460 |
# File 'lib/langfuse/api_client.rb', line 450
#
# Delete a dataset item by ID.
#
# The published docs for this method state that 404 responses are treated as
# success to keep DELETE idempotent across retries; that status handling lives
# in handle_delete_dataset_item_response (defined elsewhere in the class).
#
# @param id [String] dataset item ID; URI-encoded before being placed in the path
# @return [Hash] whatever handle_delete_dataset_item_response returns
# @raise [ApiError] when the HTTP request fails with any other Faraday error
def delete_dataset_item(id)
  encoded_id = URI.encode_uri_component(id)
  response = connection.delete("/api/public/dataset-items/#{encoded_id}")
  handle_delete_dataset_item_response(response, id)
rescue Faraday::RetriableResponse => e
  # Retries ran out: interpret the final response instead of failing blindly.
  logger.error("Faraday error: Retries exhausted - #{e.response.status}")
  handle_delete_dataset_item_response(e.response, id)
rescue Faraday::Error => e
  logger.error("Faraday error: #{e.message}")
  raise ApiError, "HTTP request failed: #{e.message}"
end
#get_dataset(name) ⇒ Hash
Fetch a dataset by name
318 319 320 321 322 323 324 |
# File 'lib/langfuse/api_client.rb', line 318
#
# Fetch a dataset by name.
#
# @param name [String] dataset name; URI-encoded before being placed in the path
# @return [Hash] whatever handle_response returns for the API response
def get_dataset(name)
  with_faraday_error_handling do
    path = "/api/public/v2/datasets/#{URI.encode_uri_component(name)}"
    handle_response(connection.get(path))
  end
end
#get_dataset_item(id) ⇒ Hash
Fetch a dataset item by ID
393 394 395 396 397 398 399 |
# File 'lib/langfuse/api_client.rb', line 393
#
# Fetch a dataset item by ID.
#
# @param id [String] dataset item ID; URI-encoded before being placed in the path
# @return [Hash] whatever handle_response returns for the API response
def get_dataset_item(id)
  with_faraday_error_handling do
    path = "/api/public/dataset-items/#{URI.encode_uri_component(id)}"
    handle_response(connection.get(path))
  end
end
#get_projects ⇒ Hash
Fetch projects accessible with the current API keys
272 273 274 275 276 277 |
# File 'lib/langfuse/api_client.rb', line 272
#
# Fetch projects accessible with the current API keys.
#
# @return [Hash] whatever handle_response returns for the API response
def get_projects # rubocop:disable Naming/AccessorMethodName
  with_faraday_error_handling { handle_response(connection.get("/api/public/projects")) }
end
#get_prompt(name, version: nil, label: nil) ⇒ Hash
Fetch a prompt from the Langfuse API
Checks cache first if caching is enabled. On cache miss, fetches from API and stores in cache. When using the Rails.cache backend, uses a distributed lock to prevent cache stampedes.
117 118 119 120 121 122 123 |
# File 'lib/langfuse/api_client.rb', line 117
#
# Fetch a prompt from the Langfuse API.
#
# With no cache configured the prompt is fetched directly. Otherwise a cache key
# is built and the lookup is delegated to fetch_with_appropriate_caching_strategy
# (defined elsewhere in the class).
#
# @param name [String] prompt name
# @param version [Object, nil] specific version to fetch
# @param label [Object, nil] label to fetch
# @return [Hash] the prompt data
# @raise [ArgumentError] if both version and label are given
def get_prompt(name, version: nil, label: nil)
  raise ArgumentError, "Cannot specify both version and label" if version && label

  selector = { version: version, label: label }
  if cache.nil?
    fetch_prompt_from_api(name, **selector)
  else
    cache_key = PromptCache.build_key(name, **selector)
    fetch_with_appropriate_caching_strategy(cache_key, name, version, label)
  end
end
#list_dataset_items(dataset_name:, page: nil, limit: nil, source_trace_id: nil, source_observation_id: nil) ⇒ Array<Hash>
List items in a dataset with optional filters
414 415 416 417 418 419 420 421 |
# File 'lib/langfuse/api_client.rb', line 414
#
# List items in a dataset with optional filters.
#
# Delegates to list_dataset_items_paginated and returns only the "data" entry
# of its result.
#
# @param dataset_name [String] dataset whose items are listed
# @param page [Integer, nil] page to fetch
# @param limit [Integer, nil] page size
# @param source_trace_id [String, nil] filter by source trace
# @param source_observation_id [String, nil] filter by source observation
# @return [Array<Hash>] the items, or an empty array when the response has no "data"
def list_dataset_items(dataset_name:, page: nil, limit: nil,
                       source_trace_id: nil, source_observation_id: nil)
  list_dataset_items_paginated(
    dataset_name: dataset_name,
    page: page,
    limit: limit,
    source_trace_id: source_trace_id,
    source_observation_id: source_observation_id
  ).then { |body| body["data"] || [] }
end
#list_dataset_items_paginated(dataset_name:, page: nil, limit: nil, source_trace_id: nil, source_observation_id: nil) ⇒ Hash
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Full paginated response including “meta” for internal pagination use
427 428 429 430 431 432 433 434 435 436 437 438 |
# File 'lib/langfuse/api_client.rb', line 427
#
# Full paginated response including "meta" for internal pagination use.
#
# Query parameters are produced by build_dataset_items_params (defined
# elsewhere in the class).
#
# @api private
# @param dataset_name [String] dataset whose items are listed
# @param page [Integer, nil] page to fetch
# @param limit [Integer, nil] page size
# @param source_trace_id [String, nil] filter by source trace
# @param source_observation_id [String, nil] filter by source observation
# @return [Hash] whatever handle_response returns for the API response
def list_dataset_items_paginated(dataset_name:, page: nil, limit: nil,
                                 source_trace_id: nil, source_observation_id: nil)
  with_faraday_error_handling do
    query = build_dataset_items_params(
      dataset_name: dataset_name,
      page: page,
      limit: limit,
      source_trace_id: source_trace_id,
      source_observation_id: source_observation_id
    )
    handle_response(connection.get("/api/public/dataset-items", query))
  end
end
#list_datasets(page: nil, limit: nil) ⇒ Array<Hash>
List all datasets in the project
298 299 300 301 302 303 304 305 306 |
# File 'lib/langfuse/api_client.rb', line 298
#
# List all datasets in the project.
#
# @param page [Integer, nil] page to fetch; omitted from the query when nil
# @param limit [Integer, nil] page size; omitted from the query when nil
# @return [Array<Hash>] the "data" entry of the response, or an empty array when absent
def list_datasets(page: nil, limit: nil)
  with_faraday_error_handling do
    query = { page: page, limit: limit }.compact
    body = handle_response(connection.get("/api/public/v2/datasets", query))
    body["data"] || []
  end
end
#list_prompts(page: nil, limit: nil) ⇒ Array<Hash>
List all prompts in the Langfuse project
Fetches a list of all prompt names available in your project. Note: This returns metadata only, not full prompt content.
91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/langfuse/api_client.rb', line 91
#
# List all prompts in the Langfuse project.
#
# @param page [Integer, nil] page to fetch; omitted from the query when nil
# @param limit [Integer, nil] page size; omitted from the query when nil
# @return [Array<Hash>] the "data" entry of the response, or an empty array when absent
def list_prompts(page: nil, limit: nil)
  with_faraday_error_handling do
    query = { page: page, limit: limit }.compact
    body = handle_response(connection.get("/api/public/v2/prompts", query))

    # The API answers with { data: [...], meta: {...} }; callers only get the data part.
    body["data"] || []
  end
end
#send_batch(events) ⇒ void
This method returns an undefined value.
Send a batch of events to the Langfuse ingestion API
Sends events (scores, traces, observations) to the ingestion endpoint. Retries transient errors (429, 503, 504, network errors) with exponential backoff. Batch operations are idempotent (events have unique IDs), so retries are safe.
217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/langfuse/api_client.rb', line 217
#
# Send a batch of events to the Langfuse ingestion API.
#
# @param events [Array] events to send; must be a non-empty array
# @return [void]
# @raise [ArgumentError] if events is not an array or is empty
# @raise [ApiError] when the HTTP request fails with a Faraday error
def send_batch(events)
  raise ArgumentError, "events must be an array" unless events.is_a?(Array)
  raise ArgumentError, "events array cannot be empty" if events.empty?

  path = "/api/public/ingestion"
  payload = { batch: events }

  response = connection.post(path, payload)
  handle_batch_response(response)
rescue Faraday::RetriableResponse => e
  # Retry middleware exhausted all retries - handle the final response
  logger.error("Langfuse batch send failed: Retries exhausted - #{e.response.status}")
  handle_batch_response(e.response)
rescue Faraday::Error => e
  logger.error("Langfuse batch send failed: #{e.message}")
  raise ApiError, "Batch send failed: #{e.message}"
end
#shutdown ⇒ void
This method returns an undefined value.
Shut down the API client and release resources
Shuts down the cache if it supports shutdown (e.g., SWR thread pool).
284 285 286 |
# File 'lib/langfuse/api_client.rb', line 284
#
# Shut down the API client and release resources.
#
# Only the cache is touched, and only if it responds to #shutdown; a missing
# cache or one without that method makes this a no-op.
#
# @return [void]
def shutdown
  return unless cache.respond_to?(:shutdown)

  cache.shutdown
end
#update_prompt(name:, version:, labels:) ⇒ Hash
Update labels for an existing prompt version
183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/langfuse/api_client.rb', line 183
#
# Update labels for an existing prompt version.
#
# @param name [String] prompt name; URI-encoded before being placed in the path
# @param version [Object] prompt version, interpolated into the path as-is
# @param labels [Array] labels, sent as "newLabels"
# @return [Hash] whatever handle_response returns for the API response
# @raise [ArgumentError] if labels is not an array
def update_prompt(name:, version:, labels:)
  raise ArgumentError, "labels must be an array" unless labels.is_a?(Array)

  with_faraday_error_handling do
    encoded_name = URI.encode_uri_component(name)
    target = "/api/public/v2/prompts/#{encoded_name}/versions/#{version}"
    handle_response(connection.patch(target, { newLabels: labels }))
  end
end