Class: RubyLLM::Agents::Execution
- Inherits: ActiveRecord::Base
  - Object
  - ActiveRecord::Base
  - RubyLLM::Agents::Execution
- Includes: Analytics, Metrics, Replayable, Scopes
- Defined in:
  app/models/ruby_llm/agents/execution.rb,
  app/models/ruby_llm/agents/execution/scopes.rb,
  app/models/ruby_llm/agents/execution/metrics.rb,
  app/models/ruby_llm/agents/execution/analytics.rb,
  app/models/ruby_llm/agents/execution/replayable.rb
Overview
ActiveRecord model for tracking agent executions.
Stores comprehensive execution data for observability and analytics.
Defined Under Namespace
Modules: Analytics, Metrics, Replayable, Scopes
Constant Summary
- FINISH_REASONS =
Allowed finish reasons from LLM providers
%w[stop length content_filter tool_calls other].freeze
- FALLBACK_REASONS =
Allowed fallback reasons for model switching
%w[price_limit quality_fail rate_limit timeout safety error other].freeze
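The finish-reason allow-list can be exercised outside Rails. The following is a minimal sketch: the constant copies the documented value, while `describe_finish_reason` is a hypothetical helper (not part of the library) that mirrors the `finish_reason` comparisons made by `#truncated?` and `#content_filtered?`.

```ruby
# Copy of the documented constant so the sketch runs without the gem.
FINISH_REASONS = %w[stop length content_filter tool_calls other].freeze

# Hypothetical helper: maps a provider finish reason to a coarse label.
# Values outside the allow-list are reported as :unknown.
def describe_finish_reason(reason)
  return :unknown unless FINISH_REASONS.include?(reason)

  case reason
  when "length" then :truncated               # same check as #truncated?
  when "content_filter" then :content_filtered # same check as #content_filtered?
  else :normal
  end
end

puts describe_finish_reason("length")
puts describe_finish_reason("made_up_reason")
```

In the model itself the allow-list is enforced by an inclusion validation with `allow_nil: true`, so a missing finish reason is still valid.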
Instance Attribute Summary
-
#agent_type ⇒ String
Full class name of the agent (e.g., “SearchAgent”).
-
#completed_at ⇒ Time?
When execution completed.
-
#duration_ms ⇒ Integer?
Execution duration in milliseconds.
-
#error_class ⇒ String?
Exception class name if failed.
-
#error_message ⇒ String?
Error message reader that survives soft purge.
-
#input_cost ⇒ BigDecimal?
Cost of input tokens in USD.
-
#input_tokens ⇒ Integer?
Number of input tokens.
-
#metadata ⇒ Hash
Custom metadata from metadata hook.
-
#model_id ⇒ String
LLM model identifier used.
-
#output_cost ⇒ BigDecimal?
Cost of output tokens in USD.
-
#output_tokens ⇒ Integer?
Number of output tokens.
-
#parameters ⇒ Hash
Sanitized parameters passed to the agent.
-
#started_at ⇒ Time
When execution started.
-
#status ⇒ String
Execution status: “running”, “success”, “error”, “timeout”.
-
#temperature ⇒ Float
Temperature setting used (0.0-2.0).
-
#total_cost ⇒ BigDecimal?
Total cost in USD.
-
#total_tokens ⇒ Integer?
Sum of input and output tokens.
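The two totals are derived values: the source listing sets them in `before_save` callbacks and treats a missing operand as zero. A minimal standalone sketch of that arithmetic follows; the free functions `total_tokens` and `total_cost` are illustrative stand-ins for the private callbacks, not library API.

```ruby
require "bigdecimal"

# Nil-safe sum of token counts, as in the calculate_total_tokens callback.
def total_tokens(input_tokens, output_tokens)
  (input_tokens || 0) + (output_tokens || 0)
end

# Nil-safe sum of costs, as in the calculate_total_cost callback.
def total_cost(input_cost, output_cost)
  (input_cost || 0) + (output_cost || 0)
end

puts total_tokens(120, nil)
puts total_cost(BigDecimal("0.0012"), BigDecimal("0.0030")).to_s("F")
```

Because the callbacks are conditional on the input or output columns changing, a record whose token and cost columns are never assigned keeps nil totals.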
Class Method Summary
-
.calculate_period_success_rate(scope) ⇒ Float
Calculates success rate for a given scope.
-
.now_strip_data(range: "today") ⇒ Hash
Returns real-time dashboard data for the Now Strip. Optimized to run 3 queries (current aggregate, previous aggregate, and running count) instead of ~15 individual count/sum/average queries.
-
.now_strip_data_for_dates(from:, to:) ⇒ Hash
Returns Now Strip data for a custom date range. Optimized to run 3 queries instead of ~15.
-
.pct_change(old_val, new_val) ⇒ Float?
Calculates percentage change between old and new values.
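The period-over-period arithmetic behind these methods can be tried without a database. The sketch below copies the body of `.pct_change` from the source listing; `previous_window` is a hypothetical helper that reproduces the window calculation in `.now_strip_data_for_dates` with plain `Date` arithmetic instead of ActiveSupport durations.

```ruby
require "date"

# Percentage change from old_val to new_val, rounded to one decimal.
# Returns nil when there is no usable baseline (nil or zero).
def pct_change(old_val, new_val)
  return nil if old_val.nil? || old_val.zero?

  ((new_val - old_val).to_f / old_val * 100).round(1)
end

# Hypothetical helper: the same-length window immediately before
# the inclusive range from..to.
def previous_window(from, to)
  span_days = (to - from).to_i + 1
  [from - span_days, from - 1]
end

puts pct_change(40, 50)
puts previous_window(Date.new(2024, 3, 8), Date.new(2024, 3, 14)).inspect
```

Returning nil for a zero baseline avoids a division by zero and lets a dashboard show "no comparison" rather than a misleading infinite increase.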
Instance Method Summary
-
#aggregate_attempt_costs! ⇒ void
Aggregates costs from all attempts using each attempt’s model pricing.
-
#cached? ⇒ Boolean
Returns whether this execution was a cache hit.
-
#child? ⇒ Boolean
Returns whether this is a child (nested) execution.
-
#content_filtered? ⇒ Boolean
Returns whether the response was blocked by content filter.
-
#depth ⇒ Integer
Returns the execution tree depth.
-
#failed_attempts ⇒ Array<Hash>
Returns failed attempts.
-
#has_retries? ⇒ Boolean
Returns whether this execution had multiple attempts.
- #rate_limited ⇒ Object
- #rate_limited=(val) ⇒ Object
-
#rate_limited? ⇒ Boolean
Returns whether this execution was rate limited.
-
#response_hash ⇒ Hash
Returns the response payload as a Hash, regardless of how agents wrote it.
- #retryable ⇒ Object
- #retryable=(val) ⇒ Object
-
#root? ⇒ Boolean
Returns whether this is a root (top-level) execution.
-
#short_circuited_attempts ⇒ Array<Hash>
Returns short-circuited attempts (circuit breaker blocked).
-
#soft_purged? ⇒ Boolean
Returns whether this execution has had its detail payload soft-purged.
-
#soft_purged_at ⇒ Time?
Returns when this execution was soft-purged, if ever.
-
#streaming? ⇒ Boolean
Returns whether this execution used streaming.
-
#successful_attempt ⇒ Hash?
Returns the successful attempt data (if any).
-
#tenant_record ⇒ Object
Convenience method to access tenant_record through the tenant.
-
#tool_calls? ⇒ Boolean
(also: #has_tool_calls?)
Returns whether this execution made tool calls.
-
#truncated? ⇒ Boolean
Returns whether the response was truncated due to max_tokens.
-
#used_fallback? ⇒ Boolean
Returns whether this execution used fallback models.
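The attempt helpers and `#aggregate_attempt_costs!` all operate on an array of string-keyed hashes. The following sketch applies the same predicates to sample data. The attempts, model names, and the `PRICES` table (USD per million tokens) are invented for illustration; the real method looks up pricing through the model registry, and `!a["error_class"].nil?` stands in for ActiveSupport's `present?`.

```ruby
# Hypothetical attempt data in the shape the listing reads.
ATTEMPTS = [
  {"model_id" => "model-a", "error_class" => "Timeout::Error", "input_tokens" => 1_000, "output_tokens" => 0},
  {"model_id" => "model-b", "short_circuited" => true},
  {"model_id" => "model-c", "error_class" => nil, "input_tokens" => 2_000, "output_tokens" => 500}
].freeze

# Hypothetical prices in USD per million tokens.
PRICES = {
  "model-a" => {input: 1.0, output: 2.0},
  "model-c" => {input: 3.0, output: 15.0}
}.freeze

# First attempt with no error that actually reached the provider.
def successful_attempt(attempts)
  attempts.find { |a| a["error_class"].nil? && !a["short_circuited"] }
end

# Attempts that raised an error.
def failed_attempts(attempts)
  attempts.select { |a| !a["error_class"].nil? }
end

# Attempts blocked by the circuit breaker before any API call.
def short_circuited_attempts(attempts)
  attempts.select { |a| a["short_circuited"] }
end

# Sums per-attempt costs; skips short-circuited attempts and unknown models.
def aggregate_costs(attempts, prices)
  input_cost = 0.0
  output_cost = 0.0
  attempts.each do |attempt|
    next if attempt["short_circuited"]

    price = prices[attempt["model_id"]]
    next unless price

    input_cost += ((attempt["input_tokens"] || 0) / 1_000_000.0) * price[:input]
    output_cost += ((attempt["output_tokens"] || 0) / 1_000_000.0) * price[:output]
  end
  [input_cost.round(6), output_cost.round(6)]
end

puts successful_attempt(ATTEMPTS)["model_id"]
puts aggregate_costs(ATTEMPTS, PRICES).inspect
```

Failed attempts still contribute to the cost because their tokens were billed; only short-circuited attempts are excluded.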
Methods included from Replayable
#replay, #replay?, #replay_source, #replayable?, #replays
Methods included from Metrics
#calculate_costs!, #cost_per_1k_tokens, #duration_seconds, #formatted_input_cost, #formatted_output_cost, #formatted_total_cost, #tokens_per_second
Instance Attribute Details
#agent_type ⇒ String
Returns the full class name of the agent (e.g., “SearchAgent”).
# File 'app/models/ruby_llm/agents/execution.rb', line 46

class Execution < ::ActiveRecord::Base
  self.table_name = "ruby_llm_agents_executions"

  include Execution::Metrics
  include Execution::Scopes
  include Execution::Analytics
  include Execution::Replayable

  # Status enum
  # - running: execution in progress
  # - success: completed successfully
  # - error: completed with error
  # - timeout: completed due to timeout
  enum :status, %w[running success error timeout].index_by(&:itself), prefix: true

  # Allowed finish reasons from LLM providers
  FINISH_REASONS = %w[stop length content_filter tool_calls other].freeze

  # Allowed fallback reasons for model switching
  FALLBACK_REASONS = %w[price_limit quality_fail rate_limit timeout safety error other].freeze

  # Execution hierarchy associations
  belongs_to :parent_execution, class_name: "RubyLLM::Agents::Execution", optional: true
  belongs_to :root_execution, class_name: "RubyLLM::Agents::Execution", optional: true
  has_many :child_executions, class_name: "RubyLLM::Agents::Execution",
    foreign_key: :parent_execution_id, dependent: :nullify, inverse_of: :parent_execution

  # Detail record for large payloads (prompts, responses, tool calls, etc.)
  has_one :detail, class_name: "RubyLLM::Agents::ExecutionDetail",
    foreign_key: :execution_id, dependent: :destroy

  # Individual tool call records (real-time tracking)
  has_many :tool_executions, class_name: "RubyLLM::Agents::ToolExecution",
    foreign_key: :execution_id, dependent: :destroy

  # Delegations so existing code keeps working transparently
  delegate :system_prompt, :user_prompt, :assistant_prompt, :response,
    :messages_summary, :tool_calls, :attempts, :fallback_chain,
    :parameters, :routed_to, :classification_result,
    :cached_at, :cache_creation_tokens,
    to: :detail, prefix: false, allow_nil: true

  # Error message reader that survives soft purge.
  #
  # Prefers detail.error_message when the detail row still exists, otherwise
  # falls back to the truncated copy stored in metadata by the retention
  # job. This lets error-rate trend analysis continue working past the
  # soft-purge window.
  #
  # @return [String, nil]
  def error_message
    detail&.error_message || metadata&.dig("error_message")
  end

  # Validations
  validates :agent_type, :model_id, :started_at, presence: true
  validates :status, inclusion: {in: statuses.keys}
  validates :temperature, numericality: {greater_than_or_equal_to: 0, less_than_or_equal_to: 2}, allow_nil: true
  validates :input_tokens, :output_tokens, numericality: {greater_than_or_equal_to: 0}, allow_nil: true
  validates :duration_ms, numericality: {greater_than_or_equal_to: 0}, allow_nil: true
  validates :input_cost, :output_cost, :total_cost, numericality: {greater_than_or_equal_to: 0}, allow_nil: true
  validates :finish_reason, inclusion: {in: FINISH_REASONS}, allow_nil: true

  before_save :calculate_total_tokens, if: -> { input_tokens_changed? || output_tokens_changed? }
  before_save :calculate_total_cost, if: -> { input_cost_changed? || output_cost_changed? }

  # Aggregates costs from all attempts using each attempt's model pricing
  #
  # Used for multi-attempt executions (retries/fallbacks) where different models
  # may have been used. Calculates total cost by summing individual attempt costs.
  #
  # @return [void]
  def aggregate_attempt_costs!
    return if attempts.blank?

    total_input_cost = 0
    total_output_cost = 0

    attempts.each do |attempt|
      # Skip short-circuited attempts (no actual API call made)
      next if attempt["short_circuited"]

      model_info = resolve_model_info(attempt["model_id"])
      next unless model_info&.pricing

      input_price = model_info.pricing.text_tokens&.input || 0
      output_price = model_info.pricing.text_tokens&.output || 0

      input_tokens = attempt["input_tokens"] || 0
      output_tokens = attempt["output_tokens"] || 0

      total_input_cost += (input_tokens / 1_000_000.0) * input_price
      total_output_cost += (output_tokens / 1_000_000.0) * output_price
    end

    self.input_cost = total_input_cost.round(6)
    self.output_cost = total_output_cost.round(6)
  end

  # Returns whether this execution had multiple attempts
  #
  # @return [Boolean] true if more than one attempt was made
  def has_retries?
    count = if self.class.column_names.include?("attempts_count")
      attempts_count
    elsif self.class.column_names.include?("attempts")
      attempts&.size
    end
    (count || 0) > 1
  end

  # Returns whether this execution used fallback models
  #
  # @return [Boolean] true if a different model than requested succeeded
  def used_fallback?
    return false unless self.class.column_names.include?("chosen_model_id")

    chosen_model_id.present? && chosen_model_id != model_id
  end

  # Returns the successful attempt data (if any)
  #
  # @return [Hash, nil] The successful attempt or nil
  def successful_attempt
    return nil if attempts.blank?

    attempts.find { |a| a["error_class"].nil? && !a["short_circuited"] }
  end

  # Returns failed attempts
  #
  # @return [Array<Hash>] Failed attempt data
  def failed_attempts
    return [] if attempts.blank?

    attempts.select { |a| a["error_class"].present? }
  end

  # Returns short-circuited attempts (circuit breaker blocked)
  #
  # @return [Array<Hash>] Short-circuited attempt data
  def short_circuited_attempts
    return [] if attempts.blank?

    attempts.select { |a| a["short_circuited"] }
  end

  # Returns whether this is a root (top-level) execution
  #
  # @return [Boolean] true if this is a root execution
  def root?
    parent_execution_id.nil?
  end

  # Returns whether this is a child (nested) execution
  #
  # @return [Boolean] true if this has a parent execution
  def child?
    parent_execution_id.present?
  end

  # Returns the execution tree depth
  #
  # @return [Integer] depth level (0 for root)
  def depth
    return 0 if root?

    parent_execution&.depth.to_i + 1
  end

  # Returns whether this execution was a cache hit
  #
  # @return [Boolean] true if response was served from cache
  def cached?
    cache_hit == true
  end

  # Returns whether this execution was rate limited
  #
  # @return [Boolean] true if rate limiting occurred
  def rate_limited?
    metadata&.dig("rate_limited") == true
  end

  # Returns the response payload as a Hash, regardless of how agents wrote it.
  #
  # The `execution_details.response` column is declared as JSON, but agents
  # may write plain strings (chat text), arrays (embeddings), or nil. Views
  # that want to look up specific keys (audio_url, image_url, etc.) need a
  # Hash they can safely `.dig` into. This reader returns an empty hash when
  # the stored response isn't a Hash, so callers don't need type guards.
  #
  # @return [Hash] Parsed response hash, or empty hash if not hash-shaped
  def response_hash
    raw = response
    raw.is_a?(Hash) ? raw : {}
  end

  # Returns whether this execution has had its detail payload soft-purged.
  #
  # Soft-purged executions retain all analytics columns (cost, tokens,
  # timing, status) but the large payloads (prompts, responses, tool
  # calls, attempts) stored in execution_details and tool_executions
  # have been destroyed by the retention job.
  #
  # @return [Boolean] true if the retention job has soft-purged this execution
  def soft_purged?
    metadata&.key?("soft_purged_at") == true
  end

  # Returns when this execution was soft-purged, if ever.
  #
  # @return [Time, nil] Parsed timestamp or nil if not soft-purged
  def soft_purged_at
    raw = metadata&.dig("soft_purged_at")
    return nil if raw.blank?

    Time.iso8601(raw)
  rescue ArgumentError
    nil
  end

  # Convenience accessors for niche fields stored in metadata JSON
  %w[span_id response_cache_key fallback_reason].each do |field|
    define_method(field) { metadata&.dig(field) }
    define_method(:"#{field}=") { |val| self.metadata = (metadata || {}).merge(field => val) }
  end

  %w[time_to_first_token_ms].each do |field|
    define_method(field) { metadata&.dig(field)&.to_i }
    define_method(:"#{field}=") { |val| self.metadata = (metadata || {}).merge(field => val) }
  end

  def retryable
    metadata&.dig("retryable")
  end

  def retryable=(val)
    self.metadata = (metadata || {}).merge("retryable" => val)
  end

  def rate_limited
    metadata&.dig("rate_limited")
  end

  def rate_limited=(val)
    self.metadata = (metadata || {}).merge("rate_limited" => val)
  end

  # Convenience method to access tenant_record through the tenant
  def tenant_record
    return nil unless tenant_id.present?

    Tenant.find_by(tenant_id: tenant_id)&.tenant_record
  end

  # Returns whether this execution used streaming
  #
  # @return [Boolean] true if streaming was enabled
  def streaming?
    streaming == true
  end

  # Returns whether the response was truncated due to max_tokens
  #
  # @return [Boolean] true if hit token limit
  def truncated?
    finish_reason == "length"
  end

  # Returns whether the response was blocked by content filter
  #
  # @return [Boolean] true if blocked by safety filter
  def content_filtered?
    finish_reason == "content_filter"
  end

  # Returns whether this execution made tool calls
  #
  # @return [Boolean] true if tool calls were made
  def tool_calls?
    tool_calls_count.to_i > 0
  end
  alias_method :has_tool_calls?, :tool_calls?

  # Returns real-time dashboard data for the Now Strip
  # Optimized: 3 queries (current aggregate + previous aggregate + running count)
  # instead of ~15 individual count/sum/average queries.
  #
  # @param range [String] Time range: "today", "7d", "30d", or "90d"
  # @return [Hash] Now strip metrics with period-over-period comparisons
  def self.now_strip_data(range: "today")
    current_scope = case range
    when "7d" then last_n_days(7)
    when "30d" then last_n_days(30)
    when "90d" then last_n_days(90)
    else today
    end

    previous_scope = case range
    when "7d" then where(created_at: 14.days.ago.beginning_of_day..7.days.ago.beginning_of_day)
    when "30d" then where(created_at: 60.days.ago.beginning_of_day..30.days.ago.beginning_of_day)
    when "90d" then where(created_at: 180.days.ago.beginning_of_day..90.days.ago.beginning_of_day)
    else yesterday
    end

    curr = aggregate_period_stats(current_scope)
    prev = aggregate_period_stats(previous_scope)

    {
      running: running.count,
      success_today: curr[:success],
      errors_today: curr[:errors],
      timeouts_today: curr[:timeouts],
      cost_today: curr[:cost],
      executions_today: curr[:total],
      success_rate: curr[:success_rate],
      avg_duration_ms: curr[:avg_duration_ms],
      total_tokens: curr[:tokens],
      comparisons: {
        success_change: pct_change(prev[:success], curr[:success]),
        errors_change: pct_change(prev[:errors], curr[:errors]),
        cost_change: pct_change(prev[:cost], curr[:cost]),
        duration_change: pct_change(prev[:avg_duration_ms], curr[:avg_duration_ms]),
        tokens_change: pct_change(prev[:tokens], curr[:tokens])
      }
    }
  end

  # Returns Now Strip data for a custom date range
  # Optimized: 3 queries instead of ~15.
  #
  # Compares the selected range against the same-length window
  # immediately preceding it.
  #
  # @param from [Date] Start date (inclusive)
  # @param to [Date] End date (inclusive)
  # @return [Hash] Now strip metrics with period-over-period comparisons
  def self.now_strip_data_for_dates(from:, to:)
    span_days = (to - from).to_i + 1
    current_scope = where(created_at: from.beginning_of_day..to.end_of_day)
    previous_from = from - span_days.days
    previous_to = from - 1.day
    previous_scope = where(created_at: previous_from.beginning_of_day..previous_to.end_of_day)

    curr = aggregate_period_stats(current_scope)
    prev = aggregate_period_stats(previous_scope)

    {
      running: running.count,
      success_today: curr[:success],
      errors_today: curr[:errors],
      timeouts_today: curr[:timeouts],
      cost_today: curr[:cost],
      executions_today: curr[:total],
      success_rate: curr[:success_rate],
      avg_duration_ms: curr[:avg_duration_ms],
      total_tokens: curr[:tokens],
      comparisons: {
        success_change: pct_change(prev[:success], curr[:success]),
        errors_change: pct_change(prev[:errors], curr[:errors]),
        cost_change: pct_change(prev[:cost], curr[:cost]),
        duration_change: pct_change(prev[:avg_duration_ms], curr[:avg_duration_ms]),
        tokens_change: pct_change(prev[:tokens], curr[:tokens])
      }
    }
  end

  # Calculates percentage change between old and new values
  #
  # @param old_val [Numeric, nil] Previous period value
  # @param new_val [Numeric] Current period value
  # @return [Float, nil] Percentage change or nil if old_val is nil/zero
  def self.pct_change(old_val, new_val)
    return nil if old_val.nil? || old_val.zero?

    ((new_val - old_val).to_f / old_val * 100).round(1)
  end

  # Calculates success rate for a given scope
  #
  # @param scope [ActiveRecord::Relation] The execution scope
  # @return [Float] Success rate as percentage
  def self.calculate_period_success_rate(scope)
    total = scope.count
    return 0.0 if total.zero?

    (scope.successful.count.to_f / total * 100).round(1)
  end

  # Returns aggregate stats for a scope in a single query using conditional aggregation
  #
  # Replaces ~9 individual count/sum/average queries with one SQL query.
  #
  # @param scope [ActiveRecord::Relation] Time-filtered scope
  # @return [Hash] Aggregated metrics
  def self.aggregate_period_stats(scope)
    total, success, errors, timeouts, cost, avg_dur, tokens = scope.pick(
      Arel.sql("COUNT(*)"),
      Arel.sql("SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END)"),
      Arel.sql("SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END)"),
      Arel.sql("SUM(CASE WHEN status = 'timeout' THEN 1 ELSE 0 END)"),
      Arel.sql("COALESCE(SUM(total_cost), 0)"),
      Arel.sql("AVG(duration_ms)"),
      Arel.sql("COALESCE(SUM(total_tokens), 0)")
    )

    total = total.to_i
    success = success.to_i

    {
      total: total,
      success: success,
      errors: errors.to_i,
      timeouts: timeouts.to_i,
      cost: cost.to_f,
      avg_duration_ms: avg_dur.to_i,
      tokens: tokens.to_i,
      success_rate: (total > 0) ? (success.to_f / total * 100).round(1) : 0.0
    }
  end
  private_class_method :aggregate_period_stats

  private

  # Calculates and sets total_tokens from input and output
  #
  # @return [Integer] The calculated total
  def calculate_total_tokens
    self.total_tokens = (input_tokens || 0) + (output_tokens || 0)
  end

  # Calculates and sets total_cost from input and output costs
  #
  # @return [BigDecimal] The calculated total
  def calculate_total_cost
    self.total_cost = (input_cost || 0) + (output_cost || 0)
  end

  # Resolves model info for cost calculation
  #
  # Uses Models.find (local registry lookup) rather than Models.resolve
  # because cost calculation only needs pricing data, not a provider instance.
  # Models.resolve requires API keys to instantiate the provider, which may
  # not be available in background jobs or instrumentation contexts.
  #
  # @param lookup_model_id [String, nil] The model identifier (defaults to self.model_id)
  # @return [Object, nil] Model info or nil
  def resolve_model_info(lookup_model_id = nil)
    lookup_model_id ||= model_id
    return nil unless lookup_model_id

    RubyLLM::Models.find(lookup_model_id)
  rescue => e
    Rails.logger.debug("[RubyLLM::Agents] Model lookup failed for #{lookup_model_id}: #{e.message}") if defined?(Rails) && Rails.logger
    nil
  end
end
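The metadata-backed accessors in the listing follow one pattern: read with a nil-safe `dig`, write by merging into a copy of the hash. The sketch below reproduces that pattern on a plain Ruby object. `ExecutionSketch` is a hypothetical stand-in for the model, it assumes the JSON column is named `metadata` (as the listing's comments indicate), and `raw.nil? || raw.empty?` replaces ActiveSupport's `blank?`.

```ruby
require "time"

# Hypothetical plain-Ruby stand-in for the ActiveRecord model.
class ExecutionSketch
  attr_accessor :metadata

  def initialize(metadata = nil)
    @metadata = metadata
  end

  # Generate a reader and writer per field, both backed by the metadata hash.
  %w[span_id fallback_reason].each do |field|
    define_method(field) { metadata&.dig(field) }
    define_method(:"#{field}=") { |val| self.metadata = (metadata || {}).merge(field => val) }
  end

  # True once a soft_purged_at key has been written.
  def soft_purged?
    metadata&.key?("soft_purged_at") == true
  end

  # Parses the stored ISO 8601 timestamp; returns nil when absent or malformed.
  def soft_purged_at
    raw = metadata&.dig("soft_purged_at")
    return nil if raw.nil? || raw.empty?

    Time.iso8601(raw)
  rescue ArgumentError
    nil
  end
end

execution = ExecutionSketch.new
execution.span_id = "abc123"
puts execution.metadata.inspect
puts execution.soft_purged?
```

Assigning a merged copy rather than mutating the hash in place matters in ActiveRecord, where in-place changes to a JSON attribute are easier to miss than an explicit reassignment.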
#completed_at ⇒ Time?
Returns the time at which the execution completed.
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 |
# File 'app/models/ruby_llm/agents/execution.rb', line 46 class Execution < ::ActiveRecord::Base self.table_name = "ruby_llm_agents_executions" include Execution::Metrics include Execution::Scopes include Execution::Analytics include Execution::Replayable # Status enum # - running: execution in progress # - success: completed successfully # - error: completed with error # - timeout: completed due to timeout enum :status, %w[running success error timeout].index_by(&:itself), prefix: true # Allowed finish reasons from LLM providers FINISH_REASONS = %w[stop length content_filter tool_calls other].freeze # Allowed fallback reasons for model switching FALLBACK_REASONS = %w[price_limit quality_fail rate_limit timeout safety error other].freeze # Execution hierarchy associations belongs_to :parent_execution, class_name: "RubyLLM::Agents::Execution", optional: true belongs_to :root_execution, class_name: "RubyLLM::Agents::Execution", optional: true has_many :child_executions, class_name: "RubyLLM::Agents::Execution", foreign_key: :parent_execution_id, dependent: :nullify, inverse_of: :parent_execution # Detail record for large payloads (prompts, responses, tool calls, etc.) has_one :detail, class_name: "RubyLLM::Agents::ExecutionDetail", foreign_key: :execution_id, dependent: :destroy # Individual tool call records (real-time tracking) has_many :tool_executions, class_name: "RubyLLM::Agents::ToolExecution", foreign_key: :execution_id, dependent: :destroy # Delegations so existing code keeps working transparently delegate :system_prompt, :user_prompt, :assistant_prompt, :response, :messages_summary, :tool_calls, :attempts, :fallback_chain, :parameters, :routed_to, :classification_result, :cached_at, :cache_creation_tokens, to: :detail, prefix: false, allow_nil: true # Error message reader that survives soft purge. # # Prefers detail.error_message when the detail row still exists, otherwise # falls back to the truncated copy stored in metadata by the retention # job. 
This lets error-rate trend analysis continue working past the # soft-purge window. # # @return [String, nil] def detail&. || &.dig("error_message") end # Validations validates :agent_type, :model_id, :started_at, presence: true validates :status, inclusion: {in: statuses.keys} validates :temperature, numericality: {greater_than_or_equal_to: 0, less_than_or_equal_to: 2}, allow_nil: true validates :input_tokens, :output_tokens, numericality: {greater_than_or_equal_to: 0}, allow_nil: true validates :duration_ms, numericality: {greater_than_or_equal_to: 0}, allow_nil: true validates :input_cost, :output_cost, :total_cost, numericality: {greater_than_or_equal_to: 0}, allow_nil: true validates :finish_reason, inclusion: {in: FINISH_REASONS}, allow_nil: true before_save :calculate_total_tokens, if: -> { input_tokens_changed? || output_tokens_changed? } before_save :calculate_total_cost, if: -> { input_cost_changed? || output_cost_changed? } # Aggregates costs from all attempts using each attempt's model pricing # # Used for multi-attempt executions (retries/fallbacks) where different models # may have been used. Calculates total cost by summing individual attempt costs. # # @return [void] def aggregate_attempt_costs! return if attempts.blank? 
total_input_cost = 0 total_output_cost = 0 attempts.each do |attempt| # Skip short-circuited attempts (no actual API call made) next if attempt["short_circuited"] model_info = resolve_model_info(attempt["model_id"]) next unless model_info&.pricing input_price = model_info.pricing.text_tokens&.input || 0 output_price = model_info.pricing.text_tokens&.output || 0 input_tokens = attempt["input_tokens"] || 0 output_tokens = attempt["output_tokens"] || 0 total_input_cost += (input_tokens / 1_000_000.0) * input_price total_output_cost += (output_tokens / 1_000_000.0) * output_price end self.input_cost = total_input_cost.round(6) self.output_cost = total_output_cost.round(6) end # Returns whether this execution had multiple attempts # # @return [Boolean] true if more than one attempt was made def has_retries? count = if self.class.column_names.include?("attempts_count") attempts_count elsif self.class.column_names.include?("attempts") attempts&.size end (count || 0) > 1 end # Returns whether this execution used fallback models # # @return [Boolean] true if a different model than requested succeeded def used_fallback? return false unless self.class.column_names.include?("chosen_model_id") chosen_model_id.present? && chosen_model_id != model_id end # Returns the successful attempt data (if any) # # @return [Hash, nil] The successful attempt or nil def successful_attempt return nil if attempts.blank? attempts.find { |a| a["error_class"].nil? && !a["short_circuited"] } end # Returns failed attempts # # @return [Array<Hash>] Failed attempt data def failed_attempts return [] if attempts.blank? attempts.select { |a| a["error_class"].present? } end # Returns short-circuited attempts (circuit breaker blocked) # # @return [Array<Hash>] Short-circuited attempt data def short_circuited_attempts return [] if attempts.blank? 
attempts.select { |a| a["short_circuited"] } end # Returns whether this is a root (top-level) execution # # @return [Boolean] true if this is a root execution def root? parent_execution_id.nil? end # Returns whether this is a child (nested) execution # # @return [Boolean] true if this has a parent execution def child? parent_execution_id.present? end # Returns the execution tree depth # # @return [Integer] depth level (0 for root) def depth return 0 if root? parent_execution&.depth.to_i + 1 end # Returns whether this execution was a cache hit # # @return [Boolean] true if response was served from cache def cached? cache_hit == true end # Returns whether this execution was rate limited # # @return [Boolean] true if rate limiting occurred def rate_limited? &.dig("rate_limited") == true end # Returns the response payload as a Hash, regardless of how agents wrote it. # # The `execution_details.response` column is declared as JSON, but agents # may write plain strings (chat text), arrays (embeddings), or nil. Views # that want to look up specific keys (audio_url, image_url, etc.) need a # Hash they can safely `.dig` into. This reader returns an empty hash when # the stored response isn't a Hash, so callers don't need type guards. # # @return [Hash] Parsed response hash, or empty hash if not hash-shaped def response_hash raw = response raw.is_a?(Hash) ? raw : {} end # Returns whether this execution has had its detail payload soft-purged. # # Soft-purged executions retain all analytics columns (cost, tokens, # timing, status) but the large payloads (prompts, responses, tool # calls, attempts) stored in execution_details and tool_executions # have been destroyed by the retention job. # # @return [Boolean] true if the retention job has soft-purged this execution def soft_purged? &.key?("soft_purged_at") == true end # Returns when this execution was soft-purged, if ever. 
# # @return [Time, nil] Parsed timestamp or nil if not soft-purged def soft_purged_at raw = &.dig("soft_purged_at") return nil if raw.blank? Time.iso8601(raw) rescue ArgumentError nil end # Convenience accessors for niche fields stored in metadata JSON %w[span_id response_cache_key fallback_reason].each do |field| define_method(field) { &.dig(field) } define_method(:"#{field}=") { |val| self. = ( || {}).merge(field => val) } end %w[time_to_first_token_ms].each do |field| define_method(field) { &.dig(field)&.to_i } define_method(:"#{field}=") { |val| self. = ( || {}).merge(field => val) } end def retryable &.dig("retryable") end def retryable=(val) self. = ( || {}).merge("retryable" => val) end def rate_limited &.dig("rate_limited") end def rate_limited=(val) self. = ( || {}).merge("rate_limited" => val) end # Convenience method to access tenant_record through the tenant def tenant_record return nil unless tenant_id.present? Tenant.find_by(tenant_id: tenant_id)&.tenant_record end # Returns whether this execution used streaming # # @return [Boolean] true if streaming was enabled def streaming? streaming == true end # Returns whether the response was truncated due to max_tokens # # @return [Boolean] true if hit token limit def truncated? finish_reason == "length" end # Returns whether the response was blocked by content filter # # @return [Boolean] true if blocked by safety filter def content_filtered? finish_reason == "content_filter" end # Returns whether this execution made tool calls # # @return [Boolean] true if tool calls were made def tool_calls? tool_calls_count.to_i > 0 end alias_method :has_tool_calls?, :tool_calls? # Returns real-time dashboard data for the Now Strip # Optimized: 3 queries (current aggregate + previous aggregate + running count) # instead of ~15 individual count/sum/average queries. 
# # @param range [String] Time range: "today", "7d", "30d", or "90d" # @return [Hash] Now strip metrics with period-over-period comparisons def self.now_strip_data(range: "today") current_scope = case range when "7d" then last_n_days(7) when "30d" then last_n_days(30) when "90d" then last_n_days(90) else today end previous_scope = case range when "7d" then where(created_at: 14.days.ago.beginning_of_day..7.days.ago.beginning_of_day) when "30d" then where(created_at: 60.days.ago.beginning_of_day..30.days.ago.beginning_of_day) when "90d" then where(created_at: 180.days.ago.beginning_of_day..90.days.ago.beginning_of_day) else yesterday end curr = aggregate_period_stats(current_scope) prev = aggregate_period_stats(previous_scope) { running: running.count, success_today: curr[:success], errors_today: curr[:errors], timeouts_today: curr[:timeouts], cost_today: curr[:cost], executions_today: curr[:total], success_rate: curr[:success_rate], avg_duration_ms: curr[:avg_duration_ms], total_tokens: curr[:tokens], comparisons: { success_change: pct_change(prev[:success], curr[:success]), errors_change: pct_change(prev[:errors], curr[:errors]), cost_change: pct_change(prev[:cost], curr[:cost]), duration_change: pct_change(prev[:avg_duration_ms], curr[:avg_duration_ms]), tokens_change: pct_change(prev[:tokens], curr[:tokens]) } } end # Returns Now Strip data for a custom date range # Optimized: 3 queries instead of ~15. # # Compares the selected range against the same-length window # immediately preceding it. 
  #
  # @param from [Date] Start date (inclusive)
  # @param to [Date] End date (inclusive)
  # @return [Hash] Now strip metrics with period-over-period comparisons
  def self.now_strip_data_for_dates(from:, to:)
    span_days = (to - from).to_i + 1
    current_scope = where(created_at: from.beginning_of_day..to.end_of_day)
    previous_from = from - span_days.days
    previous_to = from - 1.day
    previous_scope = where(created_at: previous_from.beginning_of_day..previous_to.end_of_day)

    curr = aggregate_period_stats(current_scope)
    prev = aggregate_period_stats(previous_scope)

    {
      running: running.count,
      success_today: curr[:success],
      errors_today: curr[:errors],
      timeouts_today: curr[:timeouts],
      cost_today: curr[:cost],
      executions_today: curr[:total],
      success_rate: curr[:success_rate],
      avg_duration_ms: curr[:avg_duration_ms],
      total_tokens: curr[:tokens],
      comparisons: {
        success_change: pct_change(prev[:success], curr[:success]),
        errors_change: pct_change(prev[:errors], curr[:errors]),
        cost_change: pct_change(prev[:cost], curr[:cost]),
        duration_change: pct_change(prev[:avg_duration_ms], curr[:avg_duration_ms]),
        tokens_change: pct_change(prev[:tokens], curr[:tokens])
      }
    }
  end

  # Calculates percentage change between old and new values
  #
  # @param old_val [Numeric, nil] Previous period value
  # @param new_val [Numeric] Current period value
  # @return [Float, nil] Percentage change or nil if old_val is nil/zero
  def self.pct_change(old_val, new_val)
    return nil if old_val.nil? || old_val.zero?

    ((new_val - old_val).to_f / old_val * 100).round(1)
  end

  # Calculates success rate for a given scope
  #
  # @param scope [ActiveRecord::Relation] The execution scope
  # @return [Float] Success rate as percentage
  def self.calculate_period_success_rate(scope)
    total = scope.count
    return 0.0 if total.zero?

    (scope.successful.count.to_f / total * 100).round(1)
  end

  # Returns aggregate stats for a scope in a single query using conditional aggregation
  #
  # Replaces ~9 individual count/sum/average queries with one SQL query.
  #
  # @param scope [ActiveRecord::Relation] Time-filtered scope
  # @return [Hash] Aggregated metrics
  def self.aggregate_period_stats(scope)
    total, success, errors, timeouts, cost, avg_dur, tokens = scope.pick(
      Arel.sql("COUNT(*)"),
      Arel.sql("SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END)"),
      Arel.sql("SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END)"),
      Arel.sql("SUM(CASE WHEN status = 'timeout' THEN 1 ELSE 0 END)"),
      Arel.sql("COALESCE(SUM(total_cost), 0)"),
      Arel.sql("AVG(duration_ms)"),
      Arel.sql("COALESCE(SUM(total_tokens), 0)")
    )

    total = total.to_i
    success = success.to_i

    {
      total: total,
      success: success,
      errors: errors.to_i,
      timeouts: timeouts.to_i,
      cost: cost.to_f,
      avg_duration_ms: avg_dur.to_i,
      tokens: tokens.to_i,
      success_rate: (total > 0) ? (success.to_f / total * 100).round(1) : 0.0
    }
  end
  private_class_method :aggregate_period_stats

  private

  # Calculates and sets total_tokens from input and output
  #
  # @return [Integer] The calculated total
  def calculate_total_tokens
    self.total_tokens = (input_tokens || 0) + (output_tokens || 0)
  end

  # Calculates and sets total_cost from input and output costs
  #
  # @return [BigDecimal] The calculated total
  def calculate_total_cost
    self.total_cost = (input_cost || 0) + (output_cost || 0)
  end

  # Resolves model info for cost calculation
  #
  # Uses Models.find (local registry lookup) rather than Models.resolve
  # because cost calculation only needs pricing data, not a provider instance.
  # Models.resolve requires API keys to instantiate the provider, which may
  # not be available in background jobs or instrumentation contexts.
  #
  # @param lookup_model_id [String, nil] The model identifier (defaults to self.model_id)
  # @return [Object, nil] Model info or nil
  def resolve_model_info(lookup_model_id = nil)
    lookup_model_id ||= model_id
    return nil unless lookup_model_id

    RubyLLM::Models.find(lookup_model_id)
  rescue => e
    Rails.logger.debug("[RubyLLM::Agents] Model lookup failed for #{lookup_model_id}: #{e.message}") if defined?(Rails) && Rails.logger
    nil
  end
end
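The period-over-period arithmetic used by the Now Strip methods can be tried outside Rails. The following is a minimal sketch, not the gem's code: `previous_window` is a hypothetical helper name, and plain `Date - Integer` stands in for the ActiveSupport `span_days.days` call in the source. `pct_change` mirrors the class method shown above.

```ruby
require "date"

# Plain-Ruby mirror of Execution.pct_change: nil when there is no baseline.
def pct_change(old_val, new_val)
  return nil if old_val.nil? || old_val.zero?

  ((new_val - old_val).to_f / old_val * 100).round(1)
end

# Hypothetical helper: the same-length window immediately before [from, to].
# Both bounds are inclusive, so a 7-day range has span_days == 7.
def previous_window(from, to)
  span_days = (to - from).to_i + 1
  [from - span_days, from - 1]
end

from = Date.new(2024, 3, 8)
to = Date.new(2024, 3, 14)
prev_from, prev_to = previous_window(from, to)

puts "#{prev_from}..#{prev_to}"  # 2024-03-01..2024-03-07
puts pct_change(40, 50).inspect  # 25.0
puts pct_change(0, 50).inspect   # nil
```

Returning nil rather than 0.0 for a zero baseline lets a caller distinguish "no change" from "no previous data to compare against".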
#duration_ms ⇒ Integer?
Returns the execution duration in milliseconds.
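How the value is populated is not shown on this page. As an illustration only, a duration in whole milliseconds could be derived from the `started_at` and `completed_at` timestamps like this; `duration_ms_between` is a hypothetical helper, not part of the gem.

```ruby
# Hypothetical helper (an assumption, not the gem's implementation):
# derives a duration in whole milliseconds from two Time values.
def duration_ms_between(started_at, completed_at)
  return nil if started_at.nil? || completed_at.nil?

  # Time#- returns the difference in seconds as a Float.
  ((completed_at - started_at) * 1000).round
end

started = Time.at(0)
finished = Time.at(1.5)

puts duration_ms_between(started, finished)     # 1500
puts duration_ms_between(started, nil).inspect  # nil
```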
#error_class ⇒ String?
Returns the exception class name if the execution failed.
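As a rough sketch of what such a value looks like, a rescued exception can be mapped to string fields as below. `failure_attributes` and its `max_length` parameter are hypothetical names chosen for illustration; the gem's own recording logic is not shown on this page.

```ruby
# Hypothetical helper (an assumption, not the gem's implementation):
# maps a rescued exception to string fields an execution record could store.
def failure_attributes(error, max_length: 200)
  {
    "status" => "error",
    "error_class" => error.class.name,
    # Truncate long messages so the stored copy stays small.
    "error_message" => error.message.to_s[0, max_length]
  }
end

begin
  Integer("not a number")
rescue ArgumentError => e
  attrs = failure_attributes(e)
  puts attrs["error_class"]  # ArgumentError
end
```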
# # @param lookup_model_id [String, nil] The model identifier (defaults to self.model_id) # @return [Object, nil] Model info or nil def resolve_model_info(lookup_model_id = nil) lookup_model_id ||= model_id return nil unless lookup_model_id RubyLLM::Models.find(lookup_model_id) rescue => e Rails.logger.debug("[RubyLLM::Agents] Model lookup failed for #{lookup_model_id}: #{e.}") if defined?(Rails) && Rails.logger nil end end |
#error_message ⇒ String?
Error message reader that survives soft purge.
Prefers detail.error_message when the detail row still exists, otherwise falls back to the truncated copy stored in metadata by the retention job. This lets error-rate trend analysis continue working past the soft-purge window.
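The fallback order described above can be sketched in plain Ruby. This is a minimal illustration rather than the model itself: `DetailSketch` and `ExecutionSketch` are hypothetical stand-ins for the detail row and the execution record, and the sample messages are made up.

```ruby
# Hypothetical stand-in for the detail row that holds large payloads.
DetailSketch = Struct.new(:error_message)

# Hypothetical stand-in for the execution record (not the ActiveRecord model).
ExecutionSketch = Struct.new(:detail, :metadata) do
  # Prefer the detail row; fall back to the copy kept in metadata.
  def error_message
    detail&.error_message || metadata&.dig("error_message")
  end
end

# Detail row still present: the full message wins.
live = ExecutionSketch.new(DetailSketch.new("Rate limit exceeded for model"), {})

# Detail row destroyed by retention: only the metadata copy remains.
purged = ExecutionSketch.new(nil, {"error_message" => "Rate limit exceeded"})

# Neither source available: the reader returns nil instead of raising.
empty = ExecutionSketch.new(nil, nil)

puts live.error_message
puts purged.error_message
puts empty.error_message.inspect
```

Because both lookups use safe navigation, callers do not need to check whether the execution has been soft-purged before reading the message.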
# File 'app/models/ruby_llm/agents/execution.rb', line 46

class Execution < ::ActiveRecord::Base
  self.table_name = "ruby_llm_agents_executions"

  include Execution::Metrics
  include Execution::Scopes
  include Execution::Analytics
  include Execution::Replayable

  # Status enum
  # - running: execution in progress
  # - success: completed successfully
  # - error: completed with error
  # - timeout: completed due to timeout
  enum :status, %w[running success error timeout].index_by(&:itself), prefix: true

  # Allowed finish reasons from LLM providers
  FINISH_REASONS = %w[stop length content_filter tool_calls other].freeze

  # Allowed fallback reasons for model switching
  FALLBACK_REASONS = %w[price_limit quality_fail rate_limit timeout safety error other].freeze

  # Execution hierarchy associations
  belongs_to :parent_execution, class_name: "RubyLLM::Agents::Execution", optional: true
  belongs_to :root_execution, class_name: "RubyLLM::Agents::Execution", optional: true
  has_many :child_executions, class_name: "RubyLLM::Agents::Execution",
    foreign_key: :parent_execution_id, dependent: :nullify, inverse_of: :parent_execution

  # Detail record for large payloads (prompts, responses, tool calls, etc.)
  has_one :detail, class_name: "RubyLLM::Agents::ExecutionDetail",
    foreign_key: :execution_id, dependent: :destroy

  # Individual tool call records (real-time tracking)
  has_many :tool_executions, class_name: "RubyLLM::Agents::ToolExecution",
    foreign_key: :execution_id, dependent: :destroy

  # Delegations so existing code keeps working transparently
  delegate :system_prompt, :user_prompt, :assistant_prompt, :response,
    :messages_summary, :tool_calls, :attempts, :fallback_chain,
    :parameters, :routed_to, :classification_result,
    :cached_at, :cache_creation_tokens,
    to: :detail, prefix: false, allow_nil: true

  # Error message reader that survives soft purge.
  #
  # Prefers detail.error_message when the detail row still exists, otherwise
  # falls back to the truncated copy stored in metadata by the retention
  # job. This lets error-rate trend analysis continue working past the
  # soft-purge window.
  #
  # @return [String, nil]
  def error_message
    detail&.error_message || metadata&.dig("error_message")
  end

  # Validations
  validates :agent_type, :model_id, :started_at, presence: true
  validates :status, inclusion: {in: statuses.keys}
  validates :temperature, numericality: {greater_than_or_equal_to: 0, less_than_or_equal_to: 2}, allow_nil: true
  validates :input_tokens, :output_tokens, numericality: {greater_than_or_equal_to: 0}, allow_nil: true
  validates :duration_ms, numericality: {greater_than_or_equal_to: 0}, allow_nil: true
  validates :input_cost, :output_cost, :total_cost, numericality: {greater_than_or_equal_to: 0}, allow_nil: true
  validates :finish_reason, inclusion: {in: FINISH_REASONS}, allow_nil: true

  before_save :calculate_total_tokens, if: -> { input_tokens_changed? || output_tokens_changed? }
  before_save :calculate_total_cost, if: -> { input_cost_changed? || output_cost_changed? }

  # Aggregates costs from all attempts using each attempt's model pricing
  #
  # Used for multi-attempt executions (retries/fallbacks) where different models
  # may have been used. Calculates total cost by summing individual attempt costs.
  #
  # @return [void]
  def aggregate_attempt_costs!
    return if attempts.blank?

    total_input_cost = 0
    total_output_cost = 0

    attempts.each do |attempt|
      # Skip short-circuited attempts (no actual API call made)
      next if attempt["short_circuited"]

      model_info = resolve_model_info(attempt["model_id"])
      next unless model_info&.pricing

      input_price = model_info.pricing.text_tokens&.input || 0
      output_price = model_info.pricing.text_tokens&.output || 0

      input_tokens = attempt["input_tokens"] || 0
      output_tokens = attempt["output_tokens"] || 0

      total_input_cost += (input_tokens / 1_000_000.0) * input_price
      total_output_cost += (output_tokens / 1_000_000.0) * output_price
    end

    self.input_cost = total_input_cost.round(6)
    self.output_cost = total_output_cost.round(6)
  end

  # Returns whether this execution had multiple attempts
  #
  # @return [Boolean] true if more than one attempt was made
  def has_retries?
    count = if self.class.column_names.include?("attempts_count")
      attempts_count
    elsif self.class.column_names.include?("attempts")
      attempts&.size
    end
    (count || 0) > 1
  end

  # Returns whether this execution used fallback models
  #
  # @return [Boolean] true if a different model than requested succeeded
  def used_fallback?
    return false unless self.class.column_names.include?("chosen_model_id")

    chosen_model_id.present? && chosen_model_id != model_id
  end

  # Returns the successful attempt data (if any)
  #
  # @return [Hash, nil] The successful attempt or nil
  def successful_attempt
    return nil if attempts.blank?

    attempts.find { |a| a["error_class"].nil? && !a["short_circuited"] }
  end

  # Returns failed attempts
  #
  # @return [Array<Hash>] Failed attempt data
  def failed_attempts
    return [] if attempts.blank?

    attempts.select { |a| a["error_class"].present? }
  end

  # Returns short-circuited attempts (circuit breaker blocked)
  #
  # @return [Array<Hash>] Short-circuited attempt data
  def short_circuited_attempts
    return [] if attempts.blank?

    attempts.select { |a| a["short_circuited"] }
  end

  # Returns whether this is a root (top-level) execution
  #
  # @return [Boolean] true if this is a root execution
  def root?
    parent_execution_id.nil?
  end

  # Returns whether this is a child (nested) execution
  #
  # @return [Boolean] true if this has a parent execution
  def child?
    parent_execution_id.present?
  end

  # Returns the execution tree depth
  #
  # @return [Integer] depth level (0 for root)
  def depth
    return 0 if root?

    parent_execution&.depth.to_i + 1
  end

  # Returns whether this execution was a cache hit
  #
  # @return [Boolean] true if response was served from cache
  def cached?
    cache_hit == true
  end

  # Returns whether this execution was rate limited
  #
  # @return [Boolean] true if rate limiting occurred
  def rate_limited?
    metadata&.dig("rate_limited") == true
  end

  # Returns the response payload as a Hash, regardless of how agents wrote it.
  #
  # The `execution_details.response` column is declared as JSON, but agents
  # may write plain strings (chat text), arrays (embeddings), or nil. Views
  # that want to look up specific keys (audio_url, image_url, etc.) need a
  # Hash they can safely `.dig` into. This reader returns an empty hash when
  # the stored response isn't a Hash, so callers don't need type guards.
  #
  # @return [Hash] Parsed response hash, or empty hash if not hash-shaped
  def response_hash
    raw = response
    raw.is_a?(Hash) ? raw : {}
  end

  # Returns whether this execution has had its detail payload soft-purged.
  #
  # Soft-purged executions retain all analytics columns (cost, tokens,
  # timing, status) but the large payloads (prompts, responses, tool
  # calls, attempts) stored in execution_details and tool_executions
  # have been destroyed by the retention job.
  #
  # @return [Boolean] true if the retention job has soft-purged this execution
  def soft_purged?
    metadata&.key?("soft_purged_at") == true
  end

  # Returns when this execution was soft-purged, if ever.
  #
  # @return [Time, nil] Parsed timestamp or nil if not soft-purged
  def soft_purged_at
    raw = metadata&.dig("soft_purged_at")
    return nil if raw.blank?

    Time.iso8601(raw)
  rescue ArgumentError
    nil
  end

  # Convenience accessors for niche fields stored in metadata JSON
  %w[span_id response_cache_key fallback_reason].each do |field|
    define_method(field) { metadata&.dig(field) }
    define_method(:"#{field}=") { |val| self.metadata = (metadata || {}).merge(field => val) }
  end

  %w[time_to_first_token_ms].each do |field|
    define_method(field) { metadata&.dig(field)&.to_i }
    define_method(:"#{field}=") { |val| self.metadata = (metadata || {}).merge(field => val) }
  end

  def retryable
    metadata&.dig("retryable")
  end

  def retryable=(val)
    self.metadata = (metadata || {}).merge("retryable" => val)
  end

  def rate_limited
    metadata&.dig("rate_limited")
  end

  def rate_limited=(val)
    self.metadata = (metadata || {}).merge("rate_limited" => val)
  end

  # Convenience method to access tenant_record through the tenant
  def tenant_record
    return nil unless tenant_id.present?

    Tenant.find_by(tenant_id: tenant_id)&.tenant_record
  end

  # Returns whether this execution used streaming
  #
  # @return [Boolean] true if streaming was enabled
  def streaming?
    streaming == true
  end

  # Returns whether the response was truncated due to max_tokens
  #
  # @return [Boolean] true if hit token limit
  def truncated?
    finish_reason == "length"
  end

  # Returns whether the response was blocked by content filter
  #
  # @return [Boolean] true if blocked by safety filter
  def content_filtered?
    finish_reason == "content_filter"
  end

  # Returns whether this execution made tool calls
  #
  # @return [Boolean] true if tool calls were made
  def tool_calls?
    tool_calls_count.to_i > 0
  end
  alias_method :has_tool_calls?, :tool_calls?

  # Returns real-time dashboard data for the Now Strip
  # Optimized: 3 queries (current aggregate + previous aggregate + running count)
  # instead of ~15 individual count/sum/average queries.
  #
  # @param range [String] Time range: "today", "7d", "30d", or "90d"
  # @return [Hash] Now strip metrics with period-over-period comparisons
  def self.now_strip_data(range: "today")
    current_scope = case range
    when "7d" then last_n_days(7)
    when "30d" then last_n_days(30)
    when "90d" then last_n_days(90)
    else today
    end

    previous_scope = case range
    when "7d" then where(created_at: 14.days.ago.beginning_of_day..7.days.ago.beginning_of_day)
    when "30d" then where(created_at: 60.days.ago.beginning_of_day..30.days.ago.beginning_of_day)
    when "90d" then where(created_at: 180.days.ago.beginning_of_day..90.days.ago.beginning_of_day)
    else yesterday
    end

    curr = aggregate_period_stats(current_scope)
    prev = aggregate_period_stats(previous_scope)

    {
      running: running.count,
      success_today: curr[:success],
      errors_today: curr[:errors],
      timeouts_today: curr[:timeouts],
      cost_today: curr[:cost],
      executions_today: curr[:total],
      success_rate: curr[:success_rate],
      avg_duration_ms: curr[:avg_duration_ms],
      total_tokens: curr[:tokens],
      comparisons: {
        success_change: pct_change(prev[:success], curr[:success]),
        errors_change: pct_change(prev[:errors], curr[:errors]),
        cost_change: pct_change(prev[:cost], curr[:cost]),
        duration_change: pct_change(prev[:avg_duration_ms], curr[:avg_duration_ms]),
        tokens_change: pct_change(prev[:tokens], curr[:tokens])
      }
    }
  end

  # Returns Now Strip data for a custom date range
  # Optimized: 3 queries instead of ~15.
  #
  # Compares the selected range against the same-length window
  # immediately preceding it.
  #
  # @param from [Date] Start date (inclusive)
  # @param to [Date] End date (inclusive)
  # @return [Hash] Now strip metrics with period-over-period comparisons
  def self.now_strip_data_for_dates(from:, to:)
    span_days = (to - from).to_i + 1
    current_scope = where(created_at: from.beginning_of_day..to.end_of_day)
    previous_from = from - span_days.days
    previous_to = from - 1.day
    previous_scope = where(created_at: previous_from.beginning_of_day..previous_to.end_of_day)

    curr = aggregate_period_stats(current_scope)
    prev = aggregate_period_stats(previous_scope)

    {
      running: running.count,
      success_today: curr[:success],
      errors_today: curr[:errors],
      timeouts_today: curr[:timeouts],
      cost_today: curr[:cost],
      executions_today: curr[:total],
      success_rate: curr[:success_rate],
      avg_duration_ms: curr[:avg_duration_ms],
      total_tokens: curr[:tokens],
      comparisons: {
        success_change: pct_change(prev[:success], curr[:success]),
        errors_change: pct_change(prev[:errors], curr[:errors]),
        cost_change: pct_change(prev[:cost], curr[:cost]),
        duration_change: pct_change(prev[:avg_duration_ms], curr[:avg_duration_ms]),
        tokens_change: pct_change(prev[:tokens], curr[:tokens])
      }
    }
  end

  # Calculates percentage change between old and new values
  #
  # @param old_val [Numeric, nil] Previous period value
  # @param new_val [Numeric] Current period value
  # @return [Float, nil] Percentage change or nil if old_val is nil/zero
  def self.pct_change(old_val, new_val)
    return nil if old_val.nil? || old_val.zero?

    ((new_val - old_val).to_f / old_val * 100).round(1)
  end

  # Calculates success rate for a given scope
  #
  # @param scope [ActiveRecord::Relation] The execution scope
  # @return [Float] Success rate as percentage
  def self.calculate_period_success_rate(scope)
    total = scope.count
    return 0.0 if total.zero?

    (scope.successful.count.to_f / total * 100).round(1)
  end

  # Returns aggregate stats for a scope in a single query using conditional aggregation
  #
  # Replaces ~9 individual count/sum/average queries with one SQL query.
  #
  # @param scope [ActiveRecord::Relation] Time-filtered scope
  # @return [Hash] Aggregated metrics
  def self.aggregate_period_stats(scope)
    total, success, errors, timeouts, cost, avg_dur, tokens = scope.pick(
      Arel.sql("COUNT(*)"),
      Arel.sql("SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END)"),
      Arel.sql("SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END)"),
      Arel.sql("SUM(CASE WHEN status = 'timeout' THEN 1 ELSE 0 END)"),
      Arel.sql("COALESCE(SUM(total_cost), 0)"),
      Arel.sql("AVG(duration_ms)"),
      Arel.sql("COALESCE(SUM(total_tokens), 0)")
    )

    total = total.to_i
    success = success.to_i

    {
      total: total,
      success: success,
      errors: errors.to_i,
      timeouts: timeouts.to_i,
      cost: cost.to_f,
      avg_duration_ms: avg_dur.to_i,
      tokens: tokens.to_i,
      success_rate: (total > 0) ? (success.to_f / total * 100).round(1) : 0.0
    }
  end
  private_class_method :aggregate_period_stats

  private

  # Calculates and sets total_tokens from input and output
  #
  # @return [Integer] The calculated total
  def calculate_total_tokens
    self.total_tokens = (input_tokens || 0) + (output_tokens || 0)
  end

  # Calculates and sets total_cost from input and output costs
  #
  # @return [BigDecimal] The calculated total
  def calculate_total_cost
    self.total_cost = (input_cost || 0) + (output_cost || 0)
  end

  # Resolves model info for cost calculation
  #
  # Uses Models.find (local registry lookup) rather than Models.resolve
  # because cost calculation only needs pricing data, not a provider instance.
  # Models.resolve requires API keys to instantiate the provider, which may
  # not be available in background jobs or instrumentation contexts.
  #
  # @param lookup_model_id [String, nil] The model identifier (defaults to self.model_id)
  # @return [Object, nil] Model info or nil
  def resolve_model_info(lookup_model_id = nil)
    lookup_model_id ||= model_id
    return nil unless lookup_model_id

    RubyLLM::Models.find(lookup_model_id)
  rescue => e
    Rails.logger.debug("[RubyLLM::Agents] Model lookup failed for #{lookup_model_id}: #{e.message}") if defined?(Rails) && Rails.logger
    nil
  end
end
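Two pieces of the class source are easy to check in isolation: the previous-window arithmetic used by `now_strip_data_for_dates` and the `pct_change` helper. The sketch below reproduces both in plain Ruby, assuming `from` and `to` are `Date` objects. `previous_window` is a hypothetical helper name, and plain integer subtraction on `Date` stands in for ActiveSupport's `span_days.days`.

```ruby
require "date"

# Hypothetical helper: returns the same-length window that ends the day
# before `from`, mirroring the arithmetic in now_strip_data_for_dates.
def previous_window(from, to)
  span_days = (to - from).to_i + 1 # both endpoints are inclusive
  [from - span_days, from - 1]
end

# Same formula as Execution.pct_change: nil when there is no baseline.
def pct_change(old_val, new_val)
  return nil if old_val.nil? || old_val.zero?

  ((new_val - old_val).to_f / old_val * 100).round(1)
end

from = Date.new(2025, 3, 10)
to = Date.new(2025, 3, 16)
prev_from, prev_to = previous_window(from, to)

puts "current:  #{from} .. #{to}"
puts "previous: #{prev_from} .. #{prev_to}"
puts pct_change(40, 50).inspect  # growth against a non-zero baseline
puts pct_change(0, 5).inspect    # no baseline, so no percentage
```

Returning `nil` rather than `0.0` for a missing baseline lets a dashboard distinguish "no change" from "nothing to compare against".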
#input_cost ⇒ BigDecimal?
Returns the cost of input tokens in USD.
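For multi-attempt executions, the class source derives this value in `aggregate_attempt_costs!` by dividing each attempt's token count by one million and multiplying by the model's price, skipping short-circuited attempts. A rough standalone sketch of that arithmetic follows. The `PRICES_PER_MILLION` table, the model names, and the `attempt_costs` helper are hypothetical; the real method reads pricing from the RubyLLM model registry.

```ruby
# Hypothetical USD prices per one million tokens (illustrative values only).
PRICES_PER_MILLION = {
  "model-a" => {input: 3.0, output: 15.0},
  "model-b" => {input: 0.5, output: 1.5}
}.freeze

# Hypothetical helper that sums per-attempt costs the way the source describes.
def attempt_costs(attempts, prices)
  input_cost = 0.0
  output_cost = 0.0

  attempts.each do |attempt|
    next if attempt["short_circuited"] # no API call was made

    pricing = prices[attempt["model_id"]]
    next unless pricing # unknown model: contributes nothing

    input_cost += (attempt["input_tokens"] || 0) / 1_000_000.0 * pricing[:input]
    output_cost += (attempt["output_tokens"] || 0) / 1_000_000.0 * pricing[:output]
  end

  {input_cost: input_cost.round(6), output_cost: output_cost.round(6)}
end

attempts = [
  {"model_id" => "model-a", "input_tokens" => 1_000, "output_tokens" => 200, "error_class" => "Timeout::Error"},
  {"model_id" => "model-b", "short_circuited" => true},
  {"model_id" => "model-b", "input_tokens" => 1_000, "output_tokens" => 400}
]

costs = attempt_costs(attempts, PRICES_PER_MILLION)
puts costs.inspect
```

Note that the failed first attempt still counts toward cost, since tokens were consumed, while the short-circuited attempt does not.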
# # @param lookup_model_id [String, nil] The model identifier (defaults to self.model_id) # @return [Object, nil] Model info or nil def resolve_model_info(lookup_model_id = nil) lookup_model_id ||= model_id return nil unless lookup_model_id RubyLLM::Models.find(lookup_model_id) rescue => e Rails.logger.debug("[RubyLLM::Agents] Model lookup failed for #{lookup_model_id}: #{e.}") if defined?(Rails) && Rails.logger nil end end |
#input_tokens ⇒ Integer?
Returns the number of input tokens.
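The token counts feed the per-attempt cost arithmetic that `aggregate_attempt_costs!` performs in the source listing: tokens divided by one million, multiplied by the model's per-million price. The following is a minimal sketch of that arithmetic in plain Ruby, outside ActiveRecord. The `PRICES` table, the model names, and the `attempt_costs` helper are hypothetical and exist only for illustration; the real method reads pricing from the model registry.

```ruby
# Hypothetical per-million-token prices in USD (illustrative values only).
PRICES = {
  "model-a" => { input: 3.0, output: 15.0 },
  "model-b" => { input: 0.5, output: 1.5 }
}.freeze

# Sums input and output cost across attempts, skipping short-circuited
# attempts and attempts whose model has no known pricing.
def attempt_costs(attempts, prices = PRICES)
  input_cost = 0.0
  output_cost = 0.0

  attempts.each do |attempt|
    next if attempt["short_circuited"]

    price = prices[attempt["model_id"]]
    next unless price

    input_cost += ((attempt["input_tokens"] || 0) / 1_000_000.0) * price[:input]
    output_cost += ((attempt["output_tokens"] || 0) / 1_000_000.0) * price[:output]
  end

  { input_cost: input_cost.round(6), output_cost: output_cost.round(6) }
end

attempts = [
  { "model_id" => "model-a", "input_tokens" => 1_000, "output_tokens" => 200, "error_class" => "Timeout::Error" },
  { "model_id" => "model-b", "short_circuited" => true },
  { "model_id" => "model-b", "input_tokens" => 2_000, "output_tokens" => 400 }
]

costs = attempt_costs(attempts)
```

Failed attempts still count toward cost because the provider was called; only short-circuited attempts, where no API call was made, are skipped.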
# File 'app/models/ruby_llm/agents/execution.rb', line 46

class Execution < ::ActiveRecord::Base
  self.table_name = "ruby_llm_agents_executions"

  include Execution::Metrics
  include Execution::Scopes
  include Execution::Analytics
  include Execution::Replayable

  # Status enum
  # - running: execution in progress
  # - success: completed successfully
  # - error: completed with error
  # - timeout: completed due to timeout
  enum :status, %w[running success error timeout].index_by(&:itself), prefix: true

  # Allowed finish reasons from LLM providers
  FINISH_REASONS = %w[stop length content_filter tool_calls other].freeze

  # Allowed fallback reasons for model switching
  FALLBACK_REASONS = %w[price_limit quality_fail rate_limit timeout safety error other].freeze

  # Execution hierarchy associations
  belongs_to :parent_execution, class_name: "RubyLLM::Agents::Execution", optional: true
  belongs_to :root_execution, class_name: "RubyLLM::Agents::Execution", optional: true
  has_many :child_executions, class_name: "RubyLLM::Agents::Execution",
    foreign_key: :parent_execution_id, dependent: :nullify, inverse_of: :parent_execution

  # Detail record for large payloads (prompts, responses, tool calls, etc.)
  has_one :detail, class_name: "RubyLLM::Agents::ExecutionDetail",
    foreign_key: :execution_id, dependent: :destroy

  # Individual tool call records (real-time tracking)
  has_many :tool_executions, class_name: "RubyLLM::Agents::ToolExecution",
    foreign_key: :execution_id, dependent: :destroy

  # Delegations so existing code keeps working transparently
  delegate :system_prompt, :user_prompt, :assistant_prompt, :response, :messages_summary,
    :tool_calls, :attempts, :fallback_chain, :parameters, :routed_to,
    :classification_result, :cached_at, :cache_creation_tokens,
    to: :detail, prefix: false, allow_nil: true

  # Error message reader that survives soft purge.
  #
  # Prefers detail.error_message when the detail row still exists, otherwise
  # falls back to the truncated copy stored in metadata by the retention
  # job. This lets error-rate trend analysis continue working past the
  # soft-purge window.
  #
  # @return [String, nil]
  def error_message
    detail&.error_message || metadata&.dig("error_message")
  end

  # Validations
  validates :agent_type, :model_id, :started_at, presence: true
  validates :status, inclusion: {in: statuses.keys}
  validates :temperature, numericality: {greater_than_or_equal_to: 0, less_than_or_equal_to: 2}, allow_nil: true
  validates :input_tokens, :output_tokens, numericality: {greater_than_or_equal_to: 0}, allow_nil: true
  validates :duration_ms, numericality: {greater_than_or_equal_to: 0}, allow_nil: true
  validates :input_cost, :output_cost, :total_cost, numericality: {greater_than_or_equal_to: 0}, allow_nil: true
  validates :finish_reason, inclusion: {in: FINISH_REASONS}, allow_nil: true

  before_save :calculate_total_tokens, if: -> { input_tokens_changed? || output_tokens_changed? }
  before_save :calculate_total_cost, if: -> { input_cost_changed? || output_cost_changed? }

  # Aggregates costs from all attempts using each attempt's model pricing
  #
  # Used for multi-attempt executions (retries/fallbacks) where different models
  # may have been used. Calculates total cost by summing individual attempt costs.
  #
  # @return [void]
  def aggregate_attempt_costs!
    return if attempts.blank?

    total_input_cost = 0
    total_output_cost = 0

    attempts.each do |attempt|
      # Skip short-circuited attempts (no actual API call made)
      next if attempt["short_circuited"]

      model_info = resolve_model_info(attempt["model_id"])
      next unless model_info&.pricing

      input_price = model_info.pricing.text_tokens&.input || 0
      output_price = model_info.pricing.text_tokens&.output || 0

      input_tokens = attempt["input_tokens"] || 0
      output_tokens = attempt["output_tokens"] || 0

      total_input_cost += (input_tokens / 1_000_000.0) * input_price
      total_output_cost += (output_tokens / 1_000_000.0) * output_price
    end

    self.input_cost = total_input_cost.round(6)
    self.output_cost = total_output_cost.round(6)
  end

  # Returns whether this execution had multiple attempts
  #
  # @return [Boolean] true if more than one attempt was made
  def has_retries?
    count = if self.class.column_names.include?("attempts_count")
      attempts_count
    elsif self.class.column_names.include?("attempts")
      attempts&.size
    end
    (count || 0) > 1
  end

  # Returns whether this execution used fallback models
  #
  # @return [Boolean] true if a different model than requested succeeded
  def used_fallback?
    return false unless self.class.column_names.include?("chosen_model_id")

    chosen_model_id.present? && chosen_model_id != model_id
  end

  # Returns the successful attempt data (if any)
  #
  # @return [Hash, nil] The successful attempt or nil
  def successful_attempt
    return nil if attempts.blank?

    attempts.find { |a| a["error_class"].nil? && !a["short_circuited"] }
  end

  # Returns failed attempts
  #
  # @return [Array<Hash>] Failed attempt data
  def failed_attempts
    return [] if attempts.blank?

    attempts.select { |a| a["error_class"].present? }
  end

  # Returns short-circuited attempts (circuit breaker blocked)
  #
  # @return [Array<Hash>] Short-circuited attempt data
  def short_circuited_attempts
    return [] if attempts.blank?

    attempts.select { |a| a["short_circuited"] }
  end

  # Returns whether this is a root (top-level) execution
  #
  # @return [Boolean] true if this is a root execution
  def root?
    parent_execution_id.nil?
  end

  # Returns whether this is a child (nested) execution
  #
  # @return [Boolean] true if this has a parent execution
  def child?
    parent_execution_id.present?
  end

  # Returns the execution tree depth
  #
  # @return [Integer] depth level (0 for root)
  def depth
    return 0 if root?

    parent_execution&.depth.to_i + 1
  end

  # Returns whether this execution was a cache hit
  #
  # @return [Boolean] true if response was served from cache
  def cached?
    cache_hit == true
  end

  # Returns whether this execution was rate limited
  #
  # @return [Boolean] true if rate limiting occurred
  def rate_limited?
    metadata&.dig("rate_limited") == true
  end

  # Returns the response payload as a Hash, regardless of how agents wrote it.
  #
  # The `execution_details.response` column is declared as JSON, but agents
  # may write plain strings (chat text), arrays (embeddings), or nil. Views
  # that want to look up specific keys (audio_url, image_url, etc.) need a
  # Hash they can safely `.dig` into. This reader returns an empty hash when
  # the stored response isn't a Hash, so callers don't need type guards.
  #
  # @return [Hash] Parsed response hash, or empty hash if not hash-shaped
  def response_hash
    raw = response
    raw.is_a?(Hash) ? raw : {}
  end

  # Returns whether this execution has had its detail payload soft-purged.
  #
  # Soft-purged executions retain all analytics columns (cost, tokens,
  # timing, status) but the large payloads (prompts, responses, tool
  # calls, attempts) stored in execution_details and tool_executions
  # have been destroyed by the retention job.
  #
  # @return [Boolean] true if the retention job has soft-purged this execution
  def soft_purged?
    metadata&.key?("soft_purged_at") == true
  end

  # Returns when this execution was soft-purged, if ever.
  #
  # @return [Time, nil] Parsed timestamp or nil if not soft-purged
  def soft_purged_at
    raw = metadata&.dig("soft_purged_at")
    return nil if raw.blank?

    Time.iso8601(raw)
  rescue ArgumentError
    nil
  end

  # Convenience accessors for niche fields stored in metadata JSON
  %w[span_id response_cache_key fallback_reason].each do |field|
    define_method(field) { metadata&.dig(field) }
    define_method(:"#{field}=") { |val| self.metadata = (metadata || {}).merge(field => val) }
  end

  %w[time_to_first_token_ms].each do |field|
    define_method(field) { metadata&.dig(field)&.to_i }
    define_method(:"#{field}=") { |val| self.metadata = (metadata || {}).merge(field => val) }
  end

  def retryable
    metadata&.dig("retryable")
  end

  def retryable=(val)
    self.metadata = (metadata || {}).merge("retryable" => val)
  end

  def rate_limited
    metadata&.dig("rate_limited")
  end

  def rate_limited=(val)
    self.metadata = (metadata || {}).merge("rate_limited" => val)
  end

  # Convenience method to access tenant_record through the tenant
  def tenant_record
    return nil unless tenant_id.present?

    Tenant.find_by(tenant_id: tenant_id)&.tenant_record
  end

  # Returns whether this execution used streaming
  #
  # @return [Boolean] true if streaming was enabled
  def streaming?
    streaming == true
  end

  # Returns whether the response was truncated due to max_tokens
  #
  # @return [Boolean] true if hit token limit
  def truncated?
    finish_reason == "length"
  end

  # Returns whether the response was blocked by content filter
  #
  # @return [Boolean] true if blocked by safety filter
  def content_filtered?
    finish_reason == "content_filter"
  end

  # Returns whether this execution made tool calls
  #
  # @return [Boolean] true if tool calls were made
  def tool_calls?
    tool_calls_count.to_i > 0
  end
  alias_method :has_tool_calls?, :tool_calls?

  # Returns real-time dashboard data for the Now Strip
  # Optimized: 3 queries (current aggregate + previous aggregate + running count)
  # instead of ~15 individual count/sum/average queries.
  #
  # @param range [String] Time range: "today", "7d", "30d", or "90d"
  # @return [Hash] Now strip metrics with period-over-period comparisons
  def self.now_strip_data(range: "today")
    current_scope = case range
    when "7d" then last_n_days(7)
    when "30d" then last_n_days(30)
    when "90d" then last_n_days(90)
    else today
    end

    previous_scope = case range
    when "7d" then where(created_at: 14.days.ago.beginning_of_day..7.days.ago.beginning_of_day)
    when "30d" then where(created_at: 60.days.ago.beginning_of_day..30.days.ago.beginning_of_day)
    when "90d" then where(created_at: 180.days.ago.beginning_of_day..90.days.ago.beginning_of_day)
    else yesterday
    end

    curr = aggregate_period_stats(current_scope)
    prev = aggregate_period_stats(previous_scope)

    {
      running: running.count,
      success_today: curr[:success],
      errors_today: curr[:errors],
      timeouts_today: curr[:timeouts],
      cost_today: curr[:cost],
      executions_today: curr[:total],
      success_rate: curr[:success_rate],
      avg_duration_ms: curr[:avg_duration_ms],
      total_tokens: curr[:tokens],
      comparisons: {
        success_change: pct_change(prev[:success], curr[:success]),
        errors_change: pct_change(prev[:errors], curr[:errors]),
        cost_change: pct_change(prev[:cost], curr[:cost]),
        duration_change: pct_change(prev[:avg_duration_ms], curr[:avg_duration_ms]),
        tokens_change: pct_change(prev[:tokens], curr[:tokens])
      }
    }
  end

  # Returns Now Strip data for a custom date range
  # Optimized: 3 queries instead of ~15.
  #
  # Compares the selected range against the same-length window
  # immediately preceding it.
  #
  # @param from [Date] Start date (inclusive)
  # @param to [Date] End date (inclusive)
  # @return [Hash] Now strip metrics with period-over-period comparisons
  def self.now_strip_data_for_dates(from:, to:)
    span_days = (to - from).to_i + 1
    current_scope = where(created_at: from.beginning_of_day..to.end_of_day)
    previous_from = from - span_days.days
    previous_to = from - 1.day
    previous_scope = where(created_at: previous_from.beginning_of_day..previous_to.end_of_day)

    curr = aggregate_period_stats(current_scope)
    prev = aggregate_period_stats(previous_scope)

    {
      running: running.count,
      success_today: curr[:success],
      errors_today: curr[:errors],
      timeouts_today: curr[:timeouts],
      cost_today: curr[:cost],
      executions_today: curr[:total],
      success_rate: curr[:success_rate],
      avg_duration_ms: curr[:avg_duration_ms],
      total_tokens: curr[:tokens],
      comparisons: {
        success_change: pct_change(prev[:success], curr[:success]),
        errors_change: pct_change(prev[:errors], curr[:errors]),
        cost_change: pct_change(prev[:cost], curr[:cost]),
        duration_change: pct_change(prev[:avg_duration_ms], curr[:avg_duration_ms]),
        tokens_change: pct_change(prev[:tokens], curr[:tokens])
      }
    }
  end

  # Calculates percentage change between old and new values
  #
  # @param old_val [Numeric, nil] Previous period value
  # @param new_val [Numeric] Current period value
  # @return [Float, nil] Percentage change or nil if old_val is nil/zero
  def self.pct_change(old_val, new_val)
    return nil if old_val.nil? || old_val.zero?

    ((new_val - old_val).to_f / old_val * 100).round(1)
  end

  # Calculates success rate for a given scope
  #
  # @param scope [ActiveRecord::Relation] The execution scope
  # @return [Float] Success rate as percentage
  def self.calculate_period_success_rate(scope)
    total = scope.count
    return 0.0 if total.zero?

    (scope.successful.count.to_f / total * 100).round(1)
  end

  # Returns aggregate stats for a scope in a single query using conditional aggregation
  #
  # Replaces ~9 individual count/sum/average queries with one SQL query.
  #
  # @param scope [ActiveRecord::Relation] Time-filtered scope
  # @return [Hash] Aggregated metrics
  def self.aggregate_period_stats(scope)
    total, success, errors, timeouts, cost, avg_dur, tokens = scope.pick(
      Arel.sql("COUNT(*)"),
      Arel.sql("SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END)"),
      Arel.sql("SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END)"),
      Arel.sql("SUM(CASE WHEN status = 'timeout' THEN 1 ELSE 0 END)"),
      Arel.sql("COALESCE(SUM(total_cost), 0)"),
      Arel.sql("AVG(duration_ms)"),
      Arel.sql("COALESCE(SUM(total_tokens), 0)")
    )

    total = total.to_i
    success = success.to_i

    {
      total: total,
      success: success,
      errors: errors.to_i,
      timeouts: timeouts.to_i,
      cost: cost.to_f,
      avg_duration_ms: avg_dur.to_i,
      tokens: tokens.to_i,
      success_rate: (total > 0) ? (success.to_f / total * 100).round(1) : 0.0
    }
  end
  private_class_method :aggregate_period_stats

  private

  # Calculates and sets total_tokens from input and output
  #
  # @return [Integer] The calculated total
  def calculate_total_tokens
    self.total_tokens = (input_tokens || 0) + (output_tokens || 0)
  end

  # Calculates and sets total_cost from input and output costs
  #
  # @return [BigDecimal] The calculated total
  def calculate_total_cost
    self.total_cost = (input_cost || 0) + (output_cost || 0)
  end

  # Resolves model info for cost calculation
  #
  # Uses Models.find (local registry lookup) rather than Models.resolve
  # because cost calculation only needs pricing data, not a provider instance.
  # Models.resolve requires API keys to instantiate the provider, which may
  # not be available in background jobs or instrumentation contexts.
  #
  # @param lookup_model_id [String, nil] The model identifier (defaults to self.model_id)
  # @return [Object, nil] Model info or nil
  def resolve_model_info(lookup_model_id = nil)
    lookup_model_id ||= model_id
    return nil unless lookup_model_id

    RubyLLM::Models.find(lookup_model_id)
  rescue => e
    Rails.logger.debug("[RubyLLM::Agents] Model lookup failed for #{lookup_model_id}: #{e.message}") if defined?(Rails) && Rails.logger
    nil
  end
end
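The period-over-period comparison in the listing combines two small pieces of arithmetic: `pct_change`, and the "same-length window immediately preceding" rule used by `now_strip_data_for_dates`. The sketch below reproduces both in plain Ruby with the standard `date` library, so it runs without Rails. The `previous_window` helper is a hypothetical name introduced here; the original computes the same bounds inline with ActiveSupport durations.

```ruby
require "date"

# Same formula as Execution.pct_change: nil when there is no baseline.
def pct_change(old_val, new_val)
  return nil if old_val.nil? || old_val.zero?

  ((new_val - old_val).to_f / old_val * 100).round(1)
end

# Hypothetical helper: returns the [from, to] dates of the window that has
# the same inclusive length as from..to and ends the day before `from`.
def previous_window(from, to)
  span_days = (to - from).to_i + 1
  [from - span_days, from - 1]
end

from = Date.new(2024, 3, 8)
to = Date.new(2024, 3, 14)
prev_from, prev_to = previous_window(from, to)

growth = pct_change(80, 100)
decline = pct_change(200, 150)
no_baseline = pct_change(0, 5)
```

Returning `nil` rather than `0.0` for a missing or zero baseline lets a dashboard distinguish "no change" from "nothing to compare against".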
#metadata ⇒ Hash
Returns custom metadata from the metadata hook.
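Several convenience accessors in the class (`span_id`, `fallback_reason`, `retryable`, and others) read from and write to this hash rather than to dedicated columns. A minimal sketch of that pattern follows, using a plain Ruby object in place of the ActiveRecord model. `ExecutionSketch` is a hypothetical stand-in, and `attr_accessor :metadata` substitutes for the JSON column.

```ruby
# Hypothetical stand-in for the model; metadata is a plain attribute here,
# whereas the real class backs it with a database column.
class ExecutionSketch
  attr_accessor :metadata

  # Readers dig into the hash; writers merge into a copy so a nil
  # metadata value is handled and the setter always assigns a new hash.
  %w[span_id fallback_reason].each do |field|
    define_method(field) { metadata&.dig(field) }
    define_method(:"#{field}=") { |val| self.metadata = (metadata || {}).merge(field => val) }
  end

  def soft_purged?
    metadata&.key?("soft_purged_at") == true
  end
end

record = ExecutionSketch.new
before_write = record.span_id # metadata is nil, so the reader returns nil

record.span_id = "abc123"
record.fallback_reason = "rate_limit"
```

Assigning a merged copy instead of mutating the hash in place is presumably what lets ActiveRecord's change tracking notice the update, though that motivation is an assumption rather than something the source states.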
# # @param lookup_model_id [String, nil] The model identifier (defaults to self.model_id) # @return [Object, nil] Model info or nil def resolve_model_info(lookup_model_id = nil) lookup_model_id ||= model_id return nil unless lookup_model_id RubyLLM::Models.find(lookup_model_id) rescue => e Rails.logger.debug("[RubyLLM::Agents] Model lookup failed for #{lookup_model_id}: #{e.}") if defined?(Rails) && Rails.logger nil end end |
#model_id ⇒ String
Returns the LLM model identifier used.
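As a rough illustration of how model_id relates to fallback detection in the class source, the sketch below mirrors the used_fallback? comparison with a plain Struct. ExecutionSketch and the model names are hypothetical stand-ins for the ActiveRecord model, and the blank check only approximates ActiveSupport's present?.

```ruby
# Hypothetical stand-in for the ActiveRecord model; only the two fields
# needed for the comparison are modelled.
ExecutionSketch = Struct.new(:model_id, :chosen_model_id, keyword_init: true) do
  # Mirrors used_fallback?: true when a model other than the requested one
  # produced the result. The strip/empty? test approximates present?.
  def used_fallback?
    chosen = chosen_model_id.to_s.strip
    !chosen.empty? && chosen != model_id
  end
end

direct = ExecutionSketch.new(model_id: "model-a", chosen_model_id: "model-a")
fallback = ExecutionSketch.new(model_id: "model-a", chosen_model_id: "model-b")
unset = ExecutionSketch.new(model_id: "model-a", chosen_model_id: nil)

puts direct.used_fallback?
puts fallback.used_fallback?
puts unset.used_fallback?
```

In this sketch model_id is the requested model, so an execution counts as a fallback only when chosen_model_id is set and differs from it.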
# File 'app/models/ruby_llm/agents/execution.rb', line 46

class Execution < ::ActiveRecord::Base
  self.table_name = "ruby_llm_agents_executions"

  include Execution::Metrics
  include Execution::Scopes
  include Execution::Analytics
  include Execution::Replayable

  # Status enum
  # - running: execution in progress
  # - success: completed successfully
  # - error: completed with error
  # - timeout: completed due to timeout
  enum :status, %w[running success error timeout].index_by(&:itself), prefix: true

  # Allowed finish reasons from LLM providers
  FINISH_REASONS = %w[stop length content_filter tool_calls other].freeze

  # Allowed fallback reasons for model switching
  FALLBACK_REASONS = %w[price_limit quality_fail rate_limit timeout safety error other].freeze

  # Execution hierarchy associations
  belongs_to :parent_execution, class_name: "RubyLLM::Agents::Execution", optional: true
  belongs_to :root_execution, class_name: "RubyLLM::Agents::Execution", optional: true
  has_many :child_executions, class_name: "RubyLLM::Agents::Execution",
    foreign_key: :parent_execution_id, dependent: :nullify, inverse_of: :parent_execution

  # Detail record for large payloads (prompts, responses, tool calls, etc.)
  has_one :detail, class_name: "RubyLLM::Agents::ExecutionDetail",
    foreign_key: :execution_id, dependent: :destroy

  # Individual tool call records (real-time tracking)
  has_many :tool_executions, class_name: "RubyLLM::Agents::ToolExecution",
    foreign_key: :execution_id, dependent: :destroy

  # Delegations so existing code keeps working transparently
  delegate :system_prompt, :user_prompt, :assistant_prompt, :response,
    :messages_summary, :tool_calls, :attempts, :fallback_chain, :parameters,
    :routed_to, :classification_result, :cached_at, :cache_creation_tokens,
    to: :detail, prefix: false, allow_nil: true

  # Error message reader that survives soft purge.
  #
  # Prefers detail.error_message when the detail row still exists, otherwise
  # falls back to the truncated copy stored in metadata by the retention
  # job. This lets error-rate trend analysis continue working past the
  # soft-purge window.
  #
  # @return [String, nil]
  def error_message
    detail&.error_message || metadata&.dig("error_message")
  end

  # Validations
  validates :agent_type, :model_id, :started_at, presence: true
  validates :status, inclusion: {in: statuses.keys}
  validates :temperature, numericality: {greater_than_or_equal_to: 0, less_than_or_equal_to: 2}, allow_nil: true
  validates :input_tokens, :output_tokens, numericality: {greater_than_or_equal_to: 0}, allow_nil: true
  validates :duration_ms, numericality: {greater_than_or_equal_to: 0}, allow_nil: true
  validates :input_cost, :output_cost, :total_cost, numericality: {greater_than_or_equal_to: 0}, allow_nil: true
  validates :finish_reason, inclusion: {in: FINISH_REASONS}, allow_nil: true

  before_save :calculate_total_tokens, if: -> { input_tokens_changed? || output_tokens_changed? }
  before_save :calculate_total_cost, if: -> { input_cost_changed? || output_cost_changed? }

  # Aggregates costs from all attempts using each attempt's model pricing
  #
  # Used for multi-attempt executions (retries/fallbacks) where different models
  # may have been used. Calculates total cost by summing individual attempt costs.
  #
  # @return [void]
  def aggregate_attempt_costs!
    return if attempts.blank?

    total_input_cost = 0
    total_output_cost = 0

    attempts.each do |attempt|
      # Skip short-circuited attempts (no actual API call made)
      next if attempt["short_circuited"]

      model_info = resolve_model_info(attempt["model_id"])
      next unless model_info&.pricing

      input_price = model_info.pricing.text_tokens&.input || 0
      output_price = model_info.pricing.text_tokens&.output || 0

      input_tokens = attempt["input_tokens"] || 0
      output_tokens = attempt["output_tokens"] || 0

      total_input_cost += (input_tokens / 1_000_000.0) * input_price
      total_output_cost += (output_tokens / 1_000_000.0) * output_price
    end

    self.input_cost = total_input_cost.round(6)
    self.output_cost = total_output_cost.round(6)
  end

  # Returns whether this execution had multiple attempts
  #
  # @return [Boolean] true if more than one attempt was made
  def has_retries?
    count = if self.class.column_names.include?("attempts_count")
      attempts_count
    elsif self.class.column_names.include?("attempts")
      attempts&.size
    end
    (count || 0) > 1
  end

  # Returns whether this execution used fallback models
  #
  # @return [Boolean] true if a different model than requested succeeded
  def used_fallback?
    return false unless self.class.column_names.include?("chosen_model_id")

    chosen_model_id.present? && chosen_model_id != model_id
  end

  # Returns the successful attempt data (if any)
  #
  # @return [Hash, nil] The successful attempt or nil
  def successful_attempt
    return nil if attempts.blank?

    attempts.find { |a| a["error_class"].nil? && !a["short_circuited"] }
  end

  # Returns failed attempts
  #
  # @return [Array<Hash>] Failed attempt data
  def failed_attempts
    return [] if attempts.blank?

    attempts.select { |a| a["error_class"].present? }
  end

  # Returns short-circuited attempts (circuit breaker blocked)
  #
  # @return [Array<Hash>] Short-circuited attempt data
  def short_circuited_attempts
    return [] if attempts.blank?

    attempts.select { |a| a["short_circuited"] }
  end

  # Returns whether this is a root (top-level) execution
  #
  # @return [Boolean] true if this is a root execution
  def root?
    parent_execution_id.nil?
  end

  # Returns whether this is a child (nested) execution
  #
  # @return [Boolean] true if this has a parent execution
  def child?
    parent_execution_id.present?
  end

  # Returns the execution tree depth
  #
  # @return [Integer] depth level (0 for root)
  def depth
    return 0 if root?

    parent_execution&.depth.to_i + 1
  end

  # Returns whether this execution was a cache hit
  #
  # @return [Boolean] true if response was served from cache
  def cached?
    cache_hit == true
  end

  # Returns whether this execution was rate limited
  #
  # @return [Boolean] true if rate limiting occurred
  def rate_limited?
    metadata&.dig("rate_limited") == true
  end

  # Returns the response payload as a Hash, regardless of how agents wrote it.
  #
  # The `execution_details.response` column is declared as JSON, but agents
  # may write plain strings (chat text), arrays (embeddings), or nil. Views
  # that want to look up specific keys (audio_url, image_url, etc.) need a
  # Hash they can safely `.dig` into. This reader returns an empty hash when
  # the stored response isn't a Hash, so callers don't need type guards.
  #
  # @return [Hash] Parsed response hash, or empty hash if not hash-shaped
  def response_hash
    raw = response
    raw.is_a?(Hash) ? raw : {}
  end

  # Returns whether this execution has had its detail payload soft-purged.
  #
  # Soft-purged executions retain all analytics columns (cost, tokens,
  # timing, status) but the large payloads (prompts, responses, tool
  # calls, attempts) stored in execution_details and tool_executions
  # have been destroyed by the retention job.
  #
  # @return [Boolean] true if the retention job has soft-purged this execution
  def soft_purged?
    metadata&.key?("soft_purged_at") == true
  end

  # Returns when this execution was soft-purged, if ever.
  #
  # @return [Time, nil] Parsed timestamp or nil if not soft-purged
  def soft_purged_at
    raw = metadata&.dig("soft_purged_at")
    return nil if raw.blank?

    Time.iso8601(raw)
  rescue ArgumentError
    nil
  end

  # Convenience accessors for niche fields stored in metadata JSON
  %w[span_id response_cache_key fallback_reason].each do |field|
    define_method(field) { metadata&.dig(field) }
    define_method(:"#{field}=") { |val| self.metadata = (metadata || {}).merge(field => val) }
  end

  %w[time_to_first_token_ms].each do |field|
    define_method(field) { metadata&.dig(field)&.to_i }
    define_method(:"#{field}=") { |val| self.metadata = (metadata || {}).merge(field => val) }
  end

  def retryable
    metadata&.dig("retryable")
  end

  def retryable=(val)
    self.metadata = (metadata || {}).merge("retryable" => val)
  end

  def rate_limited
    metadata&.dig("rate_limited")
  end

  def rate_limited=(val)
    self.metadata = (metadata || {}).merge("rate_limited" => val)
  end

  # Convenience method to access tenant_record through the tenant
  def tenant_record
    return nil unless tenant_id.present?

    Tenant.find_by(tenant_id: tenant_id)&.tenant_record
  end

  # Returns whether this execution used streaming
  #
  # @return [Boolean] true if streaming was enabled
  def streaming?
    streaming == true
  end

  # Returns whether the response was truncated due to max_tokens
  #
  # @return [Boolean] true if hit token limit
  def truncated?
    finish_reason == "length"
  end

  # Returns whether the response was blocked by content filter
  #
  # @return [Boolean] true if blocked by safety filter
  def content_filtered?
    finish_reason == "content_filter"
  end

  # Returns whether this execution made tool calls
  #
  # @return [Boolean] true if tool calls were made
  def tool_calls?
    tool_calls_count.to_i > 0
  end
  alias_method :has_tool_calls?, :tool_calls?

  # Returns real-time dashboard data for the Now Strip
  # Optimized: 3 queries (current aggregate + previous aggregate + running count)
  # instead of ~15 individual count/sum/average queries.
  #
  # @param range [String] Time range: "today", "7d", "30d", or "90d"
  # @return [Hash] Now strip metrics with period-over-period comparisons
  def self.now_strip_data(range: "today")
    current_scope = case range
    when "7d" then last_n_days(7)
    when "30d" then last_n_days(30)
    when "90d" then last_n_days(90)
    else today
    end

    previous_scope = case range
    when "7d" then where(created_at: 14.days.ago.beginning_of_day..7.days.ago.beginning_of_day)
    when "30d" then where(created_at: 60.days.ago.beginning_of_day..30.days.ago.beginning_of_day)
    when "90d" then where(created_at: 180.days.ago.beginning_of_day..90.days.ago.beginning_of_day)
    else yesterday
    end

    curr = aggregate_period_stats(current_scope)
    prev = aggregate_period_stats(previous_scope)

    {
      running: running.count,
      success_today: curr[:success],
      errors_today: curr[:errors],
      timeouts_today: curr[:timeouts],
      cost_today: curr[:cost],
      executions_today: curr[:total],
      success_rate: curr[:success_rate],
      avg_duration_ms: curr[:avg_duration_ms],
      total_tokens: curr[:tokens],
      comparisons: {
        success_change: pct_change(prev[:success], curr[:success]),
        errors_change: pct_change(prev[:errors], curr[:errors]),
        cost_change: pct_change(prev[:cost], curr[:cost]),
        duration_change: pct_change(prev[:avg_duration_ms], curr[:avg_duration_ms]),
        tokens_change: pct_change(prev[:tokens], curr[:tokens])
      }
    }
  end

  # Returns Now Strip data for a custom date range
  # Optimized: 3 queries instead of ~15.
  #
  # Compares the selected range against the same-length window
  # immediately preceding it.
  #
  # @param from [Date] Start date (inclusive)
  # @param to [Date] End date (inclusive)
  # @return [Hash] Now strip metrics with period-over-period comparisons
  def self.now_strip_data_for_dates(from:, to:)
    span_days = (to - from).to_i + 1
    current_scope = where(created_at: from.beginning_of_day..to.end_of_day)
    previous_from = from - span_days.days
    previous_to = from - 1.day
    previous_scope = where(created_at: previous_from.beginning_of_day..previous_to.end_of_day)

    curr = aggregate_period_stats(current_scope)
    prev = aggregate_period_stats(previous_scope)

    {
      running: running.count,
      success_today: curr[:success],
      errors_today: curr[:errors],
      timeouts_today: curr[:timeouts],
      cost_today: curr[:cost],
      executions_today: curr[:total],
      success_rate: curr[:success_rate],
      avg_duration_ms: curr[:avg_duration_ms],
      total_tokens: curr[:tokens],
      comparisons: {
        success_change: pct_change(prev[:success], curr[:success]),
        errors_change: pct_change(prev[:errors], curr[:errors]),
        cost_change: pct_change(prev[:cost], curr[:cost]),
        duration_change: pct_change(prev[:avg_duration_ms], curr[:avg_duration_ms]),
        tokens_change: pct_change(prev[:tokens], curr[:tokens])
      }
    }
  end

  # Calculates percentage change between old and new values
  #
  # @param old_val [Numeric, nil] Previous period value
  # @param new_val [Numeric] Current period value
  # @return [Float, nil] Percentage change or nil if old_val is nil/zero
  def self.pct_change(old_val, new_val)
    return nil if old_val.nil? || old_val.zero?

    ((new_val - old_val).to_f / old_val * 100).round(1)
  end

  # Calculates success rate for a given scope
  #
  # @param scope [ActiveRecord::Relation] The execution scope
  # @return [Float] Success rate as percentage
  def self.calculate_period_success_rate(scope)
    total = scope.count
    return 0.0 if total.zero?

    (scope.successful.count.to_f / total * 100).round(1)
  end

  # Returns aggregate stats for a scope in a single query using conditional aggregation
  #
  # Replaces ~9 individual count/sum/average queries with one SQL query.
  #
  # @param scope [ActiveRecord::Relation] Time-filtered scope
  # @return [Hash] Aggregated metrics
  def self.aggregate_period_stats(scope)
    total, success, errors, timeouts, cost, avg_dur, tokens = scope.pick(
      Arel.sql("COUNT(*)"),
      Arel.sql("SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END)"),
      Arel.sql("SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END)"),
      Arel.sql("SUM(CASE WHEN status = 'timeout' THEN 1 ELSE 0 END)"),
      Arel.sql("COALESCE(SUM(total_cost), 0)"),
      Arel.sql("AVG(duration_ms)"),
      Arel.sql("COALESCE(SUM(total_tokens), 0)")
    )

    total = total.to_i
    success = success.to_i

    {
      total: total,
      success: success,
      errors: errors.to_i,
      timeouts: timeouts.to_i,
      cost: cost.to_f,
      avg_duration_ms: avg_dur.to_i,
      tokens: tokens.to_i,
      success_rate: (total > 0) ? (success.to_f / total * 100).round(1) : 0.0
    }
  end
  private_class_method :aggregate_period_stats

  private

  # Calculates and sets total_tokens from input and output
  #
  # @return [Integer] The calculated total
  def calculate_total_tokens
    self.total_tokens = (input_tokens || 0) + (output_tokens || 0)
  end

  # Calculates and sets total_cost from input and output costs
  #
  # @return [BigDecimal] The calculated total
  def calculate_total_cost
    self.total_cost = (input_cost || 0) + (output_cost || 0)
  end

  # Resolves model info for cost calculation
  #
  # Uses Models.find (local registry lookup) rather than Models.resolve
  # because cost calculation only needs pricing data, not a provider instance.
  # Models.resolve requires API keys to instantiate the provider, which may
  # not be available in background jobs or instrumentation contexts.
  #
  # @param lookup_model_id [String, nil] The model identifier (defaults to self.model_id)
  # @return [Object, nil] Model info or nil
  def resolve_model_info(lookup_model_id = nil)
    lookup_model_id ||= model_id
    return nil unless lookup_model_id

    RubyLLM::Models.find(lookup_model_id)
  rescue => e
    Rails.logger.debug("[RubyLLM::Agents] Model lookup failed for #{lookup_model_id}: #{e.message}") if defined?(Rails) && Rails.logger
    nil
  end
end
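The period-over-period arithmetic in the listing above (pct_change and the preceding window used by now_strip_data_for_dates) can be tried outside Rails. The sketch below is a minimal standalone version, assuming plain Date arithmetic in place of ActiveSupport durations; previous_window is a hypothetical helper name that does not exist in the class.

```ruby
require "date"

# Same arithmetic as pct_change in the listing: nil when there is no baseline.
def pct_change(old_val, new_val)
  return nil if old_val.nil? || old_val.zero?

  ((new_val - old_val).to_f / old_val * 100).round(1)
end

# Hypothetical helper: the same-length window immediately preceding
# [from, to], with both ends inclusive. Subtracting an Integer from a
# Date moves it back by that many days.
def previous_window(from, to)
  span_days = (to - from).to_i + 1
  [from - span_days, from - 1]
end

prev_from, prev_to = previous_window(Date.new(2024, 3, 8), Date.new(2024, 3, 14))
puts prev_from                   # 2024-03-01
puts prev_to                     # 2024-03-07
puts pct_change(40, 50).inspect  # 25.0
puts pct_change(0, 50).inspect   # nil
```

Returning nil for a zero or missing baseline leaves it to the caller to decide how to present "no comparison available" rather than reporting an infinite or misleading percentage.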
#output_cost ⇒ BigDecimal?
Returns the cost of output tokens in USD.
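The class source derives this value in aggregate_attempt_costs! by pricing each attempt's output tokens per one million tokens and skipping short-circuited attempts. The sketch below reproduces that arithmetic on plain hashes; the model names, prices, and the token_cost helper are hypothetical and used only for illustration.

```ruby
# Hypothetical helper: prices are expressed per one million tokens,
# matching the division by 1_000_000.0 in aggregate_attempt_costs!.
def token_cost(tokens, price_per_million)
  ((tokens || 0) / 1_000_000.0) * (price_per_million || 0)
end

# Made-up attempts and prices (USD per million output tokens).
attempts = [
  {"model_id" => "model-a", "output_tokens" => 2_000, "short_circuited" => false},
  {"model_id" => "model-b", "output_tokens" => 500_000, "short_circuited" => false},
  {"model_id" => "model-b", "output_tokens" => 0, "short_circuited" => true}
]
output_prices = {"model-a" => 10.0, "model-b" => 2.0}

raw_total = attempts.sum do |attempt|
  # Short-circuited attempts never reached the provider, so they cost nothing.
  next 0.0 if attempt["short_circuited"]

  token_cost(attempt["output_tokens"], output_prices[attempt["model_id"]])
end

# The model rounds the stored cost to six decimal places.
output_cost = raw_total.round(6)
puts output_cost
```

With these made-up numbers the first attempt contributes 0.02 USD and the second 1.00 USD, so the total comes to roughly 1.02 USD.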
# # @param lookup_model_id [String, nil] The model identifier (defaults to self.model_id) # @return [Object, nil] Model info or nil def resolve_model_info(lookup_model_id = nil) lookup_model_id ||= model_id return nil unless lookup_model_id RubyLLM::Models.find(lookup_model_id) rescue => e Rails.logger.debug("[RubyLLM::Agents] Model lookup failed for #{lookup_model_id}: #{e.}") if defined?(Rails) && Rails.logger nil end end |
#output_tokens ⇒ Integer?
Returns the number of output tokens.
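As a rough illustration of how this count is used: the class source derives total_tokens as input plus output tokens (nil counted as 0) and prices each attempt per million tokens, skipping short-circuited attempts. The sketch below reproduces that arithmetic in plain Ruby. The attempt hashes, the model names, the PRICES table, and the helper names are hypothetical stand-ins, not real provider pricing and not part of the library.

```ruby
# Hypothetical attempt data; the keys mirror those read by aggregate_attempt_costs!.
ATTEMPTS = [
  {"model_id" => "model-a", "input_tokens" => 1_000, "output_tokens" => 500},
  {"model_id" => "model-b", "input_tokens" => 2_000, "output_tokens" => nil},
  {"model_id" => "model-c", "short_circuited" => true}
].freeze

# Hypothetical USD prices per one million tokens (assumed values, not real pricing).
PRICES = {
  "model-a" => {input: 2.0, output: 8.0},
  "model-b" => {input: 1.0, output: 4.0}
}.freeze

# Nil-safe sum, as in calculate_total_tokens.
def total_tokens(input_tokens, output_tokens)
  (input_tokens || 0) + (output_tokens || 0)
end

# Sums per-attempt costs, skipping short-circuited attempts and unknown models.
def attempt_costs(attempts, prices)
  input_cost = 0.0
  output_cost = 0.0

  attempts.each do |attempt|
    next if attempt["short_circuited"]

    price = prices[attempt["model_id"]]
    next unless price

    input_cost += ((attempt["input_tokens"] || 0) / 1_000_000.0) * price[:input]
    output_cost += ((attempt["output_tokens"] || 0) / 1_000_000.0) * price[:output]
  end

  {input_cost: input_cost.round(6), output_cost: output_cost.round(6)}
end

costs = attempt_costs(ATTEMPTS, PRICES)
puts costs.inspect
puts total_tokens(1_000, nil)
```

Rounding to six decimal places matches the listing and keeps sub-cent costs from very short completions visible.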
# File 'app/models/ruby_llm/agents/execution.rb', line 46

class Execution < ::ActiveRecord::Base
  self.table_name = "ruby_llm_agents_executions"

  include Execution::Metrics
  include Execution::Scopes
  include Execution::Analytics
  include Execution::Replayable

  # Status enum
  #   - running: execution in progress
  #   - success: completed successfully
  #   - error: completed with error
  #   - timeout: completed due to timeout
  enum :status, %w[running success error timeout].index_by(&:itself), prefix: true

  # Allowed finish reasons from LLM providers
  FINISH_REASONS = %w[stop length content_filter tool_calls other].freeze

  # Allowed fallback reasons for model switching
  FALLBACK_REASONS = %w[price_limit quality_fail rate_limit timeout safety error other].freeze

  # Execution hierarchy associations
  belongs_to :parent_execution, class_name: "RubyLLM::Agents::Execution", optional: true
  belongs_to :root_execution, class_name: "RubyLLM::Agents::Execution", optional: true
  has_many :child_executions, class_name: "RubyLLM::Agents::Execution",
    foreign_key: :parent_execution_id, dependent: :nullify, inverse_of: :parent_execution

  # Detail record for large payloads (prompts, responses, tool calls, etc.)
  has_one :detail, class_name: "RubyLLM::Agents::ExecutionDetail",
    foreign_key: :execution_id, dependent: :destroy

  # Individual tool call records (real-time tracking)
  has_many :tool_executions, class_name: "RubyLLM::Agents::ToolExecution",
    foreign_key: :execution_id, dependent: :destroy

  # Delegations so existing code keeps working transparently
  delegate :system_prompt, :user_prompt, :assistant_prompt, :response, :messages_summary,
    :tool_calls, :attempts, :fallback_chain, :parameters, :routed_to,
    :classification_result, :cached_at, :cache_creation_tokens,
    to: :detail, prefix: false, allow_nil: true

  # Error message reader that survives soft purge.
  #
  # Prefers detail.error_message when the detail row still exists, otherwise
  # falls back to the truncated copy stored in metadata by the retention
  # job. This lets error-rate trend analysis continue working past the
  # soft-purge window.
  #
  # @return [String, nil]
  def error_message
    detail&.error_message || metadata&.dig("error_message")
  end

  # Validations
  validates :agent_type, :model_id, :started_at, presence: true
  validates :status, inclusion: {in: statuses.keys}
  validates :temperature, numericality: {greater_than_or_equal_to: 0, less_than_or_equal_to: 2}, allow_nil: true
  validates :input_tokens, :output_tokens, numericality: {greater_than_or_equal_to: 0}, allow_nil: true
  validates :duration_ms, numericality: {greater_than_or_equal_to: 0}, allow_nil: true
  validates :input_cost, :output_cost, :total_cost, numericality: {greater_than_or_equal_to: 0}, allow_nil: true
  validates :finish_reason, inclusion: {in: FINISH_REASONS}, allow_nil: true

  before_save :calculate_total_tokens, if: -> { input_tokens_changed? || output_tokens_changed? }
  before_save :calculate_total_cost, if: -> { input_cost_changed? || output_cost_changed? }

  # Aggregates costs from all attempts using each attempt's model pricing
  #
  # Used for multi-attempt executions (retries/fallbacks) where different models
  # may have been used. Calculates total cost by summing individual attempt costs.
  #
  # @return [void]
  def aggregate_attempt_costs!
    return if attempts.blank?

    total_input_cost = 0
    total_output_cost = 0

    attempts.each do |attempt|
      # Skip short-circuited attempts (no actual API call made)
      next if attempt["short_circuited"]

      model_info = resolve_model_info(attempt["model_id"])
      next unless model_info&.pricing

      input_price = model_info.pricing.text_tokens&.input || 0
      output_price = model_info.pricing.text_tokens&.output || 0

      input_tokens = attempt["input_tokens"] || 0
      output_tokens = attempt["output_tokens"] || 0

      total_input_cost += (input_tokens / 1_000_000.0) * input_price
      total_output_cost += (output_tokens / 1_000_000.0) * output_price
    end

    self.input_cost = total_input_cost.round(6)
    self.output_cost = total_output_cost.round(6)
  end

  # Returns whether this execution had multiple attempts
  #
  # @return [Boolean] true if more than one attempt was made
  def has_retries?
    count = if self.class.column_names.include?("attempts_count")
      attempts_count
    elsif self.class.column_names.include?("attempts")
      attempts&.size
    end
    (count || 0) > 1
  end

  # Returns whether this execution used fallback models
  #
  # @return [Boolean] true if a different model than requested succeeded
  def used_fallback?
    return false unless self.class.column_names.include?("chosen_model_id")

    chosen_model_id.present? && chosen_model_id != model_id
  end

  # Returns the successful attempt data (if any)
  #
  # @return [Hash, nil] The successful attempt or nil
  def successful_attempt
    return nil if attempts.blank?

    attempts.find { |a| a["error_class"].nil? && !a["short_circuited"] }
  end

  # Returns failed attempts
  #
  # @return [Array<Hash>] Failed attempt data
  def failed_attempts
    return [] if attempts.blank?

    attempts.select { |a| a["error_class"].present? }
  end

  # Returns short-circuited attempts (circuit breaker blocked)
  #
  # @return [Array<Hash>] Short-circuited attempt data
  def short_circuited_attempts
    return [] if attempts.blank?

    attempts.select { |a| a["short_circuited"] }
  end

  # Returns whether this is a root (top-level) execution
  #
  # @return [Boolean] true if this is a root execution
  def root?
    parent_execution_id.nil?
  end

  # Returns whether this is a child (nested) execution
  #
  # @return [Boolean] true if this has a parent execution
  def child?
    parent_execution_id.present?
  end

  # Returns the execution tree depth
  #
  # @return [Integer] depth level (0 for root)
  def depth
    return 0 if root?

    parent_execution&.depth.to_i + 1
  end

  # Returns whether this execution was a cache hit
  #
  # @return [Boolean] true if response was served from cache
  def cached?
    cache_hit == true
  end

  # Returns whether this execution was rate limited
  #
  # @return [Boolean] true if rate limiting occurred
  def rate_limited?
    metadata&.dig("rate_limited") == true
  end

  # Returns the response payload as a Hash, regardless of how agents wrote it.
  #
  # The `execution_details.response` column is declared as JSON, but agents
  # may write plain strings (chat text), arrays (embeddings), or nil. Views
  # that want to look up specific keys (audio_url, image_url, etc.) need a
  # Hash they can safely `.dig` into. This reader returns an empty hash when
  # the stored response isn't a Hash, so callers don't need type guards.
  #
  # @return [Hash] Parsed response hash, or empty hash if not hash-shaped
  def response_hash
    raw = response
    raw.is_a?(Hash) ? raw : {}
  end

  # Returns whether this execution has had its detail payload soft-purged.
  #
  # Soft-purged executions retain all analytics columns (cost, tokens,
  # timing, status) but the large payloads (prompts, responses, tool
  # calls, attempts) stored in execution_details and tool_executions
  # have been destroyed by the retention job.
  #
  # @return [Boolean] true if the retention job has soft-purged this execution
  def soft_purged?
    metadata&.key?("soft_purged_at") == true
  end

  # Returns when this execution was soft-purged, if ever.
  #
  # @return [Time, nil] Parsed timestamp or nil if not soft-purged
  def soft_purged_at
    raw = metadata&.dig("soft_purged_at")
    return nil if raw.blank?

    Time.iso8601(raw)
  rescue ArgumentError
    nil
  end

  # Convenience accessors for niche fields stored in metadata JSON
  %w[span_id response_cache_key fallback_reason].each do |field|
    define_method(field) { metadata&.dig(field) }
    define_method(:"#{field}=") { |val| self.metadata = (metadata || {}).merge(field => val) }
  end

  %w[time_to_first_token_ms].each do |field|
    define_method(field) { metadata&.dig(field)&.to_i }
    define_method(:"#{field}=") { |val| self.metadata = (metadata || {}).merge(field => val) }
  end

  def retryable
    metadata&.dig("retryable")
  end

  def retryable=(val)
    self.metadata = (metadata || {}).merge("retryable" => val)
  end

  def rate_limited
    metadata&.dig("rate_limited")
  end

  def rate_limited=(val)
    self.metadata = (metadata || {}).merge("rate_limited" => val)
  end

  # Convenience method to access tenant_record through the tenant
  def tenant_record
    return nil unless tenant_id.present?

    Tenant.find_by(tenant_id: tenant_id)&.tenant_record
  end

  # Returns whether this execution used streaming
  #
  # @return [Boolean] true if streaming was enabled
  def streaming?
    streaming == true
  end

  # Returns whether the response was truncated due to max_tokens
  #
  # @return [Boolean] true if hit token limit
  def truncated?
    finish_reason == "length"
  end

  # Returns whether the response was blocked by content filter
  #
  # @return [Boolean] true if blocked by safety filter
  def content_filtered?
    finish_reason == "content_filter"
  end

  # Returns whether this execution made tool calls
  #
  # @return [Boolean] true if tool calls were made
  def tool_calls?
    tool_calls_count.to_i > 0
  end
  alias_method :has_tool_calls?, :tool_calls?

  # Returns real-time dashboard data for the Now Strip
  # Optimized: 3 queries (current aggregate + previous aggregate + running count)
  # instead of ~15 individual count/sum/average queries.
  #
  # @param range [String] Time range: "today", "7d", "30d", or "90d"
  # @return [Hash] Now strip metrics with period-over-period comparisons
  def self.now_strip_data(range: "today")
    current_scope = case range
    when "7d" then last_n_days(7)
    when "30d" then last_n_days(30)
    when "90d" then last_n_days(90)
    else today
    end

    previous_scope = case range
    when "7d" then where(created_at: 14.days.ago.beginning_of_day..7.days.ago.beginning_of_day)
    when "30d" then where(created_at: 60.days.ago.beginning_of_day..30.days.ago.beginning_of_day)
    when "90d" then where(created_at: 180.days.ago.beginning_of_day..90.days.ago.beginning_of_day)
    else yesterday
    end

    curr = aggregate_period_stats(current_scope)
    prev = aggregate_period_stats(previous_scope)

    {
      running: running.count,
      success_today: curr[:success],
      errors_today: curr[:errors],
      timeouts_today: curr[:timeouts],
      cost_today: curr[:cost],
      executions_today: curr[:total],
      success_rate: curr[:success_rate],
      avg_duration_ms: curr[:avg_duration_ms],
      total_tokens: curr[:tokens],
      comparisons: {
        success_change: pct_change(prev[:success], curr[:success]),
        errors_change: pct_change(prev[:errors], curr[:errors]),
        cost_change: pct_change(prev[:cost], curr[:cost]),
        duration_change: pct_change(prev[:avg_duration_ms], curr[:avg_duration_ms]),
        tokens_change: pct_change(prev[:tokens], curr[:tokens])
      }
    }
  end

  # Returns Now Strip data for a custom date range
  # Optimized: 3 queries instead of ~15.
  #
  # Compares the selected range against the same-length window
  # immediately preceding it.
  #
  # @param from [Date] Start date (inclusive)
  # @param to [Date] End date (inclusive)
  # @return [Hash] Now strip metrics with period-over-period comparisons
  def self.now_strip_data_for_dates(from:, to:)
    span_days = (to - from).to_i + 1
    current_scope = where(created_at: from.beginning_of_day..to.end_of_day)
    previous_from = from - span_days.days
    previous_to = from - 1.day
    previous_scope = where(created_at: previous_from.beginning_of_day..previous_to.end_of_day)

    curr = aggregate_period_stats(current_scope)
    prev = aggregate_period_stats(previous_scope)

    {
      running: running.count,
      success_today: curr[:success],
      errors_today: curr[:errors],
      timeouts_today: curr[:timeouts],
      cost_today: curr[:cost],
      executions_today: curr[:total],
      success_rate: curr[:success_rate],
      avg_duration_ms: curr[:avg_duration_ms],
      total_tokens: curr[:tokens],
      comparisons: {
        success_change: pct_change(prev[:success], curr[:success]),
        errors_change: pct_change(prev[:errors], curr[:errors]),
        cost_change: pct_change(prev[:cost], curr[:cost]),
        duration_change: pct_change(prev[:avg_duration_ms], curr[:avg_duration_ms]),
        tokens_change: pct_change(prev[:tokens], curr[:tokens])
      }
    }
  end

  # Calculates percentage change between old and new values
  #
  # @param old_val [Numeric, nil] Previous period value
  # @param new_val [Numeric] Current period value
  # @return [Float, nil] Percentage change or nil if old_val is nil/zero
  def self.pct_change(old_val, new_val)
    return nil if old_val.nil? || old_val.zero?

    ((new_val - old_val).to_f / old_val * 100).round(1)
  end

  # Calculates success rate for a given scope
  #
  # @param scope [ActiveRecord::Relation] The execution scope
  # @return [Float] Success rate as percentage
  def self.calculate_period_success_rate(scope)
    total = scope.count
    return 0.0 if total.zero?

    (scope.successful.count.to_f / total * 100).round(1)
  end

  # Returns aggregate stats for a scope in a single query using conditional aggregation
  #
  # Replaces ~9 individual count/sum/average queries with one SQL query.
  #
  # @param scope [ActiveRecord::Relation] Time-filtered scope
  # @return [Hash] Aggregated metrics
  def self.aggregate_period_stats(scope)
    total, success, errors, timeouts, cost, avg_dur, tokens = scope.pick(
      Arel.sql("COUNT(*)"),
      Arel.sql("SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END)"),
      Arel.sql("SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END)"),
      Arel.sql("SUM(CASE WHEN status = 'timeout' THEN 1 ELSE 0 END)"),
      Arel.sql("COALESCE(SUM(total_cost), 0)"),
      Arel.sql("AVG(duration_ms)"),
      Arel.sql("COALESCE(SUM(total_tokens), 0)")
    )

    total = total.to_i
    success = success.to_i

    {
      total: total,
      success: success,
      errors: errors.to_i,
      timeouts: timeouts.to_i,
      cost: cost.to_f,
      avg_duration_ms: avg_dur.to_i,
      tokens: tokens.to_i,
      success_rate: (total > 0) ? (success.to_f / total * 100).round(1) : 0.0
    }
  end
  private_class_method :aggregate_period_stats

  private

  # Calculates and sets total_tokens from input and output
  #
  # @return [Integer] The calculated total
  def calculate_total_tokens
    self.total_tokens = (input_tokens || 0) + (output_tokens || 0)
  end

  # Calculates and sets total_cost from input and output costs
  #
  # @return [BigDecimal] The calculated total
  def calculate_total_cost
    self.total_cost = (input_cost || 0) + (output_cost || 0)
  end

  # Resolves model info for cost calculation
  #
  # Uses Models.find (local registry lookup) rather than Models.resolve
  # because cost calculation only needs pricing data, not a provider instance.
  # Models.resolve requires API keys to instantiate the provider, which may
  # not be available in background jobs or instrumentation contexts.
  #
  # @param lookup_model_id [String, nil] The model identifier (defaults to self.model_id)
  # @return [Object, nil] Model info or nil
  def resolve_model_info(lookup_model_id = nil)
    lookup_model_id ||= model_id
    return nil unless lookup_model_id

    RubyLLM::Models.find(lookup_model_id)
  rescue => e
    Rails.logger.debug("[RubyLLM::Agents] Model lookup failed for #{lookup_model_id}: #{e.message}") if defined?(Rails) && Rails.logger
    nil
  end
end
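The period-over-period comparison in the listing above rests on two small pieces of arithmetic: the percentage change between two aggregates, and the same-length window immediately preceding a custom date range. A minimal sketch of both in plain Ruby, without ActiveRecord or ActiveSupport, follows. The pct_change body is copied from the listing; previous_window is a hypothetical helper name introduced here for illustration, and it subtracts plain integers from Date values where the listing uses ActiveSupport durations.

```ruby
require "date"

# Same arithmetic as pct_change in the listing: nil when there is no usable baseline.
def pct_change(old_val, new_val)
  return nil if old_val.nil? || old_val.zero?

  ((new_val - old_val).to_f / old_val * 100).round(1)
end

# Hypothetical helper: the same-length window immediately preceding
# from..to, where both bounds are inclusive Date values.
def previous_window(from, to)
  span_days = (to - from).to_i + 1
  [from - span_days, from - 1]
end

prev_from, prev_to = previous_window(Date.new(2024, 3, 8), Date.new(2024, 3, 14))
puts "#{prev_from}..#{prev_to}"

puts pct_change(200, 250).inspect
puts pct_change(0, 10).inspect
```

Returning nil for a zero or missing baseline leaves it to the caller to show "no comparison" rather than a misleading figure.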
#parameters ⇒ Hash
Returns the sanitized parameters passed to the agent.
# # @param lookup_model_id [String, nil] The model identifier (defaults to self.model_id) # @return [Object, nil] Model info or nil def resolve_model_info(lookup_model_id = nil) lookup_model_id ||= model_id return nil unless lookup_model_id RubyLLM::Models.find(lookup_model_id) rescue => e Rails.logger.debug("[RubyLLM::Agents] Model lookup failed for #{lookup_model_id}: #{e.}") if defined?(Rails) && Rails.logger nil end end |
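The period-over-period arithmetic used by the Now Strip methods can be tried outside Rails. The following is a minimal sketch, not the library's code: `pct_change` is copied from the listing as a top-level method, while `previous_window` is a hypothetical helper that mirrors the `span_days` computation in `now_strip_data_for_dates`, using plain `Date` subtraction in place of ActiveSupport durations.

```ruby
require "date"

# Standalone copy of the pct_change logic from the listing, for illustration only.
def pct_change(old_val, new_val)
  return nil if old_val.nil? || old_val.zero?

  ((new_val - old_val).to_f / old_val * 100).round(1)
end

# Hypothetical helper: the same-length window immediately preceding [from, to].
# Plain Date subtraction stands in for ActiveSupport's `span_days.days`.
def previous_window(from, to)
  span_days = (to - from).to_i + 1
  [from - span_days, from - 1]
end

prev_from, prev_to = previous_window(Date.new(2024, 3, 8), Date.new(2024, 3, 14))
puts "previous window: #{prev_from}..#{prev_to}" # 2024-03-01..2024-03-07
puts pct_change(40, 50).inspect                  # 25.0
puts pct_change(0, 50).inspect                   # nil (no baseline to compare against)
```

Returning nil for a missing or zero baseline lets a caller distinguish "no comparison available" from a genuine 0.0 change.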
#started_at ⇒ Time
Returns when execution started.
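The cost arithmetic in `aggregate_attempt_costs!` treats prices as USD per million tokens and skips short-circuited attempts. A plain-Ruby sketch of that calculation follows. The `PRICES` table, its model names, and the `aggregate_costs` helper are hypothetical stand-ins for the pricing data the class obtains through `resolve_model_info`.

```ruby
# Hypothetical per-million-token prices in USD; the real class reads these
# from the model registry rather than from a constant.
PRICES = {
  "model-a" => {input: 3.0, output: 15.0},
  "model-b" => {input: 0.5, output: 1.5}
}.freeze

# Sums input and output costs across attempts, each priced by its own model.
def aggregate_costs(attempts, prices = PRICES)
  input_cost = 0.0
  output_cost = 0.0

  attempts.each do |attempt|
    next if attempt["short_circuited"] # no API call was made, so no cost

    price = prices[attempt["model_id"]]
    next unless price # unknown model: contributes nothing

    input_cost += (attempt["input_tokens"] || 0) / 1_000_000.0 * price[:input]
    output_cost += (attempt["output_tokens"] || 0) / 1_000_000.0 * price[:output]
  end

  {input_cost: input_cost.round(6), output_cost: output_cost.round(6)}
end

attempts = [
  {"model_id" => "model-a", "input_tokens" => 1000, "output_tokens" => 200, "error_class" => "Timeout"},
  {"model_id" => "model-b", "short_circuited" => true},
  {"model_id" => "model-b", "input_tokens" => 2000, "output_tokens" => 400}
]
p aggregate_costs(attempts)
```

In this sketch the failed first attempt still contributes cost because its tokens were consumed, while the short-circuited attempt contributes nothing.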
#status ⇒ String
Returns the execution status: one of “running”, “success”, “error”, or “timeout”.
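The four status values drive the success, error, and timeout counts that the class aggregates per period. The sketch below is an in-memory analogue of that aggregation in plain Ruby, written under the assumption that statuses are available as an array of strings. `STATUS_MAPPING` and `period_stats` are illustrative names, not part of the library.

```ruby
STATUSES = %w[running success error timeout].freeze

# Plain-Ruby stand-in for ActiveSupport's index_by(&:itself):
# each status string maps to itself, so the stored value is readable.
STATUS_MAPPING = STATUSES.to_h { |s| [s, s] }

# In-memory analogue of conditional aggregation over the status column.
def period_stats(statuses)
  counts = statuses.tally
  total = statuses.size
  success = counts.fetch("success", 0)

  {
    total: total,
    success: success,
    errors: counts.fetch("error", 0),
    timeouts: counts.fetch("timeout", 0),
    success_rate: (total > 0) ? (success.to_f / total * 100).round(1) : 0.0
  }
end

stats = period_stats(%w[success success error timeout success running])
puts stats[:success_rate] # 50.0
```

Because the enum is declared with `prefix: true`, Rails generates prefixed predicates such as `status_success?` on the model itself; the sketch avoids ActiveRecord so it can run on its own.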
# File 'app/models/ruby_llm/agents/execution.rb', line 46

class Execution < ::ActiveRecord::Base
  self.table_name = "ruby_llm_agents_executions"

  include Execution::Metrics
  include Execution::Scopes
  include Execution::Analytics
  include Execution::Replayable

  # Status enum
  # - running: execution in progress
  # - success: completed successfully
  # - error: completed with error
  # - timeout: completed due to timeout
  enum :status, %w[running success error timeout].index_by(&:itself), prefix: true

  # Allowed finish reasons from LLM providers
  FINISH_REASONS = %w[stop length content_filter tool_calls other].freeze

  # Allowed fallback reasons for model switching
  FALLBACK_REASONS = %w[price_limit quality_fail rate_limit timeout safety error other].freeze

  # Execution hierarchy associations
  belongs_to :parent_execution, class_name: "RubyLLM::Agents::Execution", optional: true
  belongs_to :root_execution, class_name: "RubyLLM::Agents::Execution", optional: true
  has_many :child_executions, class_name: "RubyLLM::Agents::Execution",
    foreign_key: :parent_execution_id, dependent: :nullify, inverse_of: :parent_execution

  # Detail record for large payloads (prompts, responses, tool calls, etc.)
  has_one :detail, class_name: "RubyLLM::Agents::ExecutionDetail",
    foreign_key: :execution_id, dependent: :destroy

  # Individual tool call records (real-time tracking)
  has_many :tool_executions, class_name: "RubyLLM::Agents::ToolExecution",
    foreign_key: :execution_id, dependent: :destroy

  # Delegations so existing code keeps working transparently
  delegate :system_prompt, :user_prompt, :assistant_prompt, :response, :messages_summary,
    :tool_calls, :attempts, :fallback_chain, :parameters, :routed_to,
    :classification_result, :cached_at, :cache_creation_tokens,
    to: :detail, prefix: false, allow_nil: true

  # Error message reader that survives soft purge.
  #
  # Prefers detail.error_message when the detail row still exists, otherwise
  # falls back to the truncated copy stored in metadata by the retention
  # job. This lets error-rate trend analysis continue working past the
  # soft-purge window.
  #
  # @return [String, nil]
  def error_message
    detail&.error_message || metadata&.dig("error_message")
  end

  # Validations
  validates :agent_type, :model_id, :started_at, presence: true
  validates :status, inclusion: {in: statuses.keys}
  validates :temperature, numericality: {greater_than_or_equal_to: 0, less_than_or_equal_to: 2}, allow_nil: true
  validates :input_tokens, :output_tokens, numericality: {greater_than_or_equal_to: 0}, allow_nil: true
  validates :duration_ms, numericality: {greater_than_or_equal_to: 0}, allow_nil: true
  validates :input_cost, :output_cost, :total_cost, numericality: {greater_than_or_equal_to: 0}, allow_nil: true
  validates :finish_reason, inclusion: {in: FINISH_REASONS}, allow_nil: true

  before_save :calculate_total_tokens, if: -> { input_tokens_changed? || output_tokens_changed? }
  before_save :calculate_total_cost, if: -> { input_cost_changed? || output_cost_changed? }

  # Aggregates costs from all attempts using each attempt's model pricing
  #
  # Used for multi-attempt executions (retries/fallbacks) where different models
  # may have been used. Calculates total cost by summing individual attempt costs.
  #
  # @return [void]
  def aggregate_attempt_costs!
    return if attempts.blank?

    total_input_cost = 0
    total_output_cost = 0

    attempts.each do |attempt|
      # Skip short-circuited attempts (no actual API call made)
      next if attempt["short_circuited"]

      model_info = resolve_model_info(attempt["model_id"])
      next unless model_info&.pricing

      input_price = model_info.pricing.text_tokens&.input || 0
      output_price = model_info.pricing.text_tokens&.output || 0

      input_tokens = attempt["input_tokens"] || 0
      output_tokens = attempt["output_tokens"] || 0

      total_input_cost += (input_tokens / 1_000_000.0) * input_price
      total_output_cost += (output_tokens / 1_000_000.0) * output_price
    end

    self.input_cost = total_input_cost.round(6)
    self.output_cost = total_output_cost.round(6)
  end

  # Returns whether this execution had multiple attempts
  #
  # @return [Boolean] true if more than one attempt was made
  def has_retries?
    count = if self.class.column_names.include?("attempts_count")
      attempts_count
    elsif self.class.column_names.include?("attempts")
      attempts&.size
    end
    (count || 0) > 1
  end

  # Returns whether this execution used fallback models
  #
  # @return [Boolean] true if a different model than requested succeeded
  def used_fallback?
    return false unless self.class.column_names.include?("chosen_model_id")

    chosen_model_id.present? && chosen_model_id != model_id
  end

  # Returns the successful attempt data (if any)
  #
  # @return [Hash, nil] The successful attempt or nil
  def successful_attempt
    return nil if attempts.blank?

    attempts.find { |a| a["error_class"].nil? && !a["short_circuited"] }
  end

  # Returns failed attempts
  #
  # @return [Array<Hash>] Failed attempt data
  def failed_attempts
    return [] if attempts.blank?

    attempts.select { |a| a["error_class"].present? }
  end

  # Returns short-circuited attempts (circuit breaker blocked)
  #
  # @return [Array<Hash>] Short-circuited attempt data
  def short_circuited_attempts
    return [] if attempts.blank?

    attempts.select { |a| a["short_circuited"] }
  end

  # Returns whether this is a root (top-level) execution
  #
  # @return [Boolean] true if this is a root execution
  def root?
    parent_execution_id.nil?
  end

  # Returns whether this is a child (nested) execution
  #
  # @return [Boolean] true if this has a parent execution
  def child?
    parent_execution_id.present?
  end

  # Returns the execution tree depth
  #
  # @return [Integer] depth level (0 for root)
  def depth
    return 0 if root?

    parent_execution&.depth.to_i + 1
  end

  # Returns whether this execution was a cache hit
  #
  # @return [Boolean] true if response was served from cache
  def cached?
    cache_hit == true
  end

  # Returns whether this execution was rate limited
  #
  # @return [Boolean] true if rate limiting occurred
  def rate_limited?
    metadata&.dig("rate_limited") == true
  end

  # Returns the response payload as a Hash, regardless of how agents wrote it.
  #
  # The `execution_details.response` column is declared as JSON, but agents
  # may write plain strings (chat text), arrays (embeddings), or nil. Views
  # that want to look up specific keys (audio_url, image_url, etc.) need a
  # Hash they can safely `.dig` into. This reader returns an empty hash when
  # the stored response isn't a Hash, so callers don't need type guards.
  #
  # @return [Hash] Parsed response hash, or empty hash if not hash-shaped
  def response_hash
    raw = response
    raw.is_a?(Hash) ? raw : {}
  end

  # Returns whether this execution has had its detail payload soft-purged.
  #
  # Soft-purged executions retain all analytics columns (cost, tokens,
  # timing, status) but the large payloads (prompts, responses, tool
  # calls, attempts) stored in execution_details and tool_executions
  # have been destroyed by the retention job.
  #
  # @return [Boolean] true if the retention job has soft-purged this execution
  def soft_purged?
    metadata&.key?("soft_purged_at") == true
  end

  # Returns when this execution was soft-purged, if ever.
  #
  # @return [Time, nil] Parsed timestamp or nil if not soft-purged
  def soft_purged_at
    raw = metadata&.dig("soft_purged_at")
    return nil if raw.blank?

    Time.iso8601(raw)
  rescue ArgumentError
    nil
  end

  # Convenience accessors for niche fields stored in metadata JSON
  %w[span_id response_cache_key fallback_reason].each do |field|
    define_method(field) { metadata&.dig(field) }
    define_method(:"#{field}=") { |val| self.metadata = (metadata || {}).merge(field => val) }
  end

  %w[time_to_first_token_ms].each do |field|
    define_method(field) { metadata&.dig(field)&.to_i }
    define_method(:"#{field}=") { |val| self.metadata = (metadata || {}).merge(field => val) }
  end

  def retryable
    metadata&.dig("retryable")
  end

  def retryable=(val)
    self.metadata = (metadata || {}).merge("retryable" => val)
  end

  def rate_limited
    metadata&.dig("rate_limited")
  end

  def rate_limited=(val)
    self.metadata = (metadata || {}).merge("rate_limited" => val)
  end

  # Convenience method to access tenant_record through the tenant
  def tenant_record
    return nil unless tenant_id.present?

    Tenant.find_by(tenant_id: tenant_id)&.tenant_record
  end

  # Returns whether this execution used streaming
  #
  # @return [Boolean] true if streaming was enabled
  def streaming?
    streaming == true
  end

  # Returns whether the response was truncated due to max_tokens
  #
  # @return [Boolean] true if hit token limit
  def truncated?
    finish_reason == "length"
  end

  # Returns whether the response was blocked by content filter
  #
  # @return [Boolean] true if blocked by safety filter
  def content_filtered?
    finish_reason == "content_filter"
  end

  # Returns whether this execution made tool calls
  #
  # @return [Boolean] true if tool calls were made
  def tool_calls?
    tool_calls_count.to_i > 0
  end
  alias_method :has_tool_calls?, :tool_calls?

  # Returns real-time dashboard data for the Now Strip
  # Optimized: 3 queries (current aggregate + previous aggregate + running count)
  # instead of ~15 individual count/sum/average queries.
  #
  # @param range [String] Time range: "today", "7d", "30d", or "90d"
  # @return [Hash] Now strip metrics with period-over-period comparisons
  def self.now_strip_data(range: "today")
    current_scope = case range
    when "7d" then last_n_days(7)
    when "30d" then last_n_days(30)
    when "90d" then last_n_days(90)
    else today
    end

    previous_scope = case range
    when "7d" then where(created_at: 14.days.ago.beginning_of_day..7.days.ago.beginning_of_day)
    when "30d" then where(created_at: 60.days.ago.beginning_of_day..30.days.ago.beginning_of_day)
    when "90d" then where(created_at: 180.days.ago.beginning_of_day..90.days.ago.beginning_of_day)
    else yesterday
    end

    curr = aggregate_period_stats(current_scope)
    prev = aggregate_period_stats(previous_scope)

    {
      running: running.count,
      success_today: curr[:success],
      errors_today: curr[:errors],
      timeouts_today: curr[:timeouts],
      cost_today: curr[:cost],
      executions_today: curr[:total],
      success_rate: curr[:success_rate],
      avg_duration_ms: curr[:avg_duration_ms],
      total_tokens: curr[:tokens],
      comparisons: {
        success_change: pct_change(prev[:success], curr[:success]),
        errors_change: pct_change(prev[:errors], curr[:errors]),
        cost_change: pct_change(prev[:cost], curr[:cost]),
        duration_change: pct_change(prev[:avg_duration_ms], curr[:avg_duration_ms]),
        tokens_change: pct_change(prev[:tokens], curr[:tokens])
      }
    }
  end

  # Returns Now Strip data for a custom date range
  # Optimized: 3 queries instead of ~15.
  #
  # Compares the selected range against the same-length window
  # immediately preceding it.
  #
  # @param from [Date] Start date (inclusive)
  # @param to [Date] End date (inclusive)
  # @return [Hash] Now strip metrics with period-over-period comparisons
  def self.now_strip_data_for_dates(from:, to:)
    span_days = (to - from).to_i + 1
    current_scope = where(created_at: from.beginning_of_day..to.end_of_day)
    previous_from = from - span_days.days
    previous_to = from - 1.day
    previous_scope = where(created_at: previous_from.beginning_of_day..previous_to.end_of_day)

    curr = aggregate_period_stats(current_scope)
    prev = aggregate_period_stats(previous_scope)

    {
      running: running.count,
      success_today: curr[:success],
      errors_today: curr[:errors],
      timeouts_today: curr[:timeouts],
      cost_today: curr[:cost],
      executions_today: curr[:total],
      success_rate: curr[:success_rate],
      avg_duration_ms: curr[:avg_duration_ms],
      total_tokens: curr[:tokens],
      comparisons: {
        success_change: pct_change(prev[:success], curr[:success]),
        errors_change: pct_change(prev[:errors], curr[:errors]),
        cost_change: pct_change(prev[:cost], curr[:cost]),
        duration_change: pct_change(prev[:avg_duration_ms], curr[:avg_duration_ms]),
        tokens_change: pct_change(prev[:tokens], curr[:tokens])
      }
    }
  end

  # Calculates percentage change between old and new values
  #
  # @param old_val [Numeric, nil] Previous period value
  # @param new_val [Numeric] Current period value
  # @return [Float, nil] Percentage change or nil if old_val is nil/zero
  def self.pct_change(old_val, new_val)
    return nil if old_val.nil? || old_val.zero?

    ((new_val - old_val).to_f / old_val * 100).round(1)
  end

  # Calculates success rate for a given scope
  #
  # @param scope [ActiveRecord::Relation] The execution scope
  # @return [Float] Success rate as percentage
  def self.calculate_period_success_rate(scope)
    total = scope.count
    return 0.0 if total.zero?

    (scope.successful.count.to_f / total * 100).round(1)
  end

  # Returns aggregate stats for a scope in a single query using conditional aggregation
  #
  # Replaces ~9 individual count/sum/average queries with one SQL query.
  #
  # @param scope [ActiveRecord::Relation] Time-filtered scope
  # @return [Hash] Aggregated metrics
  def self.aggregate_period_stats(scope)
    total, success, errors, timeouts, cost, avg_dur, tokens = scope.pick(
      Arel.sql("COUNT(*)"),
      Arel.sql("SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END)"),
      Arel.sql("SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END)"),
      Arel.sql("SUM(CASE WHEN status = 'timeout' THEN 1 ELSE 0 END)"),
      Arel.sql("COALESCE(SUM(total_cost), 0)"),
      Arel.sql("AVG(duration_ms)"),
      Arel.sql("COALESCE(SUM(total_tokens), 0)")
    )

    total = total.to_i
    success = success.to_i

    {
      total: total,
      success: success,
      errors: errors.to_i,
      timeouts: timeouts.to_i,
      cost: cost.to_f,
      avg_duration_ms: avg_dur.to_i,
      tokens: tokens.to_i,
      success_rate: (total > 0) ? (success.to_f / total * 100).round(1) : 0.0
    }
  end
  private_class_method :aggregate_period_stats

  private

  # Calculates and sets total_tokens from input and output
  #
  # @return [Integer] The calculated total
  def calculate_total_tokens
    self.total_tokens = (input_tokens || 0) + (output_tokens || 0)
  end

  # Calculates and sets total_cost from input and output costs
  #
  # @return [BigDecimal] The calculated total
  def calculate_total_cost
    self.total_cost = (input_cost || 0) + (output_cost || 0)
  end

  # Resolves model info for cost calculation
  #
  # Uses Models.find (local registry lookup) rather than Models.resolve
  # because cost calculation only needs pricing data, not a provider instance.
  # Models.resolve requires API keys to instantiate the provider, which may
  # not be available in background jobs or instrumentation contexts.
# # @param lookup_model_id [String, nil] The model identifier (defaults to self.model_id) # @return [Object, nil] Model info or nil def resolve_model_info(lookup_model_id = nil) lookup_model_id ||= model_id return nil unless lookup_model_id RubyLLM::Models.find(lookup_model_id) rescue => e Rails.logger.debug("[RubyLLM::Agents] Model lookup failed for #{lookup_model_id}: #{e.}") if defined?(Rails) && Rails.logger nil end end |
#temperature ⇒ Float
Returns the temperature setting used (0.0-2.0).
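The class source validates this attribute as a number between 0 and 2 and allows nil. A minimal plain-Ruby sketch of that range check follows; `TEMPERATURE_RANGE` and `valid_temperature?` are hypothetical names used only for illustration, and unlike the Rails numericality validator this sketch does not accept numeric strings.

```ruby
# Illustrative only: mirrors the 0..2 bound with nil allowed.
# TEMPERATURE_RANGE and valid_temperature? are hypothetical helpers,
# not part of RubyLLM::Agents.
TEMPERATURE_RANGE = (0.0..2.0).freeze

def valid_temperature?(value)
  # nil is accepted because the validation is declared with allow_nil
  return true if value.nil?

  value.is_a?(Numeric) && TEMPERATURE_RANGE.cover?(value)
end

puts valid_temperature?(0.7)
puts valid_temperature?(2.5)
```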
# File 'app/models/ruby_llm/agents/execution.rb', line 46

class Execution < ::ActiveRecord::Base
  self.table_name = "ruby_llm_agents_executions"

  include Execution::Metrics
  include Execution::Scopes
  include Execution::Analytics
  include Execution::Replayable

  # Status enum
  #   - running: execution in progress
  #   - success: completed successfully
  #   - error: completed with error
  #   - timeout: completed due to timeout
  enum :status, %w[running success error timeout].index_by(&:itself), prefix: true

  # Allowed finish reasons from LLM providers
  FINISH_REASONS = %w[stop length content_filter tool_calls other].freeze

  # Allowed fallback reasons for model switching
  FALLBACK_REASONS = %w[price_limit quality_fail rate_limit timeout safety error other].freeze

  # Execution hierarchy associations
  belongs_to :parent_execution, class_name: "RubyLLM::Agents::Execution", optional: true
  belongs_to :root_execution, class_name: "RubyLLM::Agents::Execution", optional: true
  has_many :child_executions, class_name: "RubyLLM::Agents::Execution",
    foreign_key: :parent_execution_id, dependent: :nullify, inverse_of: :parent_execution

  # Detail record for large payloads (prompts, responses, tool calls, etc.)
  has_one :detail, class_name: "RubyLLM::Agents::ExecutionDetail",
    foreign_key: :execution_id, dependent: :destroy

  # Individual tool call records (real-time tracking)
  has_many :tool_executions, class_name: "RubyLLM::Agents::ToolExecution",
    foreign_key: :execution_id, dependent: :destroy

  # Delegations so existing code keeps working transparently
  delegate :system_prompt, :user_prompt, :assistant_prompt, :response,
    :messages_summary, :tool_calls, :attempts, :fallback_chain, :parameters,
    :routed_to, :classification_result, :cached_at, :cache_creation_tokens,
    to: :detail, prefix: false, allow_nil: true

  # Error message reader that survives soft purge.
  #
  # Prefers detail.error_message when the detail row still exists, otherwise
  # falls back to the truncated copy stored in metadata by the retention
  # job. This lets error-rate trend analysis continue working past the
  # soft-purge window.
  #
  # @return [String, nil]
  def error_message
    detail&.error_message || metadata&.dig("error_message")
  end

  # Validations
  validates :agent_type, :model_id, :started_at, presence: true
  validates :status, inclusion: {in: statuses.keys}
  validates :temperature, numericality: {greater_than_or_equal_to: 0, less_than_or_equal_to: 2}, allow_nil: true
  validates :input_tokens, :output_tokens, numericality: {greater_than_or_equal_to: 0}, allow_nil: true
  validates :duration_ms, numericality: {greater_than_or_equal_to: 0}, allow_nil: true
  validates :input_cost, :output_cost, :total_cost, numericality: {greater_than_or_equal_to: 0}, allow_nil: true
  validates :finish_reason, inclusion: {in: FINISH_REASONS}, allow_nil: true

  before_save :calculate_total_tokens, if: -> { input_tokens_changed? || output_tokens_changed? }
  before_save :calculate_total_cost, if: -> { input_cost_changed? || output_cost_changed? }

  # Aggregates costs from all attempts using each attempt's model pricing
  #
  # Used for multi-attempt executions (retries/fallbacks) where different models
  # may have been used. Calculates total cost by summing individual attempt costs.
  #
  # @return [void]
  def aggregate_attempt_costs!
    return if attempts.blank?

    total_input_cost = 0
    total_output_cost = 0

    attempts.each do |attempt|
      # Skip short-circuited attempts (no actual API call made)
      next if attempt["short_circuited"]

      model_info = resolve_model_info(attempt["model_id"])
      next unless model_info&.pricing

      input_price = model_info.pricing.text_tokens&.input || 0
      output_price = model_info.pricing.text_tokens&.output || 0

      input_tokens = attempt["input_tokens"] || 0
      output_tokens = attempt["output_tokens"] || 0

      total_input_cost += (input_tokens / 1_000_000.0) * input_price
      total_output_cost += (output_tokens / 1_000_000.0) * output_price
    end

    self.input_cost = total_input_cost.round(6)
    self.output_cost = total_output_cost.round(6)
  end

  # Returns whether this execution had multiple attempts
  #
  # @return [Boolean] true if more than one attempt was made
  def has_retries?
    count = if self.class.column_names.include?("attempts_count")
      attempts_count
    elsif self.class.column_names.include?("attempts")
      attempts&.size
    end
    (count || 0) > 1
  end

  # Returns whether this execution used fallback models
  #
  # @return [Boolean] true if a different model than requested succeeded
  def used_fallback?
    return false unless self.class.column_names.include?("chosen_model_id")

    chosen_model_id.present? && chosen_model_id != model_id
  end

  # Returns the successful attempt data (if any)
  #
  # @return [Hash, nil] The successful attempt or nil
  def successful_attempt
    return nil if attempts.blank?

    attempts.find { |a| a["error_class"].nil? && !a["short_circuited"] }
  end

  # Returns failed attempts
  #
  # @return [Array<Hash>] Failed attempt data
  def failed_attempts
    return [] if attempts.blank?

    attempts.select { |a| a["error_class"].present? }
  end

  # Returns short-circuited attempts (circuit breaker blocked)
  #
  # @return [Array<Hash>] Short-circuited attempt data
  def short_circuited_attempts
    return [] if attempts.blank?

    attempts.select { |a| a["short_circuited"] }
  end

  # Returns whether this is a root (top-level) execution
  #
  # @return [Boolean] true if this is a root execution
  def root?
    parent_execution_id.nil?
  end

  # Returns whether this is a child (nested) execution
  #
  # @return [Boolean] true if this has a parent execution
  def child?
    parent_execution_id.present?
  end

  # Returns the execution tree depth
  #
  # @return [Integer] depth level (0 for root)
  def depth
    return 0 if root?

    parent_execution&.depth.to_i + 1
  end

  # Returns whether this execution was a cache hit
  #
  # @return [Boolean] true if response was served from cache
  def cached?
    cache_hit == true
  end

  # Returns whether this execution was rate limited
  #
  # @return [Boolean] true if rate limiting occurred
  def rate_limited?
    metadata&.dig("rate_limited") == true
  end

  # Returns the response payload as a Hash, regardless of how agents wrote it.
  #
  # The `execution_details.response` column is declared as JSON, but agents
  # may write plain strings (chat text), arrays (embeddings), or nil. Views
  # that want to look up specific keys (audio_url, image_url, etc.) need a
  # Hash they can safely `.dig` into. This reader returns an empty hash when
  # the stored response isn't a Hash, so callers don't need type guards.
  #
  # @return [Hash] Parsed response hash, or empty hash if not hash-shaped
  def response_hash
    raw = response
    raw.is_a?(Hash) ? raw : {}
  end

  # Returns whether this execution has had its detail payload soft-purged.
  #
  # Soft-purged executions retain all analytics columns (cost, tokens,
  # timing, status) but the large payloads (prompts, responses, tool
  # calls, attempts) stored in execution_details and tool_executions
  # have been destroyed by the retention job.
  #
  # @return [Boolean] true if the retention job has soft-purged this execution
  def soft_purged?
    metadata&.key?("soft_purged_at") == true
  end

  # Returns when this execution was soft-purged, if ever.
  #
  # @return [Time, nil] Parsed timestamp or nil if not soft-purged
  def soft_purged_at
    raw = metadata&.dig("soft_purged_at")
    return nil if raw.blank?

    Time.iso8601(raw)
  rescue ArgumentError
    nil
  end

  # Convenience accessors for niche fields stored in metadata JSON
  %w[span_id response_cache_key fallback_reason].each do |field|
    define_method(field) { metadata&.dig(field) }
    define_method(:"#{field}=") { |val| self.metadata = (metadata || {}).merge(field => val) }
  end

  %w[time_to_first_token_ms].each do |field|
    define_method(field) { metadata&.dig(field)&.to_i }
    define_method(:"#{field}=") { |val| self.metadata = (metadata || {}).merge(field => val) }
  end

  def retryable
    metadata&.dig("retryable")
  end

  def retryable=(val)
    self.metadata = (metadata || {}).merge("retryable" => val)
  end

  def rate_limited
    metadata&.dig("rate_limited")
  end

  def rate_limited=(val)
    self.metadata = (metadata || {}).merge("rate_limited" => val)
  end

  # Convenience method to access tenant_record through the tenant
  def tenant_record
    return nil unless tenant_id.present?

    Tenant.find_by(tenant_id: tenant_id)&.tenant_record
  end

  # Returns whether this execution used streaming
  #
  # @return [Boolean] true if streaming was enabled
  def streaming?
    streaming == true
  end

  # Returns whether the response was truncated due to max_tokens
  #
  # @return [Boolean] true if hit token limit
  def truncated?
    finish_reason == "length"
  end

  # Returns whether the response was blocked by content filter
  #
  # @return [Boolean] true if blocked by safety filter
  def content_filtered?
    finish_reason == "content_filter"
  end

  # Returns whether this execution made tool calls
  #
  # @return [Boolean] true if tool calls were made
  def tool_calls?
    tool_calls_count.to_i > 0
  end
  alias_method :has_tool_calls?, :tool_calls?

  # Returns real-time dashboard data for the Now Strip
  # Optimized: 3 queries (current aggregate + previous aggregate + running count)
  # instead of ~15 individual count/sum/average queries.
  #
  # @param range [String] Time range: "today", "7d", "30d", or "90d"
  # @return [Hash] Now strip metrics with period-over-period comparisons
  def self.now_strip_data(range: "today")
    current_scope = case range
    when "7d" then last_n_days(7)
    when "30d" then last_n_days(30)
    when "90d" then last_n_days(90)
    else today
    end

    previous_scope = case range
    when "7d" then where(created_at: 14.days.ago.beginning_of_day..7.days.ago.beginning_of_day)
    when "30d" then where(created_at: 60.days.ago.beginning_of_day..30.days.ago.beginning_of_day)
    when "90d" then where(created_at: 180.days.ago.beginning_of_day..90.days.ago.beginning_of_day)
    else yesterday
    end

    curr = aggregate_period_stats(current_scope)
    prev = aggregate_period_stats(previous_scope)

    {
      running: running.count,
      success_today: curr[:success],
      errors_today: curr[:errors],
      timeouts_today: curr[:timeouts],
      cost_today: curr[:cost],
      executions_today: curr[:total],
      success_rate: curr[:success_rate],
      avg_duration_ms: curr[:avg_duration_ms],
      total_tokens: curr[:tokens],
      comparisons: {
        success_change: pct_change(prev[:success], curr[:success]),
        errors_change: pct_change(prev[:errors], curr[:errors]),
        cost_change: pct_change(prev[:cost], curr[:cost]),
        duration_change: pct_change(prev[:avg_duration_ms], curr[:avg_duration_ms]),
        tokens_change: pct_change(prev[:tokens], curr[:tokens])
      }
    }
  end

  # Returns Now Strip data for a custom date range
  # Optimized: 3 queries instead of ~15.
  #
  # Compares the selected range against the same-length window
  # immediately preceding it.
  #
  # @param from [Date] Start date (inclusive)
  # @param to [Date] End date (inclusive)
  # @return [Hash] Now strip metrics with period-over-period comparisons
  def self.now_strip_data_for_dates(from:, to:)
    span_days = (to - from).to_i + 1
    current_scope = where(created_at: from.beginning_of_day..to.end_of_day)
    previous_from = from - span_days.days
    previous_to = from - 1.day
    previous_scope = where(created_at: previous_from.beginning_of_day..previous_to.end_of_day)

    curr = aggregate_period_stats(current_scope)
    prev = aggregate_period_stats(previous_scope)

    {
      running: running.count,
      success_today: curr[:success],
      errors_today: curr[:errors],
      timeouts_today: curr[:timeouts],
      cost_today: curr[:cost],
      executions_today: curr[:total],
      success_rate: curr[:success_rate],
      avg_duration_ms: curr[:avg_duration_ms],
      total_tokens: curr[:tokens],
      comparisons: {
        success_change: pct_change(prev[:success], curr[:success]),
        errors_change: pct_change(prev[:errors], curr[:errors]),
        cost_change: pct_change(prev[:cost], curr[:cost]),
        duration_change: pct_change(prev[:avg_duration_ms], curr[:avg_duration_ms]),
        tokens_change: pct_change(prev[:tokens], curr[:tokens])
      }
    }
  end

  # Calculates percentage change between old and new values
  #
  # @param old_val [Numeric, nil] Previous period value
  # @param new_val [Numeric] Current period value
  # @return [Float, nil] Percentage change or nil if old_val is nil/zero
  def self.pct_change(old_val, new_val)
    return nil if old_val.nil? || old_val.zero?

    ((new_val - old_val).to_f / old_val * 100).round(1)
  end

  # Calculates success rate for a given scope
  #
  # @param scope [ActiveRecord::Relation] The execution scope
  # @return [Float] Success rate as percentage
  def self.calculate_period_success_rate(scope)
    total = scope.count
    return 0.0 if total.zero?

    (scope.successful.count.to_f / total * 100).round(1)
  end

  # Returns aggregate stats for a scope in a single query using conditional aggregation
  #
  # Replaces ~9 individual count/sum/average queries with one SQL query.
  #
  # @param scope [ActiveRecord::Relation] Time-filtered scope
  # @return [Hash] Aggregated metrics
  def self.aggregate_period_stats(scope)
    total, success, errors, timeouts, cost, avg_dur, tokens = scope.pick(
      Arel.sql("COUNT(*)"),
      Arel.sql("SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END)"),
      Arel.sql("SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END)"),
      Arel.sql("SUM(CASE WHEN status = 'timeout' THEN 1 ELSE 0 END)"),
      Arel.sql("COALESCE(SUM(total_cost), 0)"),
      Arel.sql("AVG(duration_ms)"),
      Arel.sql("COALESCE(SUM(total_tokens), 0)")
    )

    total = total.to_i
    success = success.to_i

    {
      total: total,
      success: success,
      errors: errors.to_i,
      timeouts: timeouts.to_i,
      cost: cost.to_f,
      avg_duration_ms: avg_dur.to_i,
      tokens: tokens.to_i,
      success_rate: (total > 0) ? (success.to_f / total * 100).round(1) : 0.0
    }
  end
  private_class_method :aggregate_period_stats

  private

  # Calculates and sets total_tokens from input and output
  #
  # @return [Integer] The calculated total
  def calculate_total_tokens
    self.total_tokens = (input_tokens || 0) + (output_tokens || 0)
  end

  # Calculates and sets total_cost from input and output costs
  #
  # @return [BigDecimal] The calculated total
  def calculate_total_cost
    self.total_cost = (input_cost || 0) + (output_cost || 0)
  end

  # Resolves model info for cost calculation
  #
  # Uses Models.find (local registry lookup) rather than Models.resolve
  # because cost calculation only needs pricing data, not a provider instance.
  # Models.resolve requires API keys to instantiate the provider, which may
  # not be available in background jobs or instrumentation contexts.
  #
  # @param lookup_model_id [String, nil] The model identifier (defaults to self.model_id)
  # @return [Object, nil] Model info or nil
  def resolve_model_info(lookup_model_id = nil)
    lookup_model_id ||= model_id
    return nil unless lookup_model_id

    RubyLLM::Models.find(lookup_model_id)
  rescue => e
    Rails.logger.debug("[RubyLLM::Agents] Model lookup failed for #{lookup_model_id}: #{e.message}") if defined?(Rails) && Rails.logger
    nil
  end
end
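The period-over-period arithmetic used by `pct_change` and `now_strip_data_for_dates` can be tried without a database. The sketch below restates it in plain Ruby as a rough illustration: `previous_window` is a hypothetical helper name, and it uses `Date - Integer` in place of the ActiveSupport durations (`span_days.days`, `1.day`) that appear in the model.

```ruby
require "date"

# Percentage change between two period values.
# Returns nil when there is no baseline to compare against.
def pct_change(old_val, new_val)
  return nil if old_val.nil? || old_val.zero?

  ((new_val - old_val).to_f / old_val * 100).round(1)
end

# Hypothetical helper: the same-length window immediately preceding
# the inclusive range [from, to].
def previous_window(from, to)
  span_days = (to - from).to_i + 1 # inclusive day count
  [from - span_days, from - 1]
end

prev_from, prev_to = previous_window(Date.new(2024, 3, 8), Date.new(2024, 3, 14))
puts "#{prev_from}..#{prev_to}"   # 2024-03-01..2024-03-07
puts pct_change(40, 50).inspect   # 25.0
puts pct_change(0, 50).inspect    # nil
```

Returning nil for a nil or zero baseline lets a caller distinguish "no comparison available" from a genuine 0% change.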
#total_cost ⇒ BigDecimal?
Returns the total cost in USD.
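In the class source, `aggregate_attempt_costs!` prices each attempt per million tokens and a `before_save` callback sums input and output cost into `total_cost`. The following is a minimal sketch of that arithmetic under stated assumptions: the `PRICES` table and the `attempt_costs` helper are hypothetical (the model itself looks pricing up in the RubyLLM model registry), the model names are placeholders, and `BigDecimal` is used here instead of the Float arithmetic in the source.

```ruby
require "bigdecimal"

# Hypothetical USD prices per one million tokens, for illustration only.
PRICES = {
  "model-a" => {input: BigDecimal("3"), output: BigDecimal("15")},
  "model-b" => {input: BigDecimal("0.5"), output: BigDecimal("1.5")}
}.freeze

# Sums per-attempt costs, skipping short-circuited attempts
# (no API call was made) and attempts with unknown pricing.
def attempt_costs(attempts, prices)
  input_cost = BigDecimal("0")
  output_cost = BigDecimal("0")

  attempts.each do |attempt|
    next if attempt["short_circuited"]

    price = prices[attempt["model_id"]]
    next unless price

    input_cost += BigDecimal(attempt["input_tokens"] || 0) * price[:input] / 1_000_000
    output_cost += BigDecimal(attempt["output_tokens"] || 0) * price[:output] / 1_000_000
  end

  {
    input_cost: input_cost.round(6),
    output_cost: output_cost.round(6),
    total_cost: (input_cost + output_cost).round(6)
  }
end

attempts = [
  {"model_id" => "model-a", "input_tokens" => 1000, "output_tokens" => 200, "error_class" => "Timeout"},
  {"model_id" => "model-b", "short_circuited" => true},
  {"model_id" => "model-b", "input_tokens" => 2000, "output_tokens" => 500}
]

costs = attempt_costs(attempts, PRICES)
puts costs[:total_cost].to_s("F") # 0.00775
```

As in the source, a failed attempt still contributes cost as long as it reached the provider; only short-circuited attempts are skipped.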
# # @param lookup_model_id [String, nil] The model identifier (defaults to self.model_id) # @return [Object, nil] Model info or nil def resolve_model_info(lookup_model_id = nil) lookup_model_id ||= model_id return nil unless lookup_model_id RubyLLM::Models.find(lookup_model_id) rescue => e Rails.logger.debug("[RubyLLM::Agents] Model lookup failed for #{lookup_model_id}: #{e.message}") if defined?(Rails) && Rails.logger nil end end
#total_tokens ⇒ Integer?
Returns the sum of input and output tokens.
# File 'app/models/ruby_llm/agents/execution.rb', line 46 class Execution < ::ActiveRecord::Base self.table_name = "ruby_llm_agents_executions" include Execution::Metrics include Execution::Scopes include Execution::Analytics include Execution::Replayable # Status enum # - running: execution in progress # - success: completed successfully # - error: completed with error # - timeout: completed due to timeout enum :status, %w[running success error timeout].index_by(&:itself), prefix: true # Allowed finish reasons from LLM providers FINISH_REASONS = %w[stop length content_filter tool_calls other].freeze # Allowed fallback reasons for model switching FALLBACK_REASONS = %w[price_limit quality_fail rate_limit timeout safety error other].freeze # Execution hierarchy associations belongs_to :parent_execution, class_name: "RubyLLM::Agents::Execution", optional: true belongs_to :root_execution, class_name: "RubyLLM::Agents::Execution", optional: true has_many :child_executions, class_name: "RubyLLM::Agents::Execution", foreign_key: :parent_execution_id, dependent: :nullify, inverse_of: :parent_execution # Detail record for large payloads (prompts, responses, tool calls, etc.) has_one :detail, class_name: "RubyLLM::Agents::ExecutionDetail", foreign_key: :execution_id, dependent: :destroy # Individual tool call records (real-time tracking) has_many :tool_executions, class_name: "RubyLLM::Agents::ToolExecution", foreign_key: :execution_id, dependent: :destroy # Delegations so existing code keeps working transparently delegate :system_prompt, :user_prompt, :assistant_prompt, :response, :messages_summary, :tool_calls, :attempts, :fallback_chain, :parameters, :routed_to, :classification_result, :cached_at, :cache_creation_tokens, to: :detail, prefix: false, allow_nil: true # Error message reader that survives soft purge. # # Prefers detail.error_message when the detail row still exists, otherwise # falls back to the truncated copy stored in metadata by the retention # job. 
This lets error-rate trend analysis continue working past the # soft-purge window. # # @return [String, nil] def error_message detail&.error_message || metadata&.dig("error_message") end # Validations validates :agent_type, :model_id, :started_at, presence: true validates :status, inclusion: {in: statuses.keys} validates :temperature, numericality: {greater_than_or_equal_to: 0, less_than_or_equal_to: 2}, allow_nil: true validates :input_tokens, :output_tokens, numericality: {greater_than_or_equal_to: 0}, allow_nil: true validates :duration_ms, numericality: {greater_than_or_equal_to: 0}, allow_nil: true validates :input_cost, :output_cost, :total_cost, numericality: {greater_than_or_equal_to: 0}, allow_nil: true validates :finish_reason, inclusion: {in: FINISH_REASONS}, allow_nil: true before_save :calculate_total_tokens, if: -> { input_tokens_changed? || output_tokens_changed? } before_save :calculate_total_cost, if: -> { input_cost_changed? || output_cost_changed? } # Aggregates costs from all attempts using each attempt's model pricing # # Used for multi-attempt executions (retries/fallbacks) where different models # may have been used. Calculates total cost by summing individual attempt costs. # # @return [void] def aggregate_attempt_costs! return if attempts.blank?
total_input_cost = 0 total_output_cost = 0 attempts.each do |attempt| # Skip short-circuited attempts (no actual API call made) next if attempt["short_circuited"] model_info = resolve_model_info(attempt["model_id"]) next unless model_info&.pricing input_price = model_info.pricing.text_tokens&.input || 0 output_price = model_info.pricing.text_tokens&.output || 0 input_tokens = attempt["input_tokens"] || 0 output_tokens = attempt["output_tokens"] || 0 total_input_cost += (input_tokens / 1_000_000.0) * input_price total_output_cost += (output_tokens / 1_000_000.0) * output_price end self.input_cost = total_input_cost.round(6) self.output_cost = total_output_cost.round(6) end # Returns whether this execution had multiple attempts # # @return [Boolean] true if more than one attempt was made def has_retries? count = if self.class.column_names.include?("attempts_count") attempts_count elsif self.class.column_names.include?("attempts") attempts&.size end (count || 0) > 1 end # Returns whether this execution used fallback models # # @return [Boolean] true if a different model than requested succeeded def used_fallback? return false unless self.class.column_names.include?("chosen_model_id") chosen_model_id.present? && chosen_model_id != model_id end # Returns the successful attempt data (if any) # # @return [Hash, nil] The successful attempt or nil def successful_attempt return nil if attempts.blank? attempts.find { |a| a["error_class"].nil? && !a["short_circuited"] } end # Returns failed attempts # # @return [Array<Hash>] Failed attempt data def failed_attempts return [] if attempts.blank? attempts.select { |a| a["error_class"].present? } end # Returns short-circuited attempts (circuit breaker blocked) # # @return [Array<Hash>] Short-circuited attempt data def short_circuited_attempts return [] if attempts.blank? 
attempts.select { |a| a["short_circuited"] } end # Returns whether this is a root (top-level) execution # # @return [Boolean] true if this is a root execution def root? parent_execution_id.nil? end # Returns whether this is a child (nested) execution # # @return [Boolean] true if this has a parent execution def child? parent_execution_id.present? end # Returns the execution tree depth # # @return [Integer] depth level (0 for root) def depth return 0 if root? parent_execution&.depth.to_i + 1 end # Returns whether this execution was a cache hit # # @return [Boolean] true if response was served from cache def cached? cache_hit == true end # Returns whether this execution was rate limited # # @return [Boolean] true if rate limiting occurred def rate_limited? metadata&.dig("rate_limited") == true end # Returns the response payload as a Hash, regardless of how agents wrote it. # # The `execution_details.response` column is declared as JSON, but agents # may write plain strings (chat text), arrays (embeddings), or nil. Views # that want to look up specific keys (audio_url, image_url, etc.) need a # Hash they can safely `.dig` into. This reader returns an empty hash when # the stored response isn't a Hash, so callers don't need type guards. # # @return [Hash] Parsed response hash, or empty hash if not hash-shaped def response_hash raw = response raw.is_a?(Hash) ? raw : {} end # Returns whether this execution has had its detail payload soft-purged. # # Soft-purged executions retain all analytics columns (cost, tokens, # timing, status) but the large payloads (prompts, responses, tool # calls, attempts) stored in execution_details and tool_executions # have been destroyed by the retention job. # # @return [Boolean] true if the retention job has soft-purged this execution def soft_purged? metadata&.key?("soft_purged_at") == true end # Returns when this execution was soft-purged, if ever.
# # @return [Time, nil] Parsed timestamp or nil if not soft-purged def soft_purged_at raw = metadata&.dig("soft_purged_at") return nil if raw.blank? Time.iso8601(raw) rescue ArgumentError nil end # Convenience accessors for niche fields stored in metadata JSON %w[span_id response_cache_key fallback_reason].each do |field| define_method(field) { metadata&.dig(field) } define_method(:"#{field}=") { |val| self.metadata = (metadata || {}).merge(field => val) } end %w[time_to_first_token_ms].each do |field| define_method(field) { metadata&.dig(field)&.to_i } define_method(:"#{field}=") { |val| self.metadata = (metadata || {}).merge(field => val) } end def retryable metadata&.dig("retryable") end def retryable=(val) self.metadata = (metadata || {}).merge("retryable" => val) end def rate_limited metadata&.dig("rate_limited") end def rate_limited=(val) self.metadata = (metadata || {}).merge("rate_limited" => val) end # Convenience method to access tenant_record through the tenant def tenant_record return nil unless tenant_id.present? Tenant.find_by(tenant_id: tenant_id)&.tenant_record end # Returns whether this execution used streaming # # @return [Boolean] true if streaming was enabled def streaming? streaming == true end # Returns whether the response was truncated due to max_tokens # # @return [Boolean] true if hit token limit def truncated? finish_reason == "length" end # Returns whether the response was blocked by content filter # # @return [Boolean] true if blocked by safety filter def content_filtered? finish_reason == "content_filter" end # Returns whether this execution made tool calls # # @return [Boolean] true if tool calls were made def tool_calls? tool_calls_count.to_i > 0 end alias_method :has_tool_calls?, :tool_calls? # Returns real-time dashboard data for the Now Strip # Optimized: 3 queries (current aggregate + previous aggregate + running count) # instead of ~15 individual count/sum/average queries.
# # @param range [String] Time range: "today", "7d", "30d", or "90d" # @return [Hash] Now strip metrics with period-over-period comparisons def self.now_strip_data(range: "today") current_scope = case range when "7d" then last_n_days(7) when "30d" then last_n_days(30) when "90d" then last_n_days(90) else today end previous_scope = case range when "7d" then where(created_at: 14.days.ago.beginning_of_day..7.days.ago.beginning_of_day) when "30d" then where(created_at: 60.days.ago.beginning_of_day..30.days.ago.beginning_of_day) when "90d" then where(created_at: 180.days.ago.beginning_of_day..90.days.ago.beginning_of_day) else yesterday end curr = aggregate_period_stats(current_scope) prev = aggregate_period_stats(previous_scope) { running: running.count, success_today: curr[:success], errors_today: curr[:errors], timeouts_today: curr[:timeouts], cost_today: curr[:cost], executions_today: curr[:total], success_rate: curr[:success_rate], avg_duration_ms: curr[:avg_duration_ms], total_tokens: curr[:tokens], comparisons: { success_change: pct_change(prev[:success], curr[:success]), errors_change: pct_change(prev[:errors], curr[:errors]), cost_change: pct_change(prev[:cost], curr[:cost]), duration_change: pct_change(prev[:avg_duration_ms], curr[:avg_duration_ms]), tokens_change: pct_change(prev[:tokens], curr[:tokens]) } } end # Returns Now Strip data for a custom date range # Optimized: 3 queries instead of ~15. # # Compares the selected range against the same-length window # immediately preceding it. 
# # @param from [Date] Start date (inclusive) # @param to [Date] End date (inclusive) # @return [Hash] Now strip metrics with period-over-period comparisons def self.now_strip_data_for_dates(from:, to:) span_days = (to - from).to_i + 1 current_scope = where(created_at: from.beginning_of_day..to.end_of_day) previous_from = from - span_days.days previous_to = from - 1.day previous_scope = where(created_at: previous_from.beginning_of_day..previous_to.end_of_day) curr = aggregate_period_stats(current_scope) prev = aggregate_period_stats(previous_scope) { running: running.count, success_today: curr[:success], errors_today: curr[:errors], timeouts_today: curr[:timeouts], cost_today: curr[:cost], executions_today: curr[:total], success_rate: curr[:success_rate], avg_duration_ms: curr[:avg_duration_ms], total_tokens: curr[:tokens], comparisons: { success_change: pct_change(prev[:success], curr[:success]), errors_change: pct_change(prev[:errors], curr[:errors]), cost_change: pct_change(prev[:cost], curr[:cost]), duration_change: pct_change(prev[:avg_duration_ms], curr[:avg_duration_ms]), tokens_change: pct_change(prev[:tokens], curr[:tokens]) } } end # Calculates percentage change between old and new values # # @param old_val [Numeric, nil] Previous period value # @param new_val [Numeric] Current period value # @return [Float, nil] Percentage change or nil if old_val is nil/zero def self.pct_change(old_val, new_val) return nil if old_val.nil? || old_val.zero? ((new_val - old_val).to_f / old_val * 100).round(1) end # Calculates success rate for a given scope # # @param scope [ActiveRecord::Relation] The execution scope # @return [Float] Success rate as percentage def self.calculate_period_success_rate(scope) total = scope.count return 0.0 if total.zero? (scope.successful.count.to_f / total * 100).round(1) end # Returns aggregate stats for a scope in a single query using conditional aggregation # # Replaces ~9 individual count/sum/average queries with one SQL query. 
# # @param scope [ActiveRecord::Relation] Time-filtered scope # @return [Hash] Aggregated metrics def self.aggregate_period_stats(scope) total, success, errors, timeouts, cost, avg_dur, tokens = scope.pick( Arel.sql("COUNT(*)"), Arel.sql("SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END)"), Arel.sql("SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END)"), Arel.sql("SUM(CASE WHEN status = 'timeout' THEN 1 ELSE 0 END)"), Arel.sql("COALESCE(SUM(total_cost), 0)"), Arel.sql("AVG(duration_ms)"), Arel.sql("COALESCE(SUM(total_tokens), 0)") ) total = total.to_i success = success.to_i { total: total, success: success, errors: errors.to_i, timeouts: timeouts.to_i, cost: cost.to_f, avg_duration_ms: avg_dur.to_i, tokens: tokens.to_i, success_rate: (total > 0) ? (success.to_f / total * 100).round(1) : 0.0 } end private_class_method :aggregate_period_stats private # Calculates and sets total_tokens from input and output # # @return [Integer] The calculated total def calculate_total_tokens self.total_tokens = (input_tokens || 0) + (output_tokens || 0) end # Calculates and sets total_cost from input and output costs # # @return [BigDecimal] The calculated total def calculate_total_cost self.total_cost = (input_cost || 0) + (output_cost || 0) end # Resolves model info for cost calculation # # Uses Models.find (local registry lookup) rather than Models.resolve # because cost calculation only needs pricing data, not a provider instance. # Models.resolve requires API keys to instantiate the provider, which may # not be available in background jobs or instrumentation contexts. 
# # @param lookup_model_id [String, nil] The model identifier (defaults to self.model_id) # @return [Object, nil] Model info or nil def resolve_model_info(lookup_model_id = nil) lookup_model_id ||= model_id return nil unless lookup_model_id RubyLLM::Models.find(lookup_model_id) rescue => e Rails.logger.debug("[RubyLLM::Agents] Model lookup failed for #{lookup_model_id}: #{e.message}") if defined?(Rails) && Rails.logger nil end end
Class Method Details
.calculate_period_success_rate(scope) ⇒ Float
Calculates success rate for a given scope
# File 'app/models/ruby_llm/agents/execution.rb', line 428
def self.calculate_period_success_rate(scope)
  total = scope.count
  return 0.0 if total.zero?
  (scope.successful.count.to_f / total * 100).round(1)
end
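The same arithmetic can be tried without a database. The sketch below is only an illustration: `success_rate` is a hypothetical helper, and the array of status strings stands in for the rows of a scope.

```ruby
# Hypothetical stand-in for calculate_period_success_rate:
# `statuses` plays the role of the status column of a scope.
def success_rate(statuses)
  total = statuses.size
  return 0.0 if total.zero? # an empty period reports 0.0 instead of dividing by zero

  (statuses.count("success").to_f / total * 100).round(1)
end

puts success_rate(%w[success success error timeout])
puts success_rate(%w[success success error])
puts success_rate([])
```

Note that the real method issues two COUNT queries (one for the scope, one for `scope.successful`), whereas this sketch counts in memory.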
.now_strip_data(range: "today") ⇒ Hash
Returns real-time dashboard data for the Now Strip. Optimized: 3 queries (current aggregate + previous aggregate + running count) instead of ~15 individual count/sum/average queries.
# File 'app/models/ruby_llm/agents/execution.rb', line 337
def self.now_strip_data(range: "today")
  current_scope = case range
  when "7d" then last_n_days(7)
  when "30d" then last_n_days(30)
  when "90d" then last_n_days(90)
  else today
  end

  previous_scope = case range
  when "7d" then where(created_at: 14.days.ago.beginning_of_day..7.days.ago.beginning_of_day)
  when "30d" then where(created_at: 60.days.ago.beginning_of_day..30.days.ago.beginning_of_day)
  when "90d" then where(created_at: 180.days.ago.beginning_of_day..90.days.ago.beginning_of_day)
  else yesterday
  end

  curr = aggregate_period_stats(current_scope)
  prev = aggregate_period_stats(previous_scope)

  {
    running: running.count,
    success_today: curr[:success],
    errors_today: curr[:errors],
    timeouts_today: curr[:timeouts],
    cost_today: curr[:cost],
    executions_today: curr[:total],
    success_rate: curr[:success_rate],
    avg_duration_ms: curr[:avg_duration_ms],
    total_tokens: curr[:tokens],
    comparisons: {
      success_change: pct_change(prev[:success], curr[:success]),
      errors_change: pct_change(prev[:errors], curr[:errors]),
      cost_change: pct_change(prev[:cost], curr[:cost]),
      duration_change: pct_change(prev[:avg_duration_ms], curr[:avg_duration_ms]),
      tokens_change: pct_change(prev[:tokens], curr[:tokens])
    }
  }
end
.now_strip_data_for_dates(from:, to:) ⇒ Hash
Returns Now Strip data for a custom date range. Optimized: 3 queries instead of ~15.
Compares the selected range against the same-length window immediately preceding it.
# File 'app/models/ruby_llm/agents/execution.rb', line 384
def self.now_strip_data_for_dates(from:, to:)
  span_days = (to - from).to_i + 1
  current_scope = where(created_at: from.beginning_of_day..to.end_of_day)
  previous_from = from - span_days.days
  previous_to = from - 1.day
  previous_scope = where(created_at: previous_from.beginning_of_day..previous_to.end_of_day)

  curr = aggregate_period_stats(current_scope)
  prev = aggregate_period_stats(previous_scope)

  {
    running: running.count,
    success_today: curr[:success],
    errors_today: curr[:errors],
    timeouts_today: curr[:timeouts],
    cost_today: curr[:cost],
    executions_today: curr[:total],
    success_rate: curr[:success_rate],
    avg_duration_ms: curr[:avg_duration_ms],
    total_tokens: curr[:tokens],
    comparisons: {
      success_change: pct_change(prev[:success], curr[:success]),
      errors_change: pct_change(prev[:errors], curr[:errors]),
      cost_change: pct_change(prev[:cost], curr[:cost]),
      duration_change: pct_change(prev[:avg_duration_ms], curr[:avg_duration_ms]),
      tokens_change: pct_change(prev[:tokens], curr[:tokens])
    }
  }
end
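To see which dates the "same-length window immediately preceding" covers, the window arithmetic can be reproduced with the standard library alone. This is a sketch, not the method itself: `previous_window` is a hypothetical helper, and it subtracts plain integers from `Date` objects where the original uses ActiveSupport durations (`span_days.days`, `1.day`).

```ruby
require "date"

# Hypothetical helper mirroring the span arithmetic of now_strip_data_for_dates.
# Both bounds are inclusive, hence the "+ 1" when counting days.
def previous_window(from, to)
  span_days = (to - from).to_i + 1
  [from - span_days, from - 1]
end

prev_from, prev_to = previous_window(Date.new(2024, 3, 10), Date.new(2024, 3, 16))
puts "#{prev_from} .. #{prev_to}"
```

For a seven-day selection (10 to 16 March 2024) the comparison window is the seven days ending the day before the selection starts.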
.pct_change(old_val, new_val) ⇒ Float?
Calculates percentage change between old and new values
# File 'app/models/ruby_llm/agents/execution.rb', line 419
def self.pct_change(old_val, new_val)
  return nil if old_val.nil? || old_val.zero?
  ((new_val - old_val).to_f / old_val * 100).round(1)
end
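A few worked values make the edge cases concrete. The sketch below copies the method body into a free-standing function so it runs outside the model; the sample numbers are made up for illustration.

```ruby
# Stand-alone copy of the pct_change logic for experimentation.
def pct_change(old_val, new_val)
  return nil if old_val.nil? || old_val.zero? # no baseline, so no percentage

  ((new_val - old_val).to_f / old_val * 100).round(1)
end

puts pct_change(50, 75).inspect   # growth
puts pct_change(200, 150).inspect # decline
puts pct_change(3, 4).inspect     # rounded to one decimal place
puts pct_change(0, 10).inspect    # zero baseline
puts pct_change(nil, 10).inspect  # missing baseline
```

Returning `nil` rather than `0.0` for a missing or zero baseline lets a dashboard distinguish "no change" from "nothing to compare against".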
Instance Method Details
#aggregate_attempt_costs! ⇒ void
This method returns an undefined value.
Aggregates costs from all attempts using each attempt’s model pricing
Used for multi-attempt executions (retries/fallbacks) where different models may have been used. Calculates total cost by summing individual attempt costs.
# File 'app/models/ruby_llm/agents/execution.rb', line 118
def aggregate_attempt_costs!
  return if attempts.blank?

  total_input_cost = 0
  total_output_cost = 0

  attempts.each do |attempt|
    # Skip short-circuited attempts (no actual API call made)
    next if attempt["short_circuited"]

    model_info = resolve_model_info(attempt["model_id"])
    next unless model_info&.pricing

    input_price = model_info.pricing.text_tokens&.input || 0
    output_price = model_info.pricing.text_tokens&.output || 0

    input_tokens = attempt["input_tokens"] || 0
    output_tokens = attempt["output_tokens"] || 0

    total_input_cost += (input_tokens / 1_000_000.0) * input_price
    total_output_cost += (output_tokens / 1_000_000.0) * output_price
  end

  self.input_cost = total_input_cost.round(6)
  self.output_cost = total_output_cost.round(6)
end
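The per-attempt arithmetic treats prices as USD per one million tokens. The following sketch reproduces it with plain hashes. `PRICES_PER_MILLION`, the model names, and the prices are invented for illustration; the real method looks pricing up through `resolve_model_info`.

```ruby
# Hypothetical price table (USD per 1M tokens); not real provider pricing.
PRICES_PER_MILLION = {
  "model-a" => { input: 3.0, output: 15.0 },
  "model-b" => { input: 0.5, output: 1.5 }
}.freeze

def aggregate_costs(attempts, prices = PRICES_PER_MILLION)
  input_cost = 0.0
  output_cost = 0.0

  attempts.each do |attempt|
    next if attempt["short_circuited"] # no API call was made, so nothing was billed

    price = prices[attempt["model_id"]]
    next unless price # unknown model: skip rather than guess

    input_cost += (attempt["input_tokens"] || 0) / 1_000_000.0 * price[:input]
    output_cost += (attempt["output_tokens"] || 0) / 1_000_000.0 * price[:output]
  end

  { input_cost: input_cost.round(6), output_cost: output_cost.round(6) }
end

attempts = [
  { "model_id" => "model-a", "input_tokens" => 1000, "output_tokens" => 500 },
  { "model_id" => "model-b", "short_circuited" => true },
  { "model_id" => "model-b", "input_tokens" => 2000, "output_tokens" => 1000 }
]
p aggregate_costs(attempts)
```

Here the first attempt contributes 0.003 + 0.0075 USD, the short-circuited attempt contributes nothing, and the third contributes 0.001 + 0.0015 USD.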
#cached? ⇒ Boolean
Returns whether this execution was a cache hit
# File 'app/models/ruby_llm/agents/execution.rb', line 219
def cached?
  cache_hit == true
end
#child? ⇒ Boolean
Returns whether this is a child (nested) execution
# File 'app/models/ruby_llm/agents/execution.rb', line 203
def child?
  parent_execution_id.present?
end
#content_filtered? ⇒ Boolean
Returns whether the response was blocked by content filter
# File 'app/models/ruby_llm/agents/execution.rb', line 319
def content_filtered?
  finish_reason == "content_filter"
end
#depth ⇒ Integer
Returns the execution tree depth
# File 'app/models/ruby_llm/agents/execution.rb', line 210
def depth
  return 0 if root?

  parent_execution&.depth.to_i + 1
end
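The recursion walks up the parent chain one level at a time until it reaches a root. A minimal sketch with an in-memory tree follows; `Node` is a hypothetical stand-in for the model, with `parent` playing the role of `parent_execution`.

```ruby
# Hypothetical in-memory node; `parent` stands in for parent_execution.
Node = Struct.new(:name, :parent) do
  def root?
    parent.nil?
  end

  # Depth is 0 for a root, otherwise one more than the parent's depth.
  def depth
    return 0 if root?

    parent.depth + 1
  end
end

root = Node.new("router", nil)
child = Node.new("search", root)
grandchild = Node.new("summarize", child)

puts [root, child, grandchild].map(&:depth).inspect
```

In the real model each step up the chain presumably loads the parent record through the `parent_execution` association, so the cost grows with the nesting level.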
#failed_attempts ⇒ Array<Hash>
Returns failed attempts
# File 'app/models/ruby_llm/agents/execution.rb', line 178
def failed_attempts
  return [] if attempts.blank?

  attempts.select { |a| a["error_class"].present? }
end
#has_retries? ⇒ Boolean
Returns whether this execution had multiple attempts
# File 'app/models/ruby_llm/agents/execution.rb', line 148
def has_retries?
  count = if self.class.column_names.include?("attempts_count")
    attempts_count
  elsif self.class.column_names.include?("attempts")
    attempts&.size
  end
  (count || 0) > 1
end
#rate_limited ⇒ Object
# File 'app/models/ruby_llm/agents/execution.rb', line 287
def rate_limited
  metadata&.dig("rate_limited")
end
#rate_limited=(val) ⇒ Object
# File 'app/models/ruby_llm/agents/execution.rb', line 291
def rate_limited=(val)
  self.metadata = (metadata || {}).merge("rate_limited" => val)
end
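The metadata-backed readers and writers all follow one pattern: read with a nil-safe `dig`, write by assigning a merged copy of the hash. The sketch below shows that pattern on a plain Ruby object. `Record` is a hypothetical class with an ordinary `attr_accessor`, standing in for the model's JSON `metadata` column.

```ruby
# Hypothetical plain-Ruby stand-in for a model with a JSON metadata column.
class Record
  attr_accessor :metadata

  %w[span_id fallback_reason].each do |field|
    # Reader: nil-safe lookup, so a nil metadata hash simply yields nil.
    define_method(field) { metadata&.dig(field) }
    # Writer: assign a merged copy instead of mutating the existing hash.
    define_method(:"#{field}=") { |val| self.metadata = (metadata || {}).merge(field => val) }
  end
end

record = Record.new
puts record.span_id.inspect
record.span_id = "abc123"
record.fallback_reason = "timeout"
puts record.metadata.inspect
```

Assigning a fresh hash through the setter, rather than mutating in place, is a common way to make sure a persistence layer notices the change. Whether that is the motivation here is an assumption.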
#rate_limited? ⇒ Boolean
Returns whether this execution was rate limited
# File 'app/models/ruby_llm/agents/execution.rb', line 226
def rate_limited?
  metadata&.dig("rate_limited") == true
end
#response_hash ⇒ Hash
Returns the response payload as a Hash, regardless of how agents wrote it.
The `execution_details.response` column is declared as JSON, but agents may write plain strings (chat text), arrays (embeddings), or nil. Views that want to look up specific keys (audio_url, image_url, etc.) need a Hash they can safely `.dig` into. This reader returns an empty hash when the stored response isn’t a Hash, so callers don’t need type guards.
# File 'app/models/ruby_llm/agents/execution.rb', line 239
def response_hash
  raw = response
  raw.is_a?(Hash) ? raw : {}
end
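The effect of the type guard is easiest to see with the three payload shapes the description mentions. In this sketch `response_hash` is rewritten as a free function taking the raw value as an argument, and the sample payloads are invented.

```ruby
# Free-function version of the guard: anything that is not a Hash becomes {}.
def response_hash(raw)
  raw.is_a?(Hash) ? raw : {}
end

puts response_hash({ "audio_url" => "https://example.com/a.mp3" }).dig("audio_url").inspect
puts response_hash("plain chat text").dig("audio_url").inspect
puts response_hash([0.12, 0.34]).dig("audio_url").inspect
puts response_hash(nil).dig("audio_url").inspect
```

Only the first call yields a value; the string, array, and nil payloads all fall back to an empty hash, so `dig` returns nil instead of raising.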
#retryable ⇒ Object
# File 'app/models/ruby_llm/agents/execution.rb', line 279
def retryable
  metadata&.dig("retryable")
end
#retryable=(val) ⇒ Object
# File 'app/models/ruby_llm/agents/execution.rb', line 283
def retryable=(val)
  self.metadata = (metadata || {}).merge("retryable" => val)
end
#root? ⇒ Boolean
Returns whether this is a root (top-level) execution
# File 'app/models/ruby_llm/agents/execution.rb', line 196
def root?
  parent_execution_id.nil?
end
#short_circuited_attempts ⇒ Array<Hash>
Returns short-circuited attempts (circuit breaker blocked)
# File 'app/models/ruby_llm/agents/execution.rb', line 187
def short_circuited_attempts
  return [] if attempts.blank?

  attempts.select { |a| a["short_circuited"] }
end
#soft_purged? ⇒ Boolean
Returns whether this execution has had its detail payload soft-purged.
Soft-purged executions retain all analytics columns (cost, tokens, timing, status) but the large payloads (prompts, responses, tool calls, attempts) stored in execution_details and tool_executions have been destroyed by the retention job.
# File 'app/models/ruby_llm/agents/execution.rb', line 252
def soft_purged?
  metadata&.key?("soft_purged_at") == true
end
#soft_purged_at ⇒ Time?
Returns when this execution was soft-purged, if ever.
# File 'app/models/ruby_llm/agents/execution.rb', line 259
def soft_purged_at
  raw = metadata&.dig("soft_purged_at")
  return nil if raw.blank?

  Time.iso8601(raw)
rescue ArgumentError
  nil
end
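The reader tolerates three situations: no timestamp, a valid ISO 8601 timestamp, and a malformed one. A stand-alone sketch of that behaviour follows. `parse_purged_at` is a hypothetical helper that takes the metadata hash as an argument and replaces ActiveSupport's `blank?` with an explicit nil/empty check.

```ruby
require "time" # provides Time.iso8601

# Hypothetical helper mirroring soft_purged_at on a plain metadata hash.
def parse_purged_at(metadata)
  raw = metadata&.dig("soft_purged_at")
  return nil if raw.nil? || raw.to_s.strip.empty?

  Time.iso8601(raw)
rescue ArgumentError
  nil # malformed timestamps are treated as "unknown" rather than raised
end

puts parse_purged_at({ "soft_purged_at" => "2024-05-01T12:00:00Z" }).inspect
puts parse_purged_at({ "soft_purged_at" => "not a timestamp" }).inspect
puts parse_purged_at(nil).inspect
```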
#streaming? ⇒ Boolean
Returns whether this execution used streaming
# File 'app/models/ruby_llm/agents/execution.rb', line 305
def streaming?
  streaming == true
end
#successful_attempt ⇒ Hash?
Returns the successful attempt data (if any)
# File 'app/models/ruby_llm/agents/execution.rb', line 169
def successful_attempt
  return nil if attempts.blank?

  attempts.find { |a| a["error_class"].nil? && !a["short_circuited"] }
end
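The three attempt readers (`successful_attempt`, `failed_attempts`, `short_circuited_attempts`) partition the same array by two keys. The sketch below applies equivalent predicates to invented sample data. It uses plain-Ruby emptiness checks where the original relies on ActiveSupport's `present?`, and the model names and error class are made up.

```ruby
# Invented sample data shaped like the attempt hashes the readers expect.
attempts = [
  { "model_id" => "model-a", "error_class" => "Timeout::Error" },
  { "model_id" => "model-b", "short_circuited" => true },
  { "model_id" => "model-c" }
]

# First attempt with no error that actually reached the provider.
successful = attempts.find { |a| a["error_class"].nil? && !a["short_circuited"] }
# Attempts that recorded an error class.
failed = attempts.select { |a| !a["error_class"].to_s.empty? }
# Attempts blocked by the circuit breaker before any API call.
short_circuited = attempts.select { |a| a["short_circuited"] }

puts successful["model_id"]
puts failed.map { |a| a["model_id"] }.inspect
puts short_circuited.map { |a| a["model_id"] }.inspect
```

A short-circuited attempt without an `error_class` counts as neither successful nor failed, which is why it needs its own reader.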
#tenant_record ⇒ Object
Convenience method to access tenant_record through the tenant
# File 'app/models/ruby_llm/agents/execution.rb', line 296
def tenant_record
  return nil unless tenant_id.present?

  Tenant.find_by(tenant_id: tenant_id)&.tenant_record
end
#tool_calls? ⇒ Boolean Also known as: has_tool_calls?
Returns whether this execution made tool calls
# File 'app/models/ruby_llm/agents/execution.rb', line 326
def tool_calls?
  tool_calls_count.to_i > 0
end
#truncated? ⇒ Boolean
Returns whether the response was truncated due to max_tokens
# File 'app/models/ruby_llm/agents/execution.rb', line 312
def truncated?
  finish_reason == "length"
end
#used_fallback? ⇒ Boolean
Returns whether this execution used fallback models
# File 'app/models/ruby_llm/agents/execution.rb', line 160
def used_fallback?
  return false unless self.class.column_names.include?("chosen_model_id")

  chosen_model_id.present? && chosen_model_id != model_id
end