Module: Legion::TraceSearch

Defined in:
lib/legion/trace_search.rb

Constant Summary collapse

# System prompt handed to the LLM when translating a natural-language query
# into a JSON filter hash (rendered by schema_context, which substitutes
# %<current_time>s and %<today>s). Frozen for consistency with
# FILTER_SCHEMA and ALLOWED_COLUMNS below.
SCHEMA_TEMPLATE =
<<~PROMPT.freeze
  You translate natural language queries into JSON filter objects for the metering_records table.
  Current date/time: %<current_time>s

  Columns: id (integer), worker_id (string), event_type (string), extension (string),
  runner_function (string), status (string: success/failure), input_tokens (integer),
  output_tokens (integer), cost_usd (float), wall_clock_ms (integer), recorded_at (datetime)

  Return ONLY a valid JSON object with these possible keys:
  - "where": hash of column => value filters (e.g. {"status": "failure"})
  - "order": column name to sort by (prefix with "-" for descending, e.g. "-cost_usd")
  - "limit": integer limit (default 50)
  - "date_from": ISO date string for recorded_at >= filter
  - "date_to": ISO date string for recorded_at <= filter

  For relative time references, compute ISO dates from the current date/time above:
  - "today" => date_from is today's date at 00:00
  - "last hour" => date_from is 1 hour ago
  - "this week" => date_from is Monday of this week
  - "yesterday" => date_from/date_to bracket yesterday

  Examples:
  - "failed tasks" => {"where": {"status": "failure"}}
  - "most expensive calls" => {"order": "-cost_usd", "limit": 20}
  - "tasks by worker-1 today" => {"where": {"worker_id": "worker-1"}, "date_from": "%<today>s"}

  Return ONLY the JSON object, no explanation.
PROMPT
# JSON Schema handed to Legion::LLM.structured (see generate_filter) to
# validate the model's filter response. Keys mirror the contract described
# in SCHEMA_TEMPLATE.
# NOTE(review): "additionalProperties": false is not set, so extra keys
# from the model pass validation; downstream code only reads the keys
# listed here, so unknown keys are effectively ignored.
FILTER_SCHEMA =
{
  type:       'object',
  properties: {
    where:     { type: 'object' },  # column => value equality filters
    order:     { type: 'string' },  # column name; "-" prefix means descending
    limit:     { type: 'integer' }, # row cap (hard-capped at 200 in execute_filter)
    date_from: { type: 'string' },  # ISO date, recorded_at lower bound
    date_to:   { type: 'string' }   # ISO date, recorded_at upper bound
  }
}.freeze
# Allowlist of metering_records columns accepted in LLM-generated filters.
# Both where-clause keys and order columns are checked against this list
# before being passed to Sequel, so the model cannot reference arbitrary
# column names.
ALLOWED_COLUMNS =
%w[
  id worker_id event_type extension runner_function status
  input_tokens output_tokens cost_usd wall_clock_ms recorded_at
].freeze

Class Method Summary collapse

Class Method Details

.aggregate_stats(dataset) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/legion/trace_search.rb', line 158

# Runs one aggregate query over +dataset+ and returns the resulting row:
# record count, token sums, total cost, avg/max latency, and the
# earliest/latest recorded_at. Returns an empty hash when no row comes back.
def aggregate_stats(dataset)
  aggregates = [
    Sequel.function(:count, Sequel.lit('*')).as(:total_records),
    Sequel.function(:sum, :input_tokens).as(:total_tokens_in),
    Sequel.function(:sum, :output_tokens).as(:total_tokens_out),
    Sequel.function(:sum, :cost_usd).as(:total_cost),
    Sequel.function(:avg, :wall_clock_ms).as(:avg_latency_ms),
    Sequel.function(:max, :wall_clock_ms).as(:max_latency_ms),
    Sequel.function(:min, :recorded_at).as(:earliest),
    Sequel.function(:max, :recorded_at).as(:latest)
  ]
  row = dataset.select(*aggregates).first
  row || {}
end

.apply_date_filters(dataset, parsed) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
# File 'lib/legion/trace_search.rb', line 101

# Narrows +dataset+ by the recorded_at bounds present in +parsed+
# (:date_from / :date_to). Values that fail to parse are silently
# ignored because safe_parse_time returns nil for them.
def apply_date_filters(dataset, parsed)
  lower = parsed[:date_from] && safe_parse_time(parsed[:date_from])
  upper = parsed[:date_to] && safe_parse_time(parsed[:date_to])
  dataset = dataset.where { recorded_at >= lower } if lower
  dataset = dataset.where { recorded_at <= upper } if upper
  dataset
end

.apply_ordering(dataset, parsed) ⇒ Object



121
122
123
124
125
126
127
128
# File 'lib/legion/trace_search.rb', line 121

