Class: Braintrust::Server::Services::Eval

Inherits:
Object
  • Object
show all
Defined in:
lib/braintrust/server/services/eval_service.rb

Overview

Framework-agnostic service for running evaluations and streaming SSE results. Must be long-lived (not per-request) to preserve the @state_cache across requests.

Instance Method Summary collapse

Constructor Details

#initialize(evaluators) ⇒ Eval

Returns a new instance of Eval.



11
12
13
14
15
# File 'lib/braintrust/server/services/eval_service.rb', line 11

# Creates the service around a set of evaluators and prepares an empty,
# mutex-guarded cache for State objects built later by #build_state.
#
# @param evaluators [Object] evaluators this service works with; stored as given
def initialize(evaluators)
  # The lock is created up front so concurrent callers share one Mutex.
  @state_mutex = Mutex.new
  @evaluators = evaluators
  @state_cache = {}
end

Instance Method Details

#build_state(auth) ⇒ Object

Builds a State from the auth context hash. Returns nil when auth is not a Hash (e.g. under no-auth, where the auth context is true rather than a Hash). Uses an LRU-style cache (max 64 entries) keyed by [api_key, app_url, org_name].



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/braintrust/server/services/eval_service.rb', line 129

# Builds (or reuses) a Braintrust::State for the given auth context.
#
# States are cached under [api_key, app_url, org_name]. The cache holds at
# most 64 entries and evicts the least recently used one: a hit moves the
# entry to the most-recent end of the hash, so eviction order follows use
# rather than first insertion.
#
# NOTE(review): org_id and api_url are passed to State.new but are not part
# of the cache key, so two auth hashes differing only in those fields share
# one State. Confirm that is intended.
#
# @param auth [Hash, Object] auth context; only a Hash produces a State
# @return [Braintrust::State, nil] the cached or newly built State, or nil
#   when auth is not a Hash
def build_state(auth)
  return nil unless auth.is_a?(Hash)

  cache_key = [auth["api_key"], auth["app_url"], auth["org_name"]]

  # Lazy fallbacks in case the instance was created without #initialize.
  @state_mutex ||= Mutex.new
  @state_cache ||= {}

  @state_mutex.synchronize do
    # Hit: delete and re-insert so the entry becomes the newest in the
    # hash's insertion order, which is what eviction below relies on.
    if (cached = @state_cache.delete(cache_key))
      @state_cache[cache_key] = cached
      next cached
    end

    state = Braintrust::State.new(
      api_key: auth["api_key"],
      org_id: auth["org_id"],
      org_name: auth["org_name"],
      app_url: auth["app_url"],
      api_url: auth["api_url"],
      enable_tracing: false
    )

    # Hash#shift drops the first (least recently used) entry.
    @state_cache.shift if @state_cache.size >= 64

    @state_cache[cache_key] = state
    state
  end
end

#stream(validated, auth:, sse:) ⇒ Object

Runs the validated eval and streams SSE events via the sse writer. validated is the hash returned by #validate. auth is the auth context hash (or nil/true for no-auth). sse is an SSEWriter instance.



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/braintrust/server/services/eval_service.rb', line 52

