Module: Legion::TraceSearch
- Defined in:
- lib/legion/trace_search.rb
Constant Summary collapse
# Prompt template handed to the LLM as the system message. Interpolated via
# Kernel#format with %<current_time>s and %<today>s (see .schema_context).
# NOTE(review): reconstructed as a squiggly heredoc — the rendered source had
# collapsed it onto one line, which is not valid Ruby.
SCHEMA_TEMPLATE = <<~PROMPT
  You translate natural language queries into JSON filter objects for the metering_records table.

  Current date/time: %<current_time>s

  Columns:
  id (integer), worker_id (string), event_type (string), extension (string),
  runner_function (string), status (string: success/failure),
  input_tokens (integer), output_tokens (integer), cost_usd (float),
  wall_clock_ms (integer), recorded_at (datetime)

  Return ONLY a valid JSON object with these possible keys:
  - "where": hash of column => value filters (e.g. {"status": "failure"})
  - "order": column name to sort by (prefix with "-" for descending, e.g. "-cost_usd")
  - "limit": integer limit (default 50)
  - "date_from": ISO date string for recorded_at >= filter
  - "date_to": ISO date string for recorded_at <= filter

  For relative time references, compute ISO dates from the current date/time above:
  - "today" => date_from is today's date at 00:00
  - "last hour" => date_from is 1 hour ago
  - "this week" => date_from is Monday of this week
  - "yesterday" => date_from/date_to bracket yesterday

  Examples:
  - "failed tasks" => {"where": {"status": "failure"}}
  - "most expensive calls" => {"order": "-cost_usd", "limit": 20}
  - "tasks by worker-1 today" => {"where": {"worker_id": "worker-1"}, "date_from": "%<today>s"}

  Return ONLY the JSON object, no explanation.
PROMPT
# JSON-schema shape expected back from Legion::LLM.structured: every key
# is optional; unknown keys are simply ignored downstream.
FILTER_SCHEMA = {
  type: 'object',
  properties: {
    where: { type: 'object' },
    order: { type: 'string' },
    limit: { type: 'integer' },
    date_from: { type: 'string' },
    date_to: { type: 'string' }
  }
}.freeze
# Whitelist of metering_records columns usable in "where" / "order" —
# everything else coming back from the LLM is dropped.
ALLOWED_COLUMNS = %w[
  id
  worker_id
  event_type
  extension
  runner_function
  status
  input_tokens
  output_tokens
  cost_usd
  wall_clock_ms
  recorded_at
].freeze
Class Method Summary collapse
- .aggregate_stats(dataset) ⇒ Object
- .apply_date_filters(dataset, parsed) ⇒ Object
- .apply_ordering(dataset, parsed) ⇒ Object
- .build_filtered_dataset(parsed) ⇒ Object
- .compute_summary(parsed) ⇒ Object
- .detect_anomalies(threshold: 2.0) ⇒ Object
- .execute_filter(parsed, default_limit) ⇒ Object
- .format_summary(dataset, row, parsed) ⇒ Object
- .generate_filter(query) ⇒ Object
- .safe_parse_time(value) ⇒ Object
- .schema_context ⇒ Object
- .search(query, limit: 50) ⇒ Object
- .summarize(query) ⇒ Object
- .top_by(dataset, column, limit: 5) ⇒ Object
- .trend(hours: 24, buckets: 12) ⇒ Object
Class Method Details
.aggregate_stats(dataset) ⇒ Object
158 159 160 161 162 163 164 165 166 167 168 169 |
# Run one aggregate query over +dataset+ and return the stats row.
#
# @param dataset [Sequel::Dataset] filtered metering_records dataset
# @return [Hash] totals/averages keyed by alias (:total_records,
#   :total_cost, ...), or {} when the query yields no row
def aggregate_stats(dataset)
  agg = ->(fn, col, as) { Sequel.function(fn, col).as(as) }
  selections = [
    Sequel.function(:count, Sequel.lit('*')).as(:total_records),
    agg.call(:sum, :input_tokens, :total_tokens_in),
    agg.call(:sum, :output_tokens, :total_tokens_out),
    agg.call(:sum, :cost_usd, :total_cost),
    agg.call(:avg, :wall_clock_ms, :avg_latency_ms),
    agg.call(:max, :wall_clock_ms, :max_latency_ms),
    agg.call(:min, :recorded_at, :earliest),
    agg.call(:max, :recorded_at, :latest)
  ]
  dataset.select(*selections).first || {}
end
.apply_date_filters(dataset, parsed) ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 |
# Narrow +dataset+ by recorded_at bounds taken from the parsed filter's
# :date_from / :date_to strings. Unparseable dates are silently skipped
# (safe_parse_time returns nil for those).
def apply_date_filters(dataset, parsed)
  lower = safe_parse_time(parsed[:date_from]) if parsed[:date_from]
  upper = safe_parse_time(parsed[:date_to]) if parsed[:date_to]
  dataset = dataset.where { recorded_at >= lower } if lower
  dataset = dataset.where { recorded_at <= upper } if upper
  dataset
