Class: JobWorkflow::Instrumentation::OpenTelemetrySubscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/job_workflow/instrumentation/opentelemetry_subscriber.rb

Overview

Note:

This subscriber requires the opentelemetry-api gem to be installed. If not available, subscription will be silently skipped.

OpenTelemetrySubscriber provides OpenTelemetry tracing integration for JobWorkflow events. It subscribes to ActiveSupport::Notifications and creates OpenTelemetry spans.

Examples:

Enable OpenTelemetry integration

```ruby
# Ensure OpenTelemetry is configured first
OpenTelemetry::SDK.configure do |c|
  c.service_name = "my-app"
end

# Then subscribe JobWorkflow events
JobWorkflow::Instrumentation::OpenTelemetrySubscriber.subscribe!
```

Defined Under Namespace

Modules: Attributes

Constant Summary collapse

SUBSCRIBED_EVENTS =
[
  Events::WORKFLOW,
  Events::TASK,
  Events::TASK_SKIP,
  Events::TASK_ENQUEUE,
  Events::TASK_RETRY,
  Events::THROTTLE_ACQUIRE,
  Events::DEPENDENT_WAIT
].freeze

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.opentelemetry_available?Boolean

: () -> bool

Returns:

  • (Boolean)


73
74
75
# File 'lib/job_workflow/instrumentation/opentelemetry_subscriber.rb', line 73

def opentelemetry_available?
  !!defined?(::OpenTelemetry::Trace)
end

.reset!Object

: () -> void



68
69
70
# File 'lib/job_workflow/instrumentation/opentelemetry_subscriber.rb', line 68

def reset!
  unsubscribe!
end

.subscribe!Object

: () -> Array?



52
53
54
55
56
57
# File 'lib/job_workflow/instrumentation/opentelemetry_subscriber.rb', line 52

def subscribe!
  return unless opentelemetry_available?
  return subscriptions unless subscriptions.empty?

  self.subscriptions = SUBSCRIBED_EVENTS.map { |event| ActiveSupport::Notifications.subscribe(event, new) }
end

.unsubscribe!Object

: () -> void



60
61
62
63
64
65
# File 'lib/job_workflow/instrumentation/opentelemetry_subscriber.rb', line 60

def unsubscribe!
  return if subscriptions.empty?

  subscriptions.each { |sub| ActiveSupport::Notifications.unsubscribe(sub) }
  self.subscriptions = []
end

Instance Method Details

#finish(_name, _id, payload) ⇒ Object

: (String, String, Hash[Symbol, untyped]) -> void



90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/job_workflow/instrumentation/opentelemetry_subscriber.rb', line 90

def finish(_name, _id, payload)
  return unless self.class.opentelemetry_available?

  span, token = extract_otel_info(payload)
  return if span.nil? || token.nil?

  handle_exception(payload, span)
rescue StandardError => e
  handle_error(e)
ensure
  finish_span(span, token) if span || token
end

#start(name, _id, payload) ⇒ Object

: (String, String, Hash[Symbol, untyped]) -> void



79
80
81
82
83
84
85
86
87
# File 'lib/job_workflow/instrumentation/opentelemetry_subscriber.rb', line 79

def start(name, _id, payload)
  return unless self.class.opentelemetry_available?

  span = start_span(name, payload)
  token = attach_span_context(span)
  store_span_info(payload, span, token)
rescue StandardError => e
  handle_error(e)
end