Class: ActiveAgent::ProcessTelemetryTracesJob

Inherits:
ActiveJob::Base
  • Object
show all
Defined in:
lib/active_agent/dashboard/app/jobs/active_agent/process_telemetry_traces_job.rb

Overview

Processes telemetry traces received from ActiveAgent clients.

This job handles the asynchronous processing of trace data to avoid blocking the ingestion endpoint. It:

  • Creates TelemetryTrace records for each trace

  • Updates aggregate statistics

  • Logs an error for any trace that fails to process and continues with the remaining traces

Examples:

Local mode

ActiveAgent::ProcessTelemetryTracesJob.perform_later(
  traces: [...],
  sdk_info: { name: "activeagent", version: "0.5.0" },
  received_at: "2024-01-15T10:30:00Z"
)

Multi-tenant mode

ActiveAgent::ProcessTelemetryTracesJob.perform_later(
  account_id: 1,
  traces: [...],
  sdk_info: { name: "activeagent", version: "0.5.0" },
  received_at: "2024-01-15T10:30:00Z"
)

Constant Summary collapse

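MAX_TRACES_PER_JOB = 100

Maximum number of traces processed in a single job, to avoid memory issues. Only the first MAX_TRACES_PER_JOB traces passed to #perform are processed; any beyond that are not.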

Instance Method Summary collapse

Instance Method Details

#perform(account_id: nil, traces:, sdk_info:, received_at:) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/active_agent/dashboard/app/jobs/active_agent/process_telemetry_traces_job.rb', line 33

# Processes a batch of telemetry traces one at a time.
#
# A failure while processing a single trace is logged and does not prevent the
# remaining traces from being processed.
#
# @param account_id [Object, nil] identifier used to look up the owning account;
#   an account is only required in multi-tenant mode
# @param traces [Array] trace payloads; only the first MAX_TRACES_PER_JOB are processed
# @param sdk_info [Hash] SDK details, passed through unchanged to process_trace
# @param received_at [String] accepted for interface compatibility; not read by this method
# @return [Array, nil] the slice of traces that was iterated, or nil when the
#   batch is skipped because no account was found in multi-tenant mode
def perform(account_id: nil, traces:, sdk_info:, received_at:)
  # NOTE(review): `resolve_account` is a reconstructed helper name - confirm it
  # matches the private account-lookup method defined in this class.
  account = resolve_account(account_id)

  # In multi-tenant mode, require an account
  if ActiveAgent::Dashboard.multi_tenant? && account.nil?
    Rails.logger.warn("[ProcessTelemetryTracesJob] Skipping traces - no valid account")
    return
  end

  # Make truncation visible instead of silently dropping the overflow.
  if traces.size > MAX_TRACES_PER_JOB
    Rails.logger.warn(
      "[ProcessTelemetryTracesJob] Received #{traces.size} traces; " \
      "processing only the first #{MAX_TRACES_PER_JOB}"
    )
  end

  traces.take(MAX_TRACES_PER_JOB).each do |trace|
    process_trace(trace, sdk_info, account)
  rescue StandardError => e
    # Guard the id lookup: a malformed (nil / non-Hash) trace must not make the
    # error handler itself raise, which would abort the rest of the batch.
    trace_id = trace.is_a?(Hash) ? trace["trace_id"] : "(unknown)"
    Rails.logger.error(
      "[ProcessTelemetryTracesJob] Failed to process trace #{trace_id}: " \
      "#{e.class} - #{e.message}"
    )
  end
end