Class: Langfuse::ApiClient
- Inherits:
-
Object
- Object
- Langfuse::ApiClient
- Defined in:
- lib/langfuse/api_client.rb
Overview
HTTP client for Langfuse API
Handles authentication, connection management, and HTTP requests to the Langfuse REST API.
Instance Attribute Summary collapse
-
#base_url ⇒ String
readonly
Base URL for Langfuse API.
-
#cache ⇒ PromptCache, ...
readonly
Optional cache for prompt responses.
-
#logger ⇒ Logger
readonly
Logger instance for debugging.
-
#public_key ⇒ String
readonly
Langfuse public API key.
-
#secret_key ⇒ String
readonly
Langfuse secret API key.
-
#timeout ⇒ Integer
readonly
HTTP request timeout in seconds.
Instance Method Summary collapse
-
#connection(timeout: nil) ⇒ Faraday::Connection
Get a Faraday connection.
-
#create_dataset(name:, description: nil, metadata: nil) ⇒ Hash
Create a new dataset.
-
#create_dataset_item(dataset_name:, input: nil, expected_output: nil, metadata: nil, id: nil, source_trace_id: nil, source_observation_id: nil, status: nil) ⇒ Hash
Create a new dataset item (or upsert if id is provided).
-
#create_dataset_run_item(dataset_item_id:, run_name:, trace_id: nil, observation_id: nil, metadata: nil, run_description: nil) ⇒ Hash
Create a dataset run item (link a trace to a dataset item within a run).
-
#create_prompt(name:, prompt:, type:, config: {}, labels: [], tags: [], commit_message: nil) ⇒ Hash
Create a new prompt (or new version if prompt with same name exists).
-
#delete_dataset_item(id) ⇒ Hash
Delete a dataset item by ID.
-
#delete_dataset_run(dataset_name:, run_name:) ⇒ Hash?
Delete a dataset run by name.
-
#get_dataset(name) ⇒ Hash
Fetch a dataset by name.
-
#get_dataset_item(id) ⇒ Hash
Fetch a dataset item by ID.
-
#get_dataset_run(dataset_name:, run_name:) ⇒ Hash
Fetch a dataset run by dataset and run name.
-
#get_projects ⇒ Hash
Fetch projects accessible with the current API keys.
-
#get_prompt(name, version: nil, label: nil) ⇒ Hash
Fetch a prompt from the Langfuse API.
-
#get_trace(id) ⇒ Hash
Fetch a trace by ID.
-
#initialize(public_key:, secret_key:, base_url:, timeout: 5, logger: nil, cache: nil) ⇒ ApiClient
constructor
Initialize a new API client.
-
#list_dataset_items(dataset_name:, page: nil, limit: nil, source_trace_id: nil, source_observation_id: nil) ⇒ Array<Hash>
List items in a dataset with optional filters.
-
#list_dataset_items_paginated(dataset_name:, page: nil, limit: nil, source_trace_id: nil, source_observation_id: nil) ⇒ Hash
private
Full paginated response including “meta” for internal pagination use.
-
#list_dataset_runs(dataset_name:, page: nil, limit: nil) ⇒ Array<Hash>
List dataset runs in a dataset.
-
#list_dataset_runs_paginated(dataset_name:, page: nil, limit: nil) ⇒ Hash
private
Full paginated response including “meta” for internal pagination use.
-
#list_datasets(page: nil, limit: nil) ⇒ Array<Hash>
List all datasets in the project.
-
#list_prompts(page: nil, limit: nil) ⇒ Array<Hash>
List all prompts in the Langfuse project.
-
#list_traces(page: nil, limit: nil, user_id: nil, name: nil, session_id: nil, from_timestamp: nil, to_timestamp: nil, order_by: nil, tags: nil, version: nil, release: nil, environment: nil, fields: nil, filter: nil) ⇒ Array<Hash>
List traces in the project.
-
#list_traces_paginated(page: nil, limit: nil, user_id: nil, name: nil, session_id: nil, from_timestamp: nil, to_timestamp: nil, order_by: nil, tags: nil, version: nil, release: nil, environment: nil, fields: nil, filter: nil) ⇒ Hash
private
Full paginated response including “meta” for internal pagination use.
-
#send_batch(events) ⇒ void
Send a batch of events to the Langfuse ingestion API.
-
#shutdown ⇒ void
Shut down the API client and release resources.
-
#update_prompt(name:, version:, labels:) ⇒ Hash
Update labels for an existing prompt version.
Constructor Details
#initialize(public_key:, secret_key:, base_url:, timeout: 5, logger: nil, cache: nil) ⇒ ApiClient
Initialize a new API client
52 53 54 55 56 57 58 59 |
# File 'lib/langfuse/api_client.rb', line 52 def initialize(public_key:, secret_key:, base_url:, timeout: 5, logger: nil, cache: nil) @public_key = public_key @secret_key = secret_key @base_url = base_url @timeout = timeout @logger = logger || Logger.new($stdout, level: Logger::WARN) @cache = cache end |
Instance Attribute Details
#base_url ⇒ String (readonly)
Returns Base URL for Langfuse API.
32 33 34 |
# File 'lib/langfuse/api_client.rb', line 32 def base_url @base_url end |
#cache ⇒ PromptCache, ... (readonly)
Returns Optional cache for prompt responses.
41 42 43 |
# File 'lib/langfuse/api_client.rb', line 41 def cache @cache end |
#logger ⇒ Logger (readonly)
Returns Logger instance for debugging.
38 39 40 |
# File 'lib/langfuse/api_client.rb', line 38 def logger @logger end |
#public_key ⇒ String (readonly)
Returns Langfuse public API key.
26 27 28 |
# File 'lib/langfuse/api_client.rb', line 26 def public_key @public_key end |
#secret_key ⇒ String (readonly)
Returns Langfuse secret API key.
29 30 31 |
# File 'lib/langfuse/api_client.rb', line 29 def secret_key @secret_key end |
#timeout ⇒ Integer (readonly)
Returns HTTP request timeout in seconds.
35 36 37 |
# File 'lib/langfuse/api_client.rb', line 35 def timeout @timeout end |
Instance Method Details
#connection(timeout: nil) ⇒ Faraday::Connection
Get a Faraday connection
65 66 67 68 69 70 71 72 73 |
# File 'lib/langfuse/api_client.rb', line 65 def connection(timeout: nil) if timeout # Create dedicated connection for custom timeout # to avoid mutating shared connection build_connection(timeout: timeout) else @connection ||= build_connection end end |
#create_dataset(name:, description: nil, metadata: nil) ⇒ Hash
Create a new dataset
471 472 473 474 475 476 477 478 |
# File 'lib/langfuse/api_client.rb', line 471 def create_dataset(name:, description: nil, metadata: nil) with_faraday_error_handling do payload = { name: name, description: description, metadata: metadata }.compact response = connection.post("/api/public/v2/datasets", payload) handle_response(response) end end |
#create_dataset_item(dataset_name:, input: nil, expected_output: nil, metadata: nil, id: nil, source_trace_id: nil, source_observation_id: nil, status: nil) ⇒ Hash
Create a new dataset item (or upsert if id is provided)
rubocop:disable Metrics/ParameterLists
501 502 503 504 505 506 507 508 509 510 511 512 513 514 |
# File 'lib/langfuse/api_client.rb', line 501 def create_dataset_item(dataset_name:, input: nil, expected_output: nil, metadata: nil, id: nil, source_trace_id: nil, source_observation_id: nil, status: nil) with_faraday_error_handling do payload = build_dataset_item_payload( dataset_name: dataset_name, input: input, expected_output: expected_output, metadata: metadata, id: id, source_trace_id: source_trace_id, source_observation_id: source_observation_id, status: status ) response = connection.post("/api/public/dataset-items", payload) handle_response(response) end end |
#create_dataset_run_item(dataset_item_id:, run_name:, trace_id: nil, observation_id: nil, metadata: nil, run_description: nil) ⇒ Hash
Create a dataset run item (link a trace to a dataset item within a run)
249 250 251 252 253 254 255 256 257 258 259 260 261 |
# File 'lib/langfuse/api_client.rb', line 249 def create_dataset_run_item(dataset_item_id:, run_name:, trace_id: nil, observation_id: nil, metadata: nil, run_description: nil) with_faraday_error_handling do payload = { datasetItemId: dataset_item_id, runName: run_name } payload[:traceId] = trace_id if trace_id payload[:observationId] = observation_id if observation_id payload[:metadata] = metadata if metadata payload[:runDescription] = run_description if run_description response = connection.post("/api/public/dataset-run-items", payload) handle_response(response) end end |
#create_prompt(name:, prompt:, type:, config: {}, labels: [], tags: [], commit_message: nil) ⇒ Hash
Create a new prompt (or new version if prompt with same name exists)
rubocop:disable Metrics/ParameterLists
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/langfuse/api_client.rb', line 147 def create_prompt(name:, prompt:, type:, config: {}, labels: [], tags: [], commit_message: nil) with_faraday_error_handling do path = "/api/public/v2/prompts" payload = { name: name, prompt: prompt, type: type, config: config, labels: labels, tags: tags } payload[:commitMessage] = commit_message if commit_message response = connection.post(path, payload) handle_response(response) end end |
#delete_dataset_item(id) ⇒ Hash
404 responses are treated as success to keep DELETE idempotent across retries
Delete a dataset item by ID
584 585 586 587 588 589 590 591 592 593 594 |
# File 'lib/langfuse/api_client.rb', line 584 def delete_dataset_item(id) encoded_id = URI.encode_uri_component(id) response = connection.delete("/api/public/dataset-items/#{encoded_id}") handle_delete_dataset_item_response(response, id) rescue Faraday::RetriableResponse => e logger.error("Faraday error: Retries exhausted - #{e.response.status}") handle_delete_dataset_item_response(e.response, id) rescue Faraday::Error => e logger.error("Faraday error: #{e.message}") raise ApiError, "HTTP request failed: #{e.message}" end |
#delete_dataset_run(dataset_name:, run_name:) ⇒ Hash?
404 responses raise NotFoundError to preserve strict delete semantics
Delete a dataset run by name
311 312 313 314 315 316 |
# File 'lib/langfuse/api_client.rb', line 311 def delete_dataset_run(dataset_name:, run_name:) with_faraday_error_handling do response = connection.delete(dataset_run_path(dataset_name: dataset_name, run_name: run_name)) response.status == 204 ? nil : handle_response(response) end end |
#get_dataset(name) ⇒ Hash
Fetch a dataset by name
452 453 454 455 456 457 458 |
# File 'lib/langfuse/api_client.rb', line 452 def get_dataset(name) with_faraday_error_handling do encoded_name = URI.encode_uri_component(name) response = connection.get("/api/public/v2/datasets/#{encoded_name}") handle_response(response) end end |
#get_dataset_item(id) ⇒ Hash
Fetch a dataset item by ID
527 528 529 530 531 532 533 |
# File 'lib/langfuse/api_client.rb', line 527 def get_dataset_item(id) with_faraday_error_handling do encoded_id = URI.encode_uri_component(id) response = connection.get("/api/public/dataset-items/#{encoded_id}") handle_response(response) end end |
#get_dataset_run(dataset_name:, run_name:) ⇒ Hash
Fetch a dataset run by dataset and run name
271 272 273 274 275 276 |
# File 'lib/langfuse/api_client.rb', line 271 def get_dataset_run(dataset_name:, run_name:) with_faraday_error_handling do response = connection.get(dataset_run_path(dataset_name: dataset_name, run_name: run_name)) handle_response(response) end end |
#get_projects ⇒ Hash
Fetch projects accessible with the current API keys
327 328 329 330 331 332 |
# File 'lib/langfuse/api_client.rb', line 327 def get_projects # rubocop:disable Naming/AccessorMethodName with_faraday_error_handling do response = connection.get("/api/public/projects") handle_response(response) end end |
#get_prompt(name, version: nil, label: nil) ⇒ Hash
Fetch a prompt from the Langfuse API
Checks cache first if caching is enabled. On cache miss, fetches from API and stores in cache. When using Rails.cache backend, uses distributed lock to prevent cache stampedes.
117 118 119 120 121 122 123 |
# File 'lib/langfuse/api_client.rb', line 117 def get_prompt(name, version: nil, label: nil) raise ArgumentError, "Cannot specify both version and label" if version && label return fetch_prompt_from_api(name, version: version, label: label) if cache.nil? cache_key = PromptCache.build_key(name, version: version, label: label) fetch_with_appropriate_caching_strategy(cache_key, name, version, label) end |
#get_trace(id) ⇒ Hash
Fetch a trace by ID
414 415 416 417 418 419 420 |
# File 'lib/langfuse/api_client.rb', line 414 def get_trace(id) with_faraday_error_handling do encoded_id = URI.encode_uri_component(id) response = connection.get("/api/public/traces/#{encoded_id}") handle_response(response) end end |
#list_dataset_items(dataset_name:, page: nil, limit: nil, source_trace_id: nil, source_observation_id: nil) ⇒ Array<Hash>
List items in a dataset with optional filters
548 549 550 551 552 553 554 555 |
# File 'lib/langfuse/api_client.rb', line 548 def list_dataset_items(dataset_name:, page: nil, limit: nil, source_trace_id: nil, source_observation_id: nil) result = list_dataset_items_paginated( dataset_name: dataset_name, page: page, limit: limit, source_trace_id: source_trace_id, source_observation_id: source_observation_id ) result["data"] || [] end |
#list_dataset_items_paginated(dataset_name:, page: nil, limit: nil, source_trace_id: nil, source_observation_id: nil) ⇒ Hash
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Full paginated response including “meta” for internal pagination use
561 562 563 564 565 566 567 568 569 570 571 572 |
# File 'lib/langfuse/api_client.rb', line 561 def list_dataset_items_paginated(dataset_name:, page: nil, limit: nil, source_trace_id: nil, source_observation_id: nil) with_faraday_error_handling do params = build_dataset_items_params( dataset_name: dataset_name, page: page, limit: limit, source_trace_id: source_trace_id, source_observation_id: source_observation_id ) response = connection.get("/api/public/dataset-items", params) handle_response(response) end end |
#list_dataset_runs(dataset_name:, page: nil, limit: nil) ⇒ Array<Hash>
List dataset runs in a dataset
286 287 288 289 |
# File 'lib/langfuse/api_client.rb', line 286 def list_dataset_runs(dataset_name:, page: nil, limit: nil) result = list_dataset_runs_paginated(dataset_name: dataset_name, page: page, limit: limit) result["data"] || [] end |
#list_dataset_runs_paginated(dataset_name:, page: nil, limit: nil) ⇒ Hash
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Full paginated response including “meta” for internal pagination use
295 296 297 298 299 300 |
# File 'lib/langfuse/api_client.rb', line 295 def list_dataset_runs_paginated(dataset_name:, page: nil, limit: nil) with_faraday_error_handling do response = connection.get(dataset_runs_path(dataset_name), build_dataset_runs_params(page: page, limit: limit)) handle_response(response) end end |
#list_datasets(page: nil, limit: nil) ⇒ Array<Hash>
List all datasets in the project
432 433 434 435 436 437 438 439 440 |
# File 'lib/langfuse/api_client.rb', line 432 def list_datasets(page: nil, limit: nil) with_faraday_error_handling do params = { page: page, limit: limit }.compact response = connection.get("/api/public/v2/datasets", params) result = handle_response(response) result["data"] || [] end end |
#list_prompts(page: nil, limit: nil) ⇒ Array<Hash>
List all prompts in the Langfuse project
Fetches a list of all prompt names available in your project. Note: This returns metadata only, not full prompt content.
91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/langfuse/api_client.rb', line 91 def list_prompts(page: nil, limit: nil) with_faraday_error_handling do params = { page: page, limit: limit }.compact response = connection.get("/api/public/v2/prompts", params) result = handle_response(response) # API returns { data: [...], meta: {...} } result["data"] || [] end end |
#list_traces(page: nil, limit: nil, user_id: nil, name: nil, session_id: nil, from_timestamp: nil, to_timestamp: nil, order_by: nil, tags: nil, version: nil, release: nil, environment: nil, fields: nil, filter: nil) ⇒ Array<Hash>
List traces in the project
rubocop:disable Metrics/ParameterLists
366 367 368 369 370 371 372 373 374 375 376 377 378 |
# File 'lib/langfuse/api_client.rb', line 366 def list_traces(page: nil, limit: nil, user_id: nil, name: nil, session_id: nil, from_timestamp: nil, to_timestamp: nil, order_by: nil, tags: nil, version: nil, release: nil, environment: nil, fields: nil, filter: nil) result = list_traces_paginated( page: page, limit: limit, user_id: user_id, name: name, session_id: session_id, from_timestamp: from_timestamp, to_timestamp: to_timestamp, order_by: order_by, tags: tags, version: version, release: release, environment: environment, fields: fields, filter: filter ) result["data"] || [] end |
#list_traces_paginated(page: nil, limit: nil, user_id: nil, name: nil, session_id: nil, from_timestamp: nil, to_timestamp: nil, order_by: nil, tags: nil, version: nil, release: nil, environment: nil, fields: nil, filter: nil) ⇒ Hash
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Full paginated response including “meta” for internal pagination use
rubocop:disable Metrics/ParameterLists
386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 |
# File 'lib/langfuse/api_client.rb', line 386 def list_traces_paginated(page: nil, limit: nil, user_id: nil, name: nil, session_id: nil, from_timestamp: nil, to_timestamp: nil, order_by: nil, tags: nil, version: nil, release: nil, environment: nil, fields: nil, filter: nil) with_faraday_error_handling do params = build_traces_params( page: page, limit: limit, user_id: user_id, name: name, session_id: session_id, from_timestamp: from_timestamp, to_timestamp: to_timestamp, order_by: order_by, tags: tags, version: version, release: release, environment: environment, fields: fields, filter: filter ) response = connection.get("/api/public/traces", params) handle_response(response) end end |
#send_batch(events) ⇒ void
This method returns an undefined value.
Send a batch of events to the Langfuse ingestion API
Sends events (scores, traces, observations) to the ingestion endpoint. Retries transient errors (429, 503, 504, network errors) with exponential backoff. Batch operations are idempotent (events have unique IDs), so retries are safe.
217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/langfuse/api_client.rb', line 217 def send_batch(events) raise ArgumentError, "events must be an array" unless events.is_a?(Array) raise ArgumentError, "events array cannot be empty" if events.empty? path = "/api/public/ingestion" payload = { batch: events } response = connection.post(path, payload) handle_batch_response(response) rescue Faraday::RetriableResponse => e # Retry middleware exhausted all retries - handle the final response logger.error("Langfuse batch send failed: Retries exhausted - #{e.response.status}") handle_batch_response(e.response) rescue Faraday::Error => e logger.error("Langfuse batch send failed: #{e.message}") raise ApiError, "Batch send failed: #{e.message}" end |
#shutdown ⇒ void
This method returns an undefined value.
Shut down the API client and release resources
Shuts down the cache if it supports shutdown (e.g., SWR thread pool).
339 340 341 |
# File 'lib/langfuse/api_client.rb', line 339 def shutdown cache.shutdown if cache.respond_to?(:shutdown) end |
#update_prompt(name:, version:, labels:) ⇒ Hash
Update labels for an existing prompt version
183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/langfuse/api_client.rb', line 183 def update_prompt(name:, version:, labels:) raise ArgumentError, "labels must be an array" unless labels.is_a?(Array) with_faraday_error_handling do path = "/api/public/v2/prompts/#{URI.encode_uri_component(name)}/versions/#{version}" payload = { newLabels: labels } response = connection.patch(path, payload) handle_response(response) end end |