Class: Conductor::Worker::Telemetry::MetricsCollector

Inherits:
Object
  • Object
show all
Includes:
Events::HttpEventsListener, Events::TaskRunnerEventsListener, Events::WorkflowEventsListener
Defined in:
lib/conductor/worker/telemetry/metrics_collector.rb

Overview

MetricsCollector - Canonical SDK worker metrics from the harmonization spec (sdk-metrics-harmonization.md).

Uses camelCase domain labels (taskType, workflowType) and includes status labels on time histograms.

Constant Summary

STATUS_SUCCESS =
'SUCCESS'
STATUS_FAILURE =
'FAILURE'

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(backend: :null, subscribe_global_http: true, measure_payload_size: true, logger: nil) ⇒ MetricsCollector

Returns a new instance of MetricsCollector.



36
37
38
39
40
41
42
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 36

# Builds a collector around the requested metrics backend.
#
# @param backend [Symbol, Object] resolved through the private load_backend helper
# @param subscribe_global_http [Boolean] when truthy, subscribe_to_global_http_events is invoked
# @param measure_payload_size [Boolean] stored and consulted by on_workflow_input_size
# @param logger [Logger, nil] diagnostics sink; when falsy a Logger writing to File::NULL is used
def initialize(backend: :null, subscribe_global_http: true, measure_payload_size: true, logger: nil)
  @backend = load_backend(backend)
  @logger = logger
  @logger ||= Logger.new(File::NULL)
  @measure_payload_size = measure_payload_size
  # stop only unregisters when this is set; start with no listener recorded.
  @http_listener = nil
  return unless subscribe_global_http

  subscribe_to_global_http_events
end

Instance Attribute Details

#backend ⇒ Object (readonly)

Returns the value of attribute backend.



44
45
46
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 44

def backend
  @backend
end

#measure_payload_size ⇒ Object (readonly)

Returns the value of attribute measure_payload_size.



44
45
46
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 44

# @return [Boolean] whether workflow input sizes are recorded (see on_workflow_input_size)
def measure_payload_size; @measure_payload_size; end

Class Method Details

.create(backend: :null, subscribe_global_http: true, measure_payload_size: true, logger: nil) ⇒ MetricsCollector

Parameters:

  • backend (Symbol, Object) (defaults to: :null)

    :null, :prometheus, or a custom backend

  • subscribe_global_http (Boolean) (defaults to: true)

    Auto-subscribe to GlobalDispatcher for HttpApiRequest events from the HTTP layer (default true).

  • measure_payload_size (Boolean) (defaults to: true)

    Record workflow_input_size_bytes (requires JSON serialization; default true). Set false to skip serialization overhead for large payloads.

  • logger (Logger, nil) (defaults to: nil)

    Optional logger for diagnostic output in rescue blocks

Returns:

  • (MetricsCollector)



31
32
33
34
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 31

# Factory that forwards every option unchanged to the constructor.
#
# @return [MetricsCollector]
def self.create(backend: :null, subscribe_global_http: true, measure_payload_size: true, logger: nil)
  options = {
    backend: backend,
    subscribe_global_http: subscribe_global_http,
    measure_payload_size: measure_payload_size,
    logger: logger
  }
  new(**options)
end

Instance Method Details

#on_active_workers_changed(event) ⇒ Object



118
119
120
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 118

# Sets the active_workers gauge to the event's count for its task type.
def on_active_workers_changed(event)
  gauge_labels = { taskType: event.task_type }
  @backend.set('active_workers', event.count, labels: gauge_labels)
end

#on_http_api_request(event) ⇒ Object

--- HTTP Event Handlers ---



140
141
142
143
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 140

# Records HTTP client latency, labelled by method, uri and status.
def on_http_api_request(event)
  request_labels = {
    method: event.method,
    uri: event.uri,
    status: event.status
  }
  observe_time('http_api_client_request_seconds', event.duration_ms, request_labels)
end

#on_poll_completed(event) ⇒ Object



61
62
63
64
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 61

# Records a successful poll's latency for its task type.
def on_poll_completed(event)
  success_labels = { taskType: event.task_type, status: STATUS_SUCCESS }
  observe_time('task_poll_time_seconds', event.duration_ms, success_labels)
end

#on_poll_failure(event) ⇒ Object



66
67
68
69
70
71
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 66

# Counts a failed poll (labelled by exception class name) and records its latency with FAILURE status.
def on_poll_failure(event)
  task_type = event.task_type
  error_labels = { taskType: task_type, exception: event.cause.class.name }
  @backend.increment('task_poll_error_total', labels: error_labels)

  timing_labels = { taskType: task_type, status: STATUS_FAILURE }
  observe_time('task_poll_time_seconds', event.duration_ms, timing_labels)
end

#on_poll_started(event) ⇒ Object

--- Task Runner Event Handlers ---



57
58
59
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 57

# Counts each poll attempt for the event's task type.
def on_poll_started(event)
  poll_labels = { taskType: event.task_type }
  @backend.increment('task_poll_total', labels: poll_labels)
end

#on_task_execution_completed(event) ⇒ Object



77
78
79
80
81
82
83
84
85
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 77

# Records execution latency with SUCCESS status and, when the event reports
# a truthy output_size_bytes, observes the task result size as well.
def on_task_execution_completed(event)
  task_type = event.task_type
  observe_time('task_execute_time_seconds', event.duration_ms,
               { taskType: task_type, status: STATUS_SUCCESS })

  result_size = event.output_size_bytes
  @backend.observe('task_result_size_bytes', result_size, labels: { taskType: task_type }) if result_size
end

#on_task_execution_failure(event) ⇒ Object



87
88
89
90
91
92
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 87

# Counts a failed execution (labelled by exception class name) and records
# its latency with FAILURE status.
def on_task_execution_failure(event)
  task_type = event.task_type
  error_labels = { taskType: task_type, exception: event.cause.class.name }
  @backend.increment('task_execute_error_total', labels: error_labels)

  timing_labels = { taskType: task_type, status: STATUS_FAILURE }
  observe_time('task_execute_time_seconds', event.duration_ms, timing_labels)
end

#on_task_execution_started(event) ⇒ Object



73
74
75
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 73

# Counts each task execution that begins, per task type.
def on_task_execution_started(event)
  started_labels = { taskType: event.task_type }
  @backend.increment('task_execution_started_total', labels: started_labels)
end

#on_task_paused(event) ⇒ Object



109
110
111
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 109

# Counts pause events for the event's task type.
def on_task_paused(event)
  paused_labels = { taskType: event.task_type }
  @backend.increment('task_paused_total', labels: paused_labels)
end

#on_task_update_completed(event) ⇒ Object



94
95
96
97
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 94

# Records the latency of a successful task-result update.
def on_task_update_completed(event)
  success_labels = { taskType: event.task_type, status: STATUS_SUCCESS }
  observe_time('task_update_time_seconds', event.duration_ms, success_labels)
end

#on_task_update_failure(event) ⇒ Object



99
100
101
102
103
104
105
106
107
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 99

# Counts a failed task-result update (labelled by exception class name).
# Timing is optional: the FAILURE latency is observed only when the event
# responds to duration_ms and that value is truthy.
def on_task_update_failure(event)
  task_type = event.task_type
  error_labels = { taskType: task_type, exception: event.cause.class.name }
  @backend.increment('task_update_error_total', labels: error_labels)

  elapsed = event.respond_to?(:duration_ms) ? event.duration_ms : nil
  return unless elapsed

  observe_time('task_update_time_seconds', elapsed, { taskType: task_type, status: STATUS_FAILURE })
end

#on_thread_uncaught_exception(event) ⇒ Object



113
114
115
116
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 113

# Counts uncaught worker-thread exceptions, labelled by exception class name.
def on_thread_uncaught_exception(event)
  exception_name = event.cause.class.name
  @backend.increment('thread_uncaught_exceptions_total', labels: { exception: exception_name })
end

#on_workflow_input_size(event) ⇒ Object



130
131
132
133
134
135
136
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 130

# Observes the workflow input size when measure_payload_size is enabled;
# otherwise it does nothing. A falsy version is reported as an empty string.
def on_workflow_input_size(event)
  if @measure_payload_size
    version_label = event.version ? event.version.to_s : ''
    size_labels = { workflowType: event.workflow_type, version: version_label }
    @backend.observe('workflow_input_size_bytes', event.size_bytes, labels: size_labels)
  end
end

#on_workflow_start_error(event) ⇒ Object

--- Workflow Event Handlers ---



124
125
126
127
128
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 124

# Counts workflow start failures by workflow type and exception class name.
def on_workflow_start_error(event)
  error_labels = {
    workflowType: event.workflow_type,
    exception: event.cause.class.name
  }
  @backend.increment('workflow_start_error_total', labels: error_labels)
end

#stop ⇒ Object



46
47
48
49
50
51
52
53
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 46

# Unregisters the HTTP listener from the global dispatcher, if one is held.
# Errors are logged at debug level and swallowed (best-effort shutdown); the
# listener reference is cleared only after a successful unregister.
def stop
  return unless @http_listener

  begin
    dispatcher = Events::GlobalDispatcher.instance
    dispatcher.unregister(Events::HttpApiRequest, @http_listener)
    @http_listener = nil
  rescue StandardError => e
    @logger.debug { "Telemetry error (non-fatal): #{e.class}: #{e.message}" }
  end
end