# Applies the :order directive from +parsed+ to +dataset+. A leading "-"
# requests descending order; columns outside ALLOWED_COLUMNS are refused
# and the dataset is returned unchanged.
def apply_ordering(dataset, parsed)
  spec = parsed[:order]
  return dataset unless spec.is_a?(String)

  descending = spec.start_with?('-')
  column = spec.delete_prefix('-')
  return dataset unless ALLOWED_COLUMNS.include?(column)

  descending ? dataset.order(Sequel.desc(column.to_sym)) : dataset.order(column.to_sym)
end

.build_filtered_dataset(parsed) ⇒ Object



149
150
151
152
153
154
155
156
# File 'lib/legion/trace_search.rb', line 149

# Builds a metering_records dataset filtered by the allowlisted :where
# columns and the :date_from/:date_to bounds from +parsed+.
def build_filtered_dataset(parsed)
  dataset = Legion::Data.connection[:metering_records]
  where = parsed[:where]
  if where.is_a?(Hash)
    allowed = where.select { |key, _| ALLOWED_COLUMNS.include?(key.to_s) }
    dataset = dataset.where(allowed.transform_keys(&:to_sym))
  end
  apply_date_filters(dataset, parsed)
end

.compute_summary(parsed) ⇒ Object



140
141
142
143
144
145
146
147
# File 'lib/legion/trace_search.rb', line 140

# Builds a summary report for the records matched by +parsed+.
#
# Uses data_available? as the availability guard for consistency with
# detect_anomalies and trend, which emit the same 'data unavailable'
# error, instead of repeating the inline Legion::Data checks.
# NOTE(review): assumes data_available? wraps the same
# defined?(Legion::Data) && connection checks — confirm in its definition.
def compute_summary(parsed)
  return { error: 'data unavailable' } unless data_available?

  dataset = build_filtered_dataset(parsed)
  stats_row = aggregate_stats(dataset)
  format_summary(dataset, stats_row, parsed)
end

.detect_anomalies(threshold: 2.0) ⇒ Object



191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/legion/trace_search.rb', line 191

# Compares the most recent hour of activity against the preceding
# 23-hour baseline and reports metrics exceeding +threshold+ times the
# baseline (report assembly delegated to build_anomaly_report).
def detect_anomalies(threshold: 2.0)
  return { error: 'data unavailable' } unless data_available?

  current_time = Time.now.utc
  hour_ago = current_time - 3600
  day_ago = current_time - 86_400

  last_hour = period_stats(hour_ago, current_time)
  prior_day = period_stats(day_ago, hour_ago)
  build_anomaly_report(last_hour, prior_day, threshold)
rescue StandardError => e
  Legion::Logging.error("[TraceSearch] detect_anomalies failed: #{e.message}") if defined?(Legion::Logging)
  { error: e.message }
end

.execute_filter(parsed, default_limit) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/legion/trace_search.rb', line 82

# Runs the LLM-generated filter +parsed+ against metering_records.
#
# parsed        - filter hash (:where, :order, :limit, :date_from, :date_to).
# default_limit - row cap used when the filter carries no :limit.
#
# Delegates where/date filtering to build_filtered_dataset so the column
# allowlisting lives in one place, and uses data_available? as the guard
# for consistency with detect_anomalies/trend. The row limit is always
# hard-capped at 200.
def execute_filter(parsed, default_limit)
  return { results: [], error: 'data unavailable' } unless data_available?

  ds = build_filtered_dataset(parsed)
  ds = apply_ordering(ds, parsed)

  limit = [parsed[:limit] || default_limit, 200].min
  total = ds.count
  results = ds.limit(limit).all
  { results: results, count: results.size, total: total, truncated: total > limit, filter: parsed }
end

.format_summary(dataset, row, parsed) ⇒ Object



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/legion/trace_search.rb', line 171

# Assembles the summary payload: aggregate totals from +row+, plus
# per-status counts and top extension/worker breakdowns computed
# from +dataset+, echoing the originating +parsed+ filter.
def format_summary(dataset, row, parsed)
  status_counts = dataset.group_and_count(:status).all.to_h { |rec| [rec[:status], rec[:count]] }
  extensions = top_by(dataset, :extension).map { |rec| { name: rec[:extension], count: rec[:count] } }
  workers = top_by(dataset, :worker_id).map { |rec| { id: rec[:worker_id], count: rec[:count] } }

  {
    total_records:    row[:total_records] || 0,
    total_tokens_in:  row[:total_tokens_in] || 0,
    total_tokens_out: row[:total_tokens_out] || 0,
    total_cost:       (row[:total_cost] || 0).to_f.round(4),
    avg_latency_ms:   (row[:avg_latency_ms] || 0).to_f.round(1),
    max_latency_ms:   row[:max_latency_ms] || 0,
    time_range:       { from: row[:earliest], to: row[:latest] },
    status_counts:    status_counts,
    top_extensions:   extensions,
    top_workers:      workers,
    filter:           parsed
  }
end

.generate_filter(query) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/legion/trace_search.rb', line 62