end
.apply_ordering(dataset, parsed) ⇒ Object
121 122 123 124 125 126 127 128 |
# Apply the parsed :order key: "col" sorts ascending, "-col" descending.
# Non-string values and columns outside ALLOWED_COLUMNS leave the
# dataset untouched.
def apply_ordering(dataset, parsed)
  spec = parsed[:order]
  return dataset unless spec.is_a?(String)

  descending = spec.start_with?('-')
  column = spec.delete_prefix('-')
  return dataset unless ALLOWED_COLUMNS.include?(column)

  dataset.order(descending ? Sequel.desc(column.to_sym) : column.to_sym)
end
.build_filtered_dataset(parsed) ⇒ Object
149 150 151 152 153 154 155 156 |
# Build a metering_records dataset from the parsed filter: equality
# conditions from the :where hash (restricted to ALLOWED_COLUMNS) plus
# any recorded_at date bounds.
def build_filtered_dataset(parsed)
  ds = Legion::Data.connection[:metering_records]
  conditions = parsed[:where]
  if conditions.is_a?(Hash)
    allowed = conditions.select { |key, _| ALLOWED_COLUMNS.include?(key.to_s) }
    ds = ds.where(allowed.transform_keys(&:to_sym))
  end
  apply_date_filters(ds, parsed)
end
.compute_summary(parsed) ⇒ Object
140 141 142 143 144 145 146 147 |
# Compute aggregate stats for a parsed filter. Returns an error hash
# when Legion::Data is not loaded or has no live connection.
def compute_summary(parsed)
  data_ready = defined?(Legion::Data) &&
               Legion::Data.respond_to?(:connection) &&
               Legion::Data.connection
  return { error: 'data unavailable' } unless data_ready

  dataset = build_filtered_dataset(parsed)
  format_summary(dataset, aggregate_stats(dataset), parsed)
end
.detect_anomalies(threshold: 2.0) ⇒ Object
191 192 193 194 195 196 197 198 199 200 201 202 |
# Compare the last hour's stats against the preceding 23-hour baseline
# and report metrics exceeding +threshold+ times the baseline.
#
# @param threshold [Float] multiplier over baseline that counts as anomalous
# @return [Hash] anomaly report from build_anomaly_report, or
#   { error: ... } when data is unavailable or the query fails
def detect_anomalies(threshold: 2.0)
  return { error: 'data unavailable' } unless data_available?

  now = Time.now.utc
  recent = period_stats(now - 3600, now)
  baseline = period_stats(now - 86_400, now - 3600)
  build_anomaly_report(recent, baseline, threshold)
rescue StandardError => e
  # Fixed: the rendered source had a truncated `e.` — restored `e.message`.
  Legion::Logging.error("[TraceSearch] detect_anomalies failed: #{e.message}") if defined?(Legion::Logging)
  { error: e.message }
end
.execute_filter(parsed, default_limit) ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# Execute a parsed filter and return matching rows with pagination
# metadata.
#
# @param parsed [Hash] filter produced by generate_filter
# @param default_limit [Integer] row limit used when the filter has none
# @return [Hash] :results, :count, :total, :truncated, :filter — or
#   { results: [], error: ... } when the data layer is unavailable
def execute_filter(parsed, default_limit)
  return { results: [], error: 'data unavailable' } unless defined?(Legion::Data) &&
                                                           Legion::Data.respond_to?(:connection) &&
                                                           Legion::Data.connection

  # Consistency: reuse build_filtered_dataset instead of duplicating its
  # where-whitelisting and date-filter logic inline.
  ds = apply_ordering(build_filtered_dataset(parsed), parsed)
  limit = [parsed[:limit] || default_limit, 200].min # hard cap at 200 rows
  total = ds.count
  results = ds.limit(limit).all
  { results: results, count: results.size, total: total, truncated: total > limit, filter: parsed }
end
.format_summary(dataset, row, parsed) ⇒ Object
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
# Shape the aggregate row plus per-status / top-N breakdowns into the
# summary hash returned by compute_summary.
def format_summary(dataset, row, parsed)
  status_counts = dataset.group_and_count(:status).all.to_h { |r| [r[:status], r[:count]] }
  extensions = top_by(dataset, :extension).map { |r| { name: r[:extension], count: r[:count] } }
  workers = top_by(dataset, :worker_id).map { |r| { id: r[:worker_id], count: r[:count] } }
  {
    total_records: row[:total_records] || 0,
    total_tokens_in: row[:total_tokens_in] || 0,
    total_tokens_out: row[:total_tokens_out] || 0,
    total_cost: (row[:total_cost] || 0).to_f.round(4),
    avg_latency_ms: (row[:avg_latency_ms] || 0).to_f.round(1),
    max_latency_ms: row[:max_latency_ms] || 0,
    time_range: { from: row[:earliest], to: row[:latest] },
    status_counts: status_counts,
    top_extensions: extensions,
    top_workers: workers,
    filter: parsed
  }