# Runs a validated eval and reports it over SSE: one or more "progress"
# events per runner callback, then a "summary" event, then "done".
#
# @param validated [Hash] success hash produced by #validate
# @param auth [Hash, Object] auth context handed to #build_state
# @param sse [#event] writer called as event(event_name, payload_string)
# @return [Object] the result of the final sse.event("done", "") call
def stream(validated, auth:, sse:)
  name, evaluator, cases, dataset, experiment_name, remote_scorer_ids, parent, project_id, parameters =
    validated.values_at(
      :name, :evaluator, :cases, :dataset, :experiment_name,
      :remote_scorer_ids, :parent, :project_id, :parameters
    )

  state = build_state(auth)

  # Sends a single "progress" event: the shared envelope plus event/data.
  emit_progress = ->(envelope, event, data) {
    sse.event("progress", JSON.dump(envelope.merge("event" => event, "data" => data)))
  }

  # Converts the runner's generic progress hash (id, data or error, optional
  # origin) into remote eval protocol events, which carry id, object_type,
  # origin, name, format, output_type, event and data.
  on_progress = ->(progress_data) {
    envelope = {
      "object_type" => "task",
      "name" => name,
      "format" => "code",
      "output_type" => "completion"
    }
    %w[id origin].each do |field|
      envelope[field] = progress_data[field] if progress_data[field]
    end

    if progress_data.key?("error")
      emit_progress.call(envelope, "error", progress_data["error"])
    else
      emit_progress.call(envelope, "json_delta", JSON.dump(progress_data["data"]))
    end

    # A per-cell "done" lets the UI leave its "Streaming..." state and
    # advance the progress bar right away.
    emit_progress.call(envelope, "done", "")
  }

  run_opts = {on_progress: on_progress, quiet: true}

  # Options that are forwarded only when present; empty parameters are dropped.
  always_optional = {
    parent: parent,
    scorers: remote_scorer_ids,
    parameters: (parameters if parameters && !parameters.empty?),
    dataset: dataset
  }
  always_optional.each { |key, value| run_opts[key] = value if value }

  # Project/experiment options are forwarded only when a state is available.
  if state
    state_bound = {state: state, experiment: experiment_name, project_id: project_id}
    state_bound.each { |key, value| run_opts[key] = value if value }
  end

  result = evaluator.run(cases, **run_opts)

  # Flush buffered OTLP spans before the completion events go out. The
  # BatchSpanProcessor exports roughly every 5s, so a fast eval could
  # otherwise finish before any export and the UI would see no results.
  Braintrust::Trace.flush_spans

  # Summary scores: each scorer's mean, keyed by scorer name.
  mean_by_scorer = {}
  result.scorer_stats.each do |scorer_name, stats|
    mean_by_scorer[scorer_name] = stats.score_mean
  end

  summary = {
    "scores" => mean_by_scorer,
    "experiment_name" => experiment_name,
    "experiment_id" => result.experiment_id,
    "project_id" => result.project_id
  }
  sse.event("summary", JSON.dump(summary))

  sse.event("done", "")
end

#validate(body) ⇒ Object

Validates request body. Returns:

{error: String, status: Integer} on failure
{evaluator:, name:, cases:, dataset:, ...} on success


20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/braintrust/server/services/eval_service.rb', line 20

# Validates an eval request body and resolves it into the pieces #stream needs.
#
# Checks run in order: name present, evaluator registered under that name,
# data present, data is hash-like, and exactly one of the "data",
# "dataset_name" or "dataset_id" keys present inside data.
#
# @param body [Hash] parsed request body with string keys
# @return [Hash] {error: String, status: Integer} on failure, otherwise
#   {evaluator:, name:, cases:, dataset:, experiment_name:,
#   remote_scorer_ids:, parent:, project_id:, parameters:}
def validate(body)
  name = body["name"]
  return {error: "Missing required field: name", status: 400} unless name

  evaluator = current_evaluators[name]
  return {error: "Evaluator '#{name}' not found", status: 404} unless evaluator

  data = body["data"]
  return {error: "Missing required field: data", status: 400} unless data

  # A truthy but non-hash value (e.g. a JSON array or string) would raise
  # NoMethodError on key? below; report it as a client error instead.
  return {error: "Invalid field: data must be an object", status: 400} unless data.respond_to?(:key?)

  single_source = %w[data dataset_name dataset_id].one? { |key| data.key?(key) }
  return {error: "Exactly one data source required", status: 400} unless single_source

  cases, dataset = resolve_data_source(data)

  {
    evaluator: evaluator,
    name: name,
    cases: cases,
    dataset: dataset,
    experiment_name: body["experiment_name"],
    remote_scorer_ids: resolve_remote_scorers(body["scores"]),
    parent: resolve_parent(body["parent"]),
    project_id: body["project_id"],
    parameters: resolve_parameters(body["parameters"], evaluator)
  }
end