Class: Braintrust::Server::Services::Eval
Inherits: Object (hierarchy: Object < Braintrust::Server::Services::Eval)
Defined in: lib/braintrust/server/services/eval_service.rb
Overview
Framework-agnostic service for running evaluations and streaming SSE results. Must be long-lived (not per-request) to preserve the @state_cache across requests.
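The lifetime requirement can be illustrated with a minimal sketch. `CachingService` and `state_for` below are hypothetical stand-ins, not part of Braintrust; they only mimic an instance-level cache guarded by a mutex, to show why creating the service per request would defeat the cache.

```ruby
# Hypothetical stand-in for a service that memoizes per-credential state.
# This is NOT Braintrust::Server::Services::Eval; it only mirrors the idea
# of an instance-level cache guarded by a mutex.
class CachingService
  attr_reader :builds

  def initialize
    @mutex = Mutex.new
    @cache = {}
    @builds = 0 # counts how often the expensive object is constructed
  end

  def state_for(api_key)
    @mutex.synchronize do
      @cache[api_key] ||= begin
        @builds += 1
        "state-for-#{api_key}"
      end
    end
  end
end

# Long-lived: one instance shared by every request, so the state is built once.
shared = CachingService.new
3.times { shared.state_for("key-a") }
puts "long-lived builds: #{shared.builds}"

# Per-request: each new instance starts with an empty cache and rebuilds.
per_request_builds = 3.times.sum do
  service = CachingService.new
  service.state_for("key-a")
  service.builds
end
puts "per-request builds: #{per_request_builds}"
```

In a real server the equivalent of `shared` would typically be created once at boot and referenced from each request handler.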
Instance Method Summary
- #build_state(auth) ⇒ Object
  Build State from auth context hash.
- #initialize(evaluators) ⇒ Eval (constructor)
  A new instance of Eval.
- #stream(validated, auth:, sse:) ⇒ Object
  Runs the validated eval and streams SSE events via the sse writer.
- #validate(body) ⇒ Object
  Validates request body.
Constructor Details
#initialize(evaluators) ⇒ Eval
Returns a new instance of Eval.
# File 'lib/braintrust/server/services/eval_service.rb', line 11

def initialize(evaluators)
  @evaluators = evaluators
  @state_mutex = Mutex.new
  @state_cache = {}
end
Instance Method Details
#build_state(auth) ⇒ Object
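Builds a State from the auth context hash. Returns nil when auth is not a Hash (e.g. NoAuth returns true). Uses an LRU-style cache (max 64 entries) keyed by [api_key, app_url, org_name]. When the cache is full, the oldest inserted entry is evicted; a cache hit does not refresh an entry's position, so eviction follows insertion order.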
# File 'lib/braintrust/server/services/eval_service.rb', line 129

def build_state(auth)
  return nil unless auth.is_a?(Hash)

  cache_key = [auth["api_key"], auth["app_url"], auth["org_name"]]

  @state_mutex ||= Mutex.new
  @state_cache ||= {}

  @state_mutex.synchronize do
    cached = @state_cache[cache_key]
    return cached if cached

    state = Braintrust::State.new(
      api_key: auth["api_key"],
      org_id: auth["org_id"],
      org_name: auth["org_name"],
      app_url: auth["app_url"],
      api_url: auth["api_url"],
      enable_tracing: false
    )

    if @state_cache.size >= 64
      oldest_key = @state_cache.keys.first
      @state_cache.delete(oldest_key)
    end

    @state_cache[cache_key] = state
    state
  end
end
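The eviction behavior of the cache can be sketched in isolation. `BoundedCache`, its `max_size` parameter, and the example keys are hypothetical names used only for illustration; the sketch assumes nothing beyond a plain Ruby Hash (which preserves insertion order) and a Mutex, and uses a limit of 2 instead of 64 to keep the run short.

```ruby
# Minimal sketch of insertion-ordered eviction, assuming a plain Hash.
# BoundedCache and max_size are hypothetical names, not Braintrust APIs.
class BoundedCache
  def initialize(max_size)
    @max_size = max_size
    @entries = {}
    @mutex = Mutex.new
  end

  # Returns the cached value for key, or stores and returns the block's result.
  def fetch(key)
    @mutex.synchronize do
      cached = @entries[key]
      return cached if cached

      value = yield
      # Ruby hashes preserve insertion order, so keys.first is the oldest entry.
      @entries.delete(@entries.keys.first) if @entries.size >= @max_size
      @entries[key] = value
    end
  end

  def keys
    @mutex.synchronize { @entries.keys }
  end
end

cache = BoundedCache.new(2)
cache.fetch(%w[key-a https://app.example org-a]) { :state_a }
cache.fetch(%w[key-b https://app.example org-b]) { :state_b }
cache.fetch(%w[key-a https://app.example org-a]) { :unused }  # hit: order is unchanged
cache.fetch(%w[key-c https://app.example org-c]) { :state_c } # evicts the key-a entry
p cache.keys.map(&:first) # prints ["key-b", "key-c"]
```

Note that the hit on `key-a` does not protect it from eviction, which is why the scheme is closer to first-in, first-out than to a strict LRU.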
#stream(validated, auth:, sse:) ⇒ Object
Runs the validated eval and streams SSE events via the sse writer. validated is the hash returned by #validate. auth is the auth context hash (or nil/true for no-auth). sse is an SSEWriter instance.
# File 'lib/braintrust/server/services/eval_service.rb', line 52

def stream(validated, auth:, sse:)
  name = validated[:name]
  evaluator = validated[:evaluator]
  cases = validated[:cases]
  dataset = validated[:dataset]
  experiment_name = validated[:experiment_name]
  remote_scorer_ids = validated[:remote_scorer_ids]
  parent = validated[:parent]
  project_id = validated[:project_id]
  parameters = validated[:parameters]

  state = build_state(auth)

  # Only pass project/experiment params when state is available
  run_opts = {
    on_progress: ->(progress_data) {
      # Build remote eval protocol events from generic progress data.
      # Runner provides: id, data/error, scores (optional), origin (optional).
      # Protocol requires: id, object_type, origin, name, format, output_type, event, data.
      base = {
        "object_type" => "task",
        "name" => name,
        "format" => "code",
        "output_type" => "completion"
      }
      base["id"] = progress_data["id"] if progress_data["id"]
      base["origin"] = progress_data["origin"] if progress_data["origin"]

      if progress_data.key?("error")
        sse.event("progress", JSON.dump(base.merge("event" => "error", "data" => progress_data["error"])))
      else
        sse.event("progress", JSON.dump(base.merge("event" => "json_delta", "data" => JSON.dump(progress_data["data"]))))
      end

      # Signal per-cell completion so the UI exits "Streaming..." state
      # and updates the progress bar immediately.
      sse.event("progress", JSON.dump(base.merge("event" => "done", "data" => "")))
    },
    quiet: true
  }
  run_opts[:parent] = parent if parent
  run_opts[:scorers] = remote_scorer_ids if remote_scorer_ids
  run_opts[:parameters] = parameters if parameters && !parameters.empty?
  run_opts[:dataset] = dataset if dataset

  if state
    run_opts[:state] = state
    run_opts[:experiment] = experiment_name if experiment_name
    run_opts[:project_id] = project_id if project_id
  end

  result = evaluator.run(cases, **run_opts)

  # Flush buffered OTLP spans before sending completion events.
  # The BatchSpanProcessor exports every ~5s; fast evals can finish
  # before a single export fires, causing the UI to see no results.
  Braintrust::Trace.flush_spans

  # Build summary from result scores
  averaged_scores = {}
  result.scorer_stats.each do |scorer_name, stats|
    averaged_scores[scorer_name] = stats.score_mean
  end

  sse.event("summary", JSON.dump({
    "scores" => averaged_scores,
    "experiment_name" => experiment_name,
    "experiment_id" => result.experiment_id,
    "project_id" => result.project_id
  }))

  sse.event("done", "")
end
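To see the shape of the per-case progress events, the callback logic can be exercised against a recording writer. This is a sketch under stated assumptions: `RecordingSSE` and `emit_progress` are hypothetical helpers (the real SSEWriter is not shown in this page), and the only interface assumed is the `event(type, data)` call that #stream makes. The eval name and row id are made-up values.

```ruby
require "json"

# Hypothetical stand-in for the SSE writer: records events instead of writing
# to a socket. It only assumes the event(type, data) call used by #stream.
class RecordingSSE
  attr_reader :events

  def initialize
    @events = []
  end

  def event(type, data)
    @events << [type, data]
  end
end

# Mirrors the on_progress callback in #stream for a single progress payload.
def emit_progress(sse, name, progress_data)
  base = {
    "object_type" => "task",
    "name" => name,
    "format" => "code",
    "output_type" => "completion"
  }
  base["id"] = progress_data["id"] if progress_data["id"]
  base["origin"] = progress_data["origin"] if progress_data["origin"]

  if progress_data.key?("error")
    sse.event("progress", JSON.dump(base.merge("event" => "error", "data" => progress_data["error"])))
  else
    # The result is JSON-encoded twice: once as the delta string, once as the envelope.
    sse.event("progress", JSON.dump(base.merge("event" => "json_delta", "data" => JSON.dump(progress_data["data"]))))
  end
  # Every case ends with a "done" progress event, whether it succeeded or failed.
  sse.event("progress", JSON.dump(base.merge("event" => "done", "data" => "")))
end

sse = RecordingSSE.new
emit_progress(sse, "my-eval", {"id" => "row-1", "data" => {"answer" => 42}})
sse.events.each { |type, data| puts "#{type}: #{data}" }
```

A client reading a `json_delta` event therefore has to parse the envelope and then parse its `data` field again to recover the task output.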
#validate(body) ⇒ Object
Validates the request body. Returns:
- {error: String, status: Integer} on failure
- {evaluator:, name:, cases:, dataset:, ...} on success
# File 'lib/braintrust/server/services/eval_service.rb', line 20

def validate(body)
  name = body["name"]
  return {error: "Missing required field: name", status: 400} unless name

  evaluator = current_evaluators[name]
  return {error: "Evaluator '#{name}' not found", status: 404} unless evaluator

  data = body["data"]
  return {error: "Missing required field: data", status: 400} unless data

  data_sources = ["data", "dataset_name", "dataset_id"].count { |k| data.key?(k) }
  return {error: "Exactly one data source required", status: 400} if data_sources != 1

  cases, dataset = resolve_data_source(data)

  {
    evaluator: evaluator,
    name: name,
    cases: cases,
    dataset: dataset,
    experiment_name: body["experiment_name"],
    remote_scorer_ids: resolve_remote_scorers(body["scores"]),
    parent: resolve_parent(body["parent"]),
    project_id: body["project_id"],
    parameters: resolve_parameters(body["parameters"], evaluator)
  }
end
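The "exactly one data source" rule is easy to misread, because the request's top-level "data" field is itself a hash that may contain an inner "data" key. The sketch below isolates that check. `data_source_error` is a hypothetical helper, and the evaluator name, dataset name, and the shape of the inline cases are assumptions made for illustration only.

```ruby
# Keys that #validate treats as data sources inside body["data"].
DATA_SOURCE_KEYS = %w[data dataset_name dataset_id].freeze

# Hypothetical helper mirroring the data-source check in #validate.
# Returns nil when exactly one source is present, otherwise the error hash.
def data_source_error(data)
  count = DATA_SOURCE_KEYS.count { |key| data.key?(key) }
  return nil if count == 1

  {error: "Exactly one data source required", status: 400}
end

# Example request bodies (names and case fields are made up).
inline  = {"name" => "my-eval", "data" => {"data" => [{"input" => "2 + 2", "expected" => "4"}]}}
by_name = {"name" => "my-eval", "data" => {"dataset_name" => "arithmetic"}}
both    = {"name" => "my-eval", "data" => {"data" => [], "dataset_name" => "arithmetic"}}
neither = {"name" => "my-eval", "data" => {}}

[inline, by_name, both, neither].each do |body|
  error = data_source_error(body["data"])
  puts error ? "rejected (#{error[:status]}): #{error[:error]}" : "accepted"
end
```

A caller of #validate can apply the same pattern to its return value: check for the `:error` key first, respond with the given status if it is present, and pass the hash to #stream otherwise.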