end
.generate_filter(query) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# Ask the LLM to translate a natural-language +query+ into a filter
# hash validated against FILTER_SCHEMA. Returns nil when the LLM layer
# is absent or the structured response is invalid.
def generate_filter(query)
  return nil unless defined?(Legion::LLM) && Legion::LLM.respond_to?(:structured)

  response = Legion::LLM.structured(
    messages: [
      { role: 'system', content: schema_context },
      { role: 'user', content: query }
    ],
    schema: FILTER_SCHEMA,
    caller: { source: 'cli', command: 'trace' }
  )
  unless response[:valid]
    Legion::Logging.error "[TraceSearch] LLM filter generation failed for query: #{query.inspect}" if defined?(Legion::Logging)
    return nil
  end
  response[:data]
end
.safe_parse_time(value) ⇒ Object
113 114 115 116 117 118 119 |
# Coerce +value+ to a Time. Time instances pass straight through;
# anything else is stringified and parsed. Returns nil for input that
# Time.parse rejects (including nil, which stringifies to "").
def safe_parse_time(value)
  case value
  when Time
    value
  else
    begin
      Time.parse(value.to_s)
    rescue ArgumentError
      nil
    end
  end
end
.schema_context ⇒ Object
77 78 79 80 |
# Render SCHEMA_TEMPLATE with the current timestamp so the LLM can
# resolve relative references like "today" or "last hour".
def schema_context
  current = Time.now
  substitutions = { current_time: current.iso8601, today: current.strftime('%Y-%m-%d') }
  format(SCHEMA_TEMPLATE, **substitutions)
end
.search(query, limit: 50) ⇒ Object
51 52 53 54 55 56 57 58 59 60 |
# Natural-language search over metering records: translate +query+ to a
# filter via the LLM, then execute it.
#
# @param query [String] free-text query
# @param limit [Integer] default row limit when the filter supplies none
# @return [Hash] execute_filter result, or { results: [], error: ... }
def search(query, limit: 50)
  Legion::Logging.info "[TraceSearch] query: #{query.inspect} limit=#{limit}" if defined?(Legion::Logging)
  parsed = generate_filter(query)
  return { results: [], error: 'no filter generated' } unless parsed

  execute_filter(parsed, limit)
rescue StandardError => e
  # Fixed: the rendered source had a truncated `e.` — restored `e.message`.
  Legion::Logging.error "[TraceSearch] search failed: #{e.message}" if defined?(Legion::Logging)
  { results: [], error: e.message }
end
.summarize(query) ⇒ Object
130 131 132 133 134 135 136 137 138 |
# Aggregate summary for a natural-language +query+: translate it to a
# filter via the LLM, then compute totals/breakdowns over the matches.
#
# @param query [String] free-text query
# @return [Hash] compute_summary result, or { error: ... }
def summarize(query)
  parsed = generate_filter(query)
  return { error: 'no filter generated' } unless parsed

  compute_summary(parsed)
rescue StandardError => e
  # Fixed: the rendered source had a truncated `e.` — restored `e.message`.
  Legion::Logging.error("[TraceSearch] summarize failed: #{e.message}") if defined?(Legion::Logging)
  { error: e.message }
end
.top_by(dataset, column, limit: 5) ⇒ Object
187 188 189 |
# Top +limit+ distinct values of +column+ ranked by row count, descending.
def top_by(dataset, column, limit: 5)
  grouped = dataset.group_and_count(column)
  grouped.order(Sequel.desc(:count)).limit(limit).all
end
.trend(hours: 24, buckets: 12) ⇒ Object
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 |
# Bucketed time-series stats over the last +hours+ hours.
#
# @param hours [Integer] lookback window
# @param buckets [Integer] number of equal-width buckets
# @return [Hash] :buckets (each with :time + period_stats keys), :hours,
#   :bucket_count, :bucket_minutes — or { error: ... } on failure
def trend(hours: 24, buckets: 12)
  return { error: 'data unavailable' } unless data_available?

  now = Time.now.utc
  bucket_seconds = (hours * 3600.0 / buckets).to_i
  start_time = now - (hours * 3600)
  data = buckets.times.map do |i|
    bucket_start = start_time + (i * bucket_seconds)
    bucket_end = bucket_start + bucket_seconds
    stats = period_stats(bucket_start, bucket_end)
    { time: bucket_start.iso8601, **stats }
  end
  { buckets: data, hours: hours, bucket_count: buckets, bucket_minutes: bucket_seconds / 60 }
rescue StandardError => e
  # Fixed: the rendered source had a truncated `e.` — restored `e.message`.
  Legion::Logging.error("[TraceSearch] trend failed: #{e.message}") if defined?(Legion::Logging)
  { error: e.message }
end