# Asks the LLM layer to translate +query+ into a structured filter hash.
# Returns nil when the LLM layer is absent or when the response fails
# schema validation (the failure is logged when logging is available).
def generate_filter(query)
  return nil unless defined?(Legion::LLM) && Legion::LLM.respond_to?(:structured)

  response = Legion::LLM.structured(
    messages: [
      { role: 'system', content: schema_context },
      { role: 'user',   content: query }
    ],
    schema:   FILTER_SCHEMA,
    caller:   { source: 'cli', command: 'trace' }
  )
  unless response[:valid]
    Legion::Logging.error "[TraceSearch] LLM filter generation failed for query: #{query.inspect}" if defined?(Legion::Logging)
    return nil
  end
  response[:data]
end

.safe_parse_time(value) ⇒ Object



113
114
115
116
117
118
119
# File 'lib/legion/trace_search.rb', line 113

# Coerces +value+ into a Time. Time instances pass through untouched;
# anything else is stringified and parsed, with nil returned when the
# string is unparseable.
def safe_parse_time(value)
  case value
  when Time then value
  else Time.parse(value.to_s)
  end
rescue ArgumentError
  nil
end

.schema_contextObject



77
78
79
80
# File 'lib/legion/trace_search.rb', line 77

# Renders SCHEMA_TEMPLATE with the current timestamp so the LLM can
# resolve relative phrases ("today", "last hour") into ISO dates.
# NOTE(review): uses local Time.now while detect_anomalies/trend use
# Time.now.utc — confirm which timezone recorded_at is stored in.
def schema_context
  stamp = Time.now
  format(SCHEMA_TEMPLATE, current_time: stamp.iso8601, today: stamp.strftime('%Y-%m-%d'))
end

.search(query, limit: 50) ⇒ Object



51
52
53
54
55
56
57
58
59
60
# File 'lib/legion/trace_search.rb', line 51

# Natural-language search over metering_records: translates +query+ into
# a filter via the LLM, then executes it with the given row +limit+.
# Errors never escape — they come back as { results: [], error: ... }.
def search(query, limit: 50)
  Legion::Logging.info "[TraceSearch] query: #{query.inspect} limit=#{limit}" if defined?(Legion::Logging)
  filter = generate_filter(query)
  filter ? execute_filter(filter, limit) : { results: [], error: 'no filter generated' }
rescue StandardError => e
  Legion::Logging.error "[TraceSearch] search failed: #{e.message}" if defined?(Legion::Logging)
  { results: [], error: e.message }
end

.summarize(query) ⇒ Object



130
131
132
133
134
135
136
137
138
# File 'lib/legion/trace_search.rb', line 130

# Natural-language summary: translates +query+ into a filter via the LLM
# and computes aggregate statistics over the matching records.
# Errors never escape — they come back as { error: ... }.
def summarize(query)
  filter = generate_filter(query)
  filter ? compute_summary(filter) : { error: 'no filter generated' }
rescue StandardError => e
  Legion::Logging.error("[TraceSearch] summarize failed: #{e.message}") if defined?(Legion::Logging)
  { error: e.message }
end

.top_by(dataset, column, limit: 5) ⇒ Object



187
188
189
# File 'lib/legion/trace_search.rb', line 187

# Returns the top +limit+ values of +column+ in +dataset+, ranked by
# record count descending.
def top_by(dataset, column, limit: 5)
  counted = dataset.group_and_count(column)
  counted.order(Sequel.desc(:count)).limit(limit).all
end

.trend(hours: 24, buckets: 12) ⇒ Object



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/legion/trace_search.rb', line 204

# Splits the last +hours+ hours into +buckets+ equal time buckets and
# returns per-bucket stats from period_stats.
#
# Fix: the previous (hours * 3600.0 / buckets).to_i truncation made the
# buckets under-cover the requested window (the last bucket ended before
# now whenever the window wasn't evenly divisible). Bucket boundaries now
# use the exact float width and the final bucket is clamped to +now+, so
# the full window is always covered.
def trend(hours: 24, buckets: 12)
  return { error: 'data unavailable' } unless data_available?
  return { error: 'buckets must be positive' } unless buckets.is_a?(Integer) && buckets.positive?

  now = Time.now.utc
  window = hours * 3600.0
  width = window / buckets
  start_time = now - window

  data = Array.new(buckets) do |i|
    bucket_start = start_time + (i * width)
    # Clamp the last bucket's end to now to avoid float-accumulation drift.
    bucket_end = i == buckets - 1 ? now : start_time + ((i + 1) * width)
    stats = period_stats(bucket_start, bucket_end)
    { time: bucket_start.iso8601, **stats }
  end

  { buckets: data, hours: hours, bucket_count: buckets, bucket_minutes: (width / 60).round }
rescue StandardError => e
  Legion::Logging.error("[TraceSearch] trend failed: #{e.message}") if defined?(Legion::Logging)
  { error: e.message }
end