Module: ActiveJob::Temporal::Observability
- Defined in:
- lib/activejob/temporal/observability.rb,
lib/activejob/temporal/observability/datadog.rb,
lib/activejob/temporal/observability/prometheus.rb,
lib/activejob/temporal/observability/opentelemetry.rb
Defined Under Namespace
Modules: PrometheusErrorLabels
Classes: Adapter, Configuration, Datadog, Error, MetricsServerConfiguration, MissingDependency, OpenTelemetry, Prometheus, UnknownAdapter
Constant Summary
collapse
- EVENT_NAMESPACE =
"activejob_temporal"
- TRACE_CONTEXT_KEY =
:observability
Class Method Summary
collapse
Class Method Details
.adapter_class(name) ⇒ Object
22
23
24
25
26
|
# File 'lib/activejob/temporal/observability.rb', line 22
# Looks up the adapter class registered under the given name.
#
# The name is converted with #to_sym before the registry lookup.
#
# @param name [#to_sym] adapter identifier
# @return [Object] the value stored in the adapter registry for that name
# @raise [UnknownAdapter] when the registry has no entry for the name
def adapter_class(name)
  # Invoked by #fetch only when the key is absent from the registry.
  on_missing = proc do
    raise UnknownAdapter, "Unknown observability adapter: #{name.inspect}"
  end

  adapter_registry.fetch(name.to_sym, &on_missing)
end
|
.attributes_from_job(job, **attributes) ⇒ Object
67
68
69
70
71
72
73
74
75
|
# File 'lib/activejob/temporal/observability.rb', line 67
# Builds the observability attributes for a job instance.
#
# The job's class name, job id and queue name form the base attributes; any
# extra keyword attributes are merged on top (and win on key collisions)
# before the result is passed through normalize_payload.
#
# @param job [#job_id, #queue_name] the job to describe
# @param attributes [Hash] additional attributes merged over the base set
# @return [Object] whatever normalize_payload returns for the merged hash
def attributes_from_job(job, **attributes)
  job_attributes = {
    job_class: job.class.name,
    job_id: job.job_id,
    queue: job.queue_name
  }

  normalize_payload(job_attributes.merge(attributes))
end
|
.attributes_from_payload(payload, **attributes) ⇒ Object
77
78
79
80
81
82
83
84
85
86
|
# File 'lib/activejob/temporal/observability.rb', line 77
# Builds the observability attributes for a serialized job payload.
#
# Base attributes are read from the payload via payload_value; the queue
# falls back from :queue_name to :queue. The activity context attributes are
# merged over the base set, and the explicit keyword attributes are merged
# last, so they take precedence. The result goes through normalize_payload.
#
# @param payload [Object] payload understood by payload_value
# @param attributes [Hash] additional attributes with highest precedence
# @return [Object] whatever normalize_payload returns for the merged hash
def attributes_from_payload(payload, **attributes)
  payload_attributes = {
    job_class: payload_value(payload, :job_class),
    job_id: payload_value(payload, :job_id),
    queue: payload_value(payload, :queue_name) || payload_value(payload, :queue),
    task_queue: payload_value(payload, :activity_task_queue)
  }

  # Later arguments win: activity context first, caller attributes last.
  combined = payload_attributes.merge(activity_context_attributes, attributes)
  normalize_payload(combined)
end
|
.configuration ⇒ Object
97
98
99
|
# File 'lib/activejob/temporal/observability.rb', line 97
# Returns the observability object exposed by the ActiveJob::Temporal config.
#
# @return [Object] the value of ActiveJob::Temporal.config.observability
def configuration
  temporal_config = ActiveJob::Temporal.config
  temporal_config.observability
end
|
.emit(name, payload = {}) ⇒ Object
28
29
30
31
32
33
|
# File 'lib/activejob/temporal/observability.rb', line 28
# Emits a point-in-time observability event.
#
# The payload is normalized once, published through
# ActiveSupport::Notifications under the name produced by event_name, and
# then handed to #record on every active adapter together with the event
# name converted to a symbol.
#
# @param name [#to_s, #to_sym] event name
# @param payload [Hash] event payload, normalized before use
# @return [nil]
def emit(name, payload = {})
  normalized = normalize_payload(payload)
  ActiveSupport::Notifications.instrument(event_name(name), normalized)

  active_adapters.each do |adapter|
    adapter.record(name.to_sym, normalized)
  end

  nil
end
|
.event_name(name) ⇒ Object
101
102
103
104
105
106
|
# File 'lib/activejob/temporal/observability.rb', line 101
# Qualifies an event name with the EVENT_NAMESPACE suffix.
#
# A name that already ends with ".<EVENT_NAMESPACE>" is returned as is;
# otherwise the suffix is appended.
#
# @param name [#to_s] bare or already-qualified event name
# @return [String] the namespaced event name
def event_name(name)
  suffix = ".#{EVENT_NAMESPACE}"
  label = name.to_s

  label.end_with?(suffix) ? label : "#{label}#{suffix}"
end
|
.inject_trace_context(payload, attributes = {}) ⇒ Object
52
53
54
55
56
57
58
59
60
|
# File 'lib/activejob/temporal/observability.rb', line 52
# Adds the trace context gathered by trace_context_for_enqueue to a payload.
#
# The payload is modified in place and returned. When no trace context is
# available the payload is returned untouched.
#
# An existing observability entry is looked up under the symbol key first and
# the string key second. The merged entry is written back under the key that
# was already in use (the symbol key when neither exists), so a string-keyed
# payload does not end up with a second, diverging :observability entry.
# Likewise, when the entry already holds a symbol-keyed :trace_context, that
# key is replaced instead of adding a "trace_context" sibling that a
# symbol-first reader would never see.
#
# @param payload [Hash] payload to annotate (mutated)
# @param attributes [Hash] passed through to trace_context_for_enqueue
# @return [Hash] the same payload object
def inject_trace_context(payload, attributes = {})
  trace_context = trace_context_for_enqueue(attributes)
  return payload if trace_context.empty?

  # Reuse whichever key form already carries data; default to the symbol key.
  observability_key = [:observability, "observability"].find { |key| payload[key] } || :observability
  observability = payload[observability_key] || {}

  # A populated symbol key would shadow a freshly added string key on read.
  context_key = observability[:trace_context] ? :trace_context : "trace_context"

  payload[observability_key] = observability.merge(context_key => trace_context)
  payload
end
|
.instrument(name, payload = {}, &block) ⇒ Object
35
36
37
38
39
40
41
|
# File 'lib/activejob/temporal/observability.rb', line 35
# Instruments a block of work as an observability event.
#
# The payload is normalized once and the event is published through
# ActiveSupport::Notifications under the name produced by event_name. Inside
# the notification, instrument_adapters is called with the event name as a
# symbol, the normalized payload and the caller's block.
#
# @param name [#to_s, #to_sym] event name
# @param payload [Hash] event payload, normalized before use
# @return [Object] whatever ActiveSupport::Notifications.instrument returns
def instrument(name, payload = {}, &block)
  normalized = normalize_payload(payload)
  notification = event_name(name)

  ActiveSupport::Notifications.instrument(notification, normalized) do
    instrument_adapters(name.to_sym, normalized, &block)
  end
end
|
.register_adapter(name, adapter_class) ⇒ Object
18
19
20
|
# File 'lib/activejob/temporal/observability.rb', line 18
# Registers an adapter class under the given name.
#
# The name is converted with #to_sym and used as the registry key; an
# existing entry for the same key is replaced.
#
# @param name [#to_sym] adapter identifier
# @param adapter_class [Object] value to store in the adapter registry
# @return [Object] the adapter_class argument
def register_adapter(name, adapter_class)
  registry = adapter_registry
  registry[name.to_sym] = adapter_class
end
|
.reset! ⇒ Object
93
94
95
|
# File 'lib/activejob/temporal/observability.rb', line 93
# Resets the observability configuration by delegating to its #reset!.
#
# @return [Object] whatever configuration.reset! returns
def reset!
  current = configuration
  current.reset!
end
|
.retry_attempt? ⇒ Boolean
88
89
90
91
|
# File 'lib/activejob/temporal/observability.rb', line 88
# Reports whether the activity context describes a retry, i.e. carries an
# :attempt value greater than one (compared after #to_i).
#
# Always returns a strict boolean: when the activity context has no :attempt
# entry the result is false rather than nil.
#
# @return [Boolean]
def retry_attempt?
  attempt = activity_context_attributes[:attempt]
  return false unless attempt

  attempt.to_i > 1
end
|
.trace_context_for_enqueue(payload = {}) ⇒ Object
43
44
45
46
47
48
49
50
|
# File 'lib/activejob/temporal/observability.rb', line 43
# Collects the trace context each active adapter wants to propagate.
#
# Adapters that do not respond to #trace_context_for_enqueue are skipped, as
# are adapters whose context is nil, false or empty. Remaining contexts are
# keyed by the adapter's name converted to a string.
#
# @param payload [Hash] passed to each adapter's #trace_context_for_enqueue
# @return [Hash{String => Object}] per-adapter trace context, possibly empty
def trace_context_for_enqueue(payload = {})
  contexts = {}

  active_adapters.each do |adapter|
    next unless adapter.respond_to?(:trace_context_for_enqueue)

    propagated = adapter.trace_context_for_enqueue(payload)
    next unless propagated
    next if propagated.empty?

    contexts[adapter.name.to_s] = propagated
  end

  contexts
end
|
.trace_context_from_payload(payload) ⇒ Object
62
63
64
65
|
# File 'lib/activejob/temporal/observability.rb', line 62
# Extracts the trace context stored in a payload.
#
# Both the observability entry and the nested trace context are looked up
# under the symbol key first and the string key second. An empty hash is
# returned when either level is missing.
#
# @param payload [Hash] payload to read from
# @return [Object] the stored trace context, or {} when none is present
def trace_context_from_payload(payload)
  observability = payload[:observability]
  observability ||= payload["observability"]
  observability ||= {}

  trace_context = observability[:trace_context]
  trace_context ||= observability["trace_context"]
  trace_context || {}
end
|