Class: Conductor::Worker::Telemetry::MetricsCollector
- Inherits: Object (ancestry: Object → Conductor::Worker::Telemetry::MetricsCollector)
- Includes:
- Events::HttpEventsListener, Events::TaskRunnerEventsListener, Events::WorkflowEventsListener
- Defined in:
- lib/conductor/worker/telemetry/metrics_collector.rb
Overview
MetricsCollector records the canonical SDK worker metrics defined in the harmonization spec (sdk-metrics-harmonization.md).
It uses camelCase domain labels (taskType, workflowType) and attaches a status label to its time histograms.
Constant Summary collapse
- STATUS_SUCCESS = 'SUCCESS'
- STATUS_FAILURE = 'FAILURE'
Instance Attribute Summary collapse
-
#backend ⇒ Object
readonly
Returns the value of attribute backend.
-
#measure_payload_size ⇒ Object
readonly
Returns the value of attribute measure_payload_size.
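Class Method Summary collapse
- .create(backend: :null, subscribe_global_http: true, measure_payload_size: true, logger: nil) ⇒ MetricsCollector
Convenience factory that forwards all keyword arguments to .new.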
Instance Method Summary collapse
-
#initialize(backend: :null, subscribe_global_http: true, measure_payload_size: true, logger: nil) ⇒ MetricsCollector
constructor
A new instance of MetricsCollector.
- #on_active_workers_changed(event) ⇒ Object
-
#on_http_api_request(event) ⇒ Object
--- HTTP Event Handlers ---.
- #on_poll_completed(event) ⇒ Object
- #on_poll_failure(event) ⇒ Object
-
#on_poll_started(event) ⇒ Object
--- Task Runner Event Handlers ---.
- #on_task_execution_completed(event) ⇒ Object
- #on_task_execution_failure(event) ⇒ Object
- #on_task_execution_started(event) ⇒ Object
- #on_task_paused(event) ⇒ Object
- #on_task_update_completed(event) ⇒ Object
- #on_task_update_failure(event) ⇒ Object
- #on_thread_uncaught_exception(event) ⇒ Object
- #on_workflow_input_size(event) ⇒ Object
-
#on_workflow_start_error(event) ⇒ Object
--- Workflow Event Handlers ---.
- #stop ⇒ Object
Constructor Details
#initialize(backend: :null, subscribe_global_http: true, measure_payload_size: true, logger: nil) ⇒ MetricsCollector
Returns a new instance of MetricsCollector.
36 37 38 39 40 41 42 |
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 36
# Builds a collector around the requested metrics backend.
#
# @param backend [Symbol, Object] selector handed to +load_backend+ (default :null)
# @param subscribe_global_http [Boolean] when truthy, +subscribe_to_global_http_events+
#   is called at the end of construction
# @param measure_payload_size [Boolean] stored as-is; +on_workflow_input_size+ reads it
# @param logger [Logger, nil] used for non-fatal telemetry errors; when falsy a
#   Logger writing to File::NULL is used instead
def initialize(backend: :null, subscribe_global_http: true, measure_payload_size: true, logger: nil)
  # Resolve the backend first, before any other state exists, as the original did.
  @backend = load_backend(backend)
  @logger = logger || Logger.new(File::NULL)
  # Reset before subscribing; presumably the subscription stores the listener here.
  @http_listener = nil
  @measure_payload_size = measure_payload_size
  return unless subscribe_global_http

  subscribe_to_global_http_events
end
Instance Attribute Details
#backend ⇒ Object (readonly)
Returns the value of attribute backend.
44 45 46 |
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 44
# Read-only accessor for the backend resolved by +load_backend+ at construction.
#
# @return [Object] the metrics backend
def backend
  @backend
end
#measure_payload_size ⇒ Object (readonly)
Returns the value of attribute measure_payload_size.
44 45 46 |
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 44
# Read-only accessor for the payload-size flag passed to the constructor.
#
# @return [Boolean, Object] the value given as +measure_payload_size:+
def measure_payload_size
  @measure_payload_size
end
Class Method Details
.create(backend: :null, subscribe_global_http: true, measure_payload_size: true, logger: nil) ⇒ MetricsCollector
31 32 33 34 |
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 31
# Factory alias for +new+; every keyword is forwarded unchanged.
#
# @return [MetricsCollector]
def self.create(backend: :null, subscribe_global_http: true, measure_payload_size: true, logger: nil)
  options = {
    backend: backend,
    subscribe_global_http: subscribe_global_http,
    measure_payload_size: measure_payload_size,
    logger: logger
  }
  new(**options)
end
Instance Method Details
#on_active_workers_changed(event) ⇒ Object
118 119 120 |
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 118
# Sets the +active_workers+ gauge to the event's worker count for its task type.
#
# @param event [Object] responds to #task_type and #count
# @return [Object] the backend's #set result
def on_active_workers_changed(event)
  gauge_labels = { taskType: event.task_type }
  @backend.set('active_workers', event.count, labels: gauge_labels)
end
#on_http_api_request(event) ⇒ Object
--- HTTP Event Handlers ---
140 141 142 143 |
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 140
# --- HTTP Event Handlers ---
#
# Records an HTTP API client request's duration under
# +http_api_client_request_seconds+, labelled by method, URI and status.
# The ms-to-seconds handling presumably lives in +observe_time+; confirm there.
#
# @param event [Object] responds to #method, #uri, #status and #duration_ms
def on_http_api_request(event)
  elapsed_ms = event.duration_ms
  request_labels = {
    method: event.method,
    uri: event.uri,
    status: event.status
  }
  observe_time('http_api_client_request_seconds', elapsed_ms, request_labels)
end
#on_poll_completed(event) ⇒ Object
61 62 63 64 |
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 61
# Records a successful poll's duration under +task_poll_time_seconds+
# with status SUCCESS.
#
# @param event [Object] responds to #task_type and #duration_ms
def on_poll_completed(event)
  success_labels = { taskType: event.task_type, status: STATUS_SUCCESS }
  observe_time('task_poll_time_seconds', event.duration_ms, success_labels)
end
#on_poll_failure(event) ⇒ Object
66 67 68 69 70 71 |
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 66
# Counts a failed poll (labelled with the cause's class name), then records
# the poll duration under +task_poll_time_seconds+ with status FAILURE.
#
# @param event [Object] responds to #task_type, #cause and #duration_ms
def on_poll_failure(event)
  task_type = event.task_type
  error_labels = { taskType: task_type, exception: event.cause.class.name }
  @backend.increment('task_poll_error_total', labels: error_labels)

  timing_labels = { taskType: task_type, status: STATUS_FAILURE }
  observe_time('task_poll_time_seconds', event.duration_ms, timing_labels)
end
#on_poll_started(event) ⇒ Object
--- Task Runner Event Handlers ---
57 58 59 |
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 57
# --- Task Runner Event Handlers ---
#
# Counts one poll attempt (+task_poll_total+) for the event's task type.
#
# @param event [Object] responds to #task_type
# @return [Object] the backend's #increment result
def on_poll_started(event)
  poll_labels = { taskType: event.task_type }
  @backend.increment('task_poll_total', labels: poll_labels)
end
#on_task_execution_completed(event) ⇒ Object
77 78 79 80 81 82 83 84 85 |
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 77
# Records a successful execution's duration under +task_execute_time_seconds+
# (status SUCCESS). When payload measurement is enabled and the event carries
# an output size, it also observes +task_result_size_bytes+.
#
# @param event [Object] responds to #task_type, #duration_ms and #output_size_bytes
def on_task_execution_completed(event)
  task_type = event.task_type
  observe_time('task_execute_time_seconds', event.duration_ms, { taskType: task_type, status: STATUS_SUCCESS })

  # Honour the measure_payload_size flag, as on_workflow_input_size does.
  # Disabling payload measurement then suppresses this histogram even if the
  # event happens to carry a size.
  return unless @measure_payload_size

  output_bytes = event.output_size_bytes
  return unless output_bytes

  @backend.observe('task_result_size_bytes', output_bytes, labels: { taskType: task_type })
end
#on_task_execution_failure(event) ⇒ Object
87 88 89 90 91 92 |
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 87
# Counts a failed execution (labelled with the cause's class name), then
# records its duration under +task_execute_time_seconds+ with status FAILURE.
#
# @param event [Object] responds to #task_type, #cause and #duration_ms
def on_task_execution_failure(event)
  task_type = event.task_type
  @backend.increment(
    'task_execute_error_total',
    labels: { taskType: task_type, exception: event.cause.class.name }
  )
  observe_time(
    'task_execute_time_seconds',
    event.duration_ms,
    { taskType: task_type, status: STATUS_FAILURE }
  )
end
#on_task_execution_started(event) ⇒ Object
73 74 75 |
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 73
# Counts one started execution (+task_execution_started_total+) for the task type.
#
# @param event [Object] responds to #task_type
def on_task_execution_started(event)
  started_labels = { taskType: event.task_type }
  @backend.increment('task_execution_started_total', labels: started_labels)
end
#on_task_paused(event) ⇒ Object
109 110 111 |
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 109
# Counts one paused-task occurrence (+task_paused_total+) for the task type.
#
# @param event [Object] responds to #task_type
def on_task_paused(event)
  paused_labels = { taskType: event.task_type }
  @backend.increment('task_paused_total', labels: paused_labels)
end
#on_task_update_completed(event) ⇒ Object
94 95 96 97 |
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 94
# Records a successful task-update duration under +task_update_time_seconds+
# with status SUCCESS.
#
# @param event [Object] responds to #task_type and #duration_ms
def on_task_update_completed(event)
  update_labels = { taskType: event.task_type, status: STATUS_SUCCESS }
  observe_time('task_update_time_seconds', event.duration_ms, update_labels)
end
#on_task_update_failure(event) ⇒ Object
99 100 101 102 103 104 105 106 107 |
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 99
# Counts a failed task update (labelled with the cause's class name). If the
# event exposes a non-nil #duration_ms, it also records the duration under
# +task_update_time_seconds+ with status FAILURE.
#
# @param event [Object] responds to #task_type and #cause; #duration_ms is optional
def on_task_update_failure(event)
  task_type = event.task_type
  @backend.increment('task_update_error_total',
                     labels: { taskType: task_type, exception: event.cause.class.name })

  # The event may not define duration_ms at all, or may leave it nil.
  timed = event.respond_to?(:duration_ms) && event.duration_ms
  if timed
    observe_time('task_update_time_seconds', event.duration_ms, { taskType: task_type, status: STATUS_FAILURE })
  end
end
#on_thread_uncaught_exception(event) ⇒ Object
113 114 115 116 |
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 113
# Counts an uncaught worker-thread exception, labelled only by the cause's class name.
#
# @param event [Object] responds to #cause
def on_thread_uncaught_exception(event)
  exception_name = event.cause.class.name
  @backend.increment('thread_uncaught_exceptions_total', labels: { exception: exception_name })
end
#on_workflow_input_size(event) ⇒ Object
130 131 132 133 134 135 136 |
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 130
# Observes a workflow's input size under +workflow_input_size_bytes+, labelled
# by workflow type and version. Does nothing unless +@measure_payload_size+ is truthy.
#
# @param event [Object] responds to #size_bytes, #workflow_type and #version
def on_workflow_input_size(event)
  return unless @measure_payload_size

  raw_version = event.version
  size_labels = {
    workflowType: event.workflow_type,
    # A missing (nil/false) version is reported as an empty string.
    version: raw_version ? raw_version.to_s : ''
  }
  @backend.observe('workflow_input_size_bytes', event.size_bytes, labels: size_labels)
end
#on_workflow_start_error(event) ⇒ Object
--- Workflow Event Handlers ---
124 125 126 127 128 |
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 124
# --- Workflow Event Handlers ---
#
# Counts a failed workflow start, labelled by workflow type and the cause's class name.
#
# @param event [Object] responds to #workflow_type and #cause
def on_workflow_start_error(event)
  error_labels = {
    workflowType: event.workflow_type,
    exception: event.cause.class.name
  }
  @backend.increment('workflow_start_error_total', labels: error_labels)
end
#stop ⇒ Object
46 47 48 49 50 51 52 53 |
# File 'lib/conductor/worker/telemetry/metrics_collector.rb', line 46
# Unregisters the HTTP API request listener from the global dispatcher, if
# one was registered, and clears the reference. Does nothing when no listener
# is held.
#
# Best-effort: any StandardError raised while unregistering is logged at
# debug level and swallowed. The listener reference is kept in that case.
def stop
  return unless @http_listener

  Events::GlobalDispatcher.instance.unregister(Events::HttpApiRequest, @http_listener)
  @http_listener = nil
rescue StandardError => e
  # Fixed: the original interpolated `e.` (a syntax error); log the exception message.
  @logger.debug { "Telemetry error (non-fatal): #{e.class}: #{e.message}" }
end