Class: RubyLLM::Agents::Pipeline::Middleware::Instrumentation

Inherits:
Base
  • Object
show all
Defined in:
lib/ruby_llm/agents/pipeline/middleware/instrumentation.rb

Overview

Times execution and records results for observability.

This middleware provides:

  • Execution timing (start/end timestamps, duration)

  • Success/failure recording to database

  • Token usage and cost tracking

  • Error details on failure

Recording can be async (via background job) or sync depending on configuration.

Tracking is enabled/disabled per agent type via configuration:

  • track_executions (conversation agents)

  • track_embeddings

  • track_image_generations

  • track_audio

Examples:

Configuration

RubyLLM::Agents.configure do |config|
  config.track_executions = true
  config.track_embeddings = true
  config.async_logging = true  # Use background job
end

Constant Summary

Constants inherited from Base

Base::LOG_TAG

Instance Method Summary collapse

Methods inherited from Base

#initialize

Constructor Details

This class inherits a constructor from RubyLLM::Agents::Pipeline::Middleware::Base

Instance Method Details

#call(context) ⇒ Context

Process instrumentation

Creates a “running” execution record at the start so executions appear on the dashboard immediately, then updates it when complete.

Parameters:

  • context (Context)

    The execution context

Returns:

  • (Context)

    The context with timing info



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/ruby_llm/agents/pipeline/middleware/instrumentation.rb', line 39

def call(context)
  context.started_at = Time.current

  trace(context) do
    # Create "running" record immediately (SYNC - must appear on dashboard)
    execution = create_running_execution(context)
    context.execution_id = execution&.id
    emit_start_notification(context)
    status_update_completed = false
    raised_exception = nil

    begin
      @app.call(context)
      context.completed_at = Time.current

      begin
        complete_execution(execution, context, status: "success")
        status_update_completed = true
      rescue
        # Let ensure block handle via mark_execution_failed!
      end

      emit_complete_notification(context, "success")
    rescue => e
      context.completed_at = Time.current
      context.error = e
      raised_exception = e

      begin
        complete_execution(execution, context, status: determine_error_status(e))
        status_update_completed = true
      rescue
        # Let ensure block handle via mark_execution_failed!
      end

      emit_complete_notification(context, determine_error_status(e))
      raise
    ensure
      # Emergency fallback if update failed
      mark_execution_failed!(execution, error: raised_exception || $!) unless status_update_completed
    end

    context
  end
end