Module: ActiveJob::Temporal::Observability

Defined in:
lib/activejob/temporal/observability.rb,
lib/activejob/temporal/observability/datadog.rb,
lib/activejob/temporal/observability/prometheus.rb,
lib/activejob/temporal/observability/opentelemetry.rb

Defined Under Namespace

Modules: PrometheusErrorLabels

Classes: Adapter, Configuration, Datadog, Error, MetricsServerConfiguration, MissingDependency, OpenTelemetry, Prometheus, UnknownAdapter

Constant Summary

EVENT_NAMESPACE =
"activejob_temporal"
TRACE_CONTEXT_KEY =
:observability

Class Method Summary

Class Method Details

.adapter_class(name) ⇒ Object



22
23
24
25
26
# File 'lib/activejob/temporal/observability.rb', line 22

# Looks up the adapter class registered under +name+.
#
# @param name [#to_sym] adapter identifier; converted with +to_sym+ before lookup
# @return [Object] whatever was stored in the adapter registry for that key
# @raise [UnknownAdapter] when nothing is registered under the given name
def adapter_class(name)
  registry = adapter_registry
  key = name.to_sym

  registry.fetch(key) { raise UnknownAdapter, "Unknown observability adapter: #{name.inspect}" }
end

.attributes_from_job(job, **attributes) ⇒ Object



67
68
69
70
71
72
73
74
75
# File 'lib/activejob/temporal/observability.rb', line 67

# Builds the event attributes for a job object.
#
# The job's class name, +job_id+ and +queue_name+ form the base attributes;
# caller-supplied keyword attributes are layered on top (and win on key
# collisions) before the result is passed through +normalize_payload+.
#
# @param job [Object] object responding to +job_id+ and +queue_name+
# @param attributes [Hash] extra attributes merged over the base ones
# @return [Object] the value returned by +normalize_payload+
def attributes_from_job(job, **attributes)
  combined = {
    job_class: job.class.name,
    job_id: job.job_id,
    queue: job.queue_name,
    **attributes
  }

  normalize_payload(combined)
end

.attributes_from_payload(payload, **attributes) ⇒ Object



77
78
79
80
81
82
83
84
85
86
# File 'lib/activejob/temporal/observability.rb', line 77

# Builds the event attributes from a serialized job payload.
#
# Base values are read via +payload_value+; the queue falls back from
# +:queue_name+ to +:queue+. The activity context attributes are merged over
# the base values, and caller-supplied attributes are merged last, so they
# take precedence over both. The result goes through +normalize_payload+.
#
# @param payload [Object] payload understood by +payload_value+
# @param attributes [Hash] extra attributes with the highest precedence
# @return [Object] the value returned by +normalize_payload+
def attributes_from_payload(payload, **attributes)
  job_class = payload_value(payload, :job_class)
  job_id = payload_value(payload, :job_id)
  queue = payload_value(payload, :queue_name) || payload_value(payload, :queue)
  task_queue = payload_value(payload, :activity_task_queue)

  base = { job_class: job_class, job_id: job_id, queue: queue, task_queue: task_queue }
  normalize_payload(base.merge(activity_context_attributes, attributes))
end

.configuration ⇒ Object



97
98
99
# File 'lib/activejob/temporal/observability.rb', line 97

# Returns the observability section of the ActiveJob::Temporal configuration.
#
# @return [Object] whatever +ActiveJob::Temporal.config.observability+ returns
def configuration
  temporal_config = ActiveJob::Temporal.config
  temporal_config.observability
end

.emit(name, payload = {}) ⇒ Object



28
29
30
31
32
33
# File 'lib/activejob/temporal/observability.rb', line 28

# Publishes a point-in-time event (no block is measured).
#
# The payload is normalized once, published through
# ActiveSupport::Notifications under the namespaced event name, and then
# handed to every active adapter's +record+ method with the symbolized name.
#
# @param name [#to_s, #to_sym] event name
# @param payload [Hash] event attributes, passed through +normalize_payload+
# @return [nil] always
def emit(name, payload = {})
  normalized = normalize_payload(payload)
  ActiveSupport::Notifications.instrument(event_name(name), normalized)

  active_adapters.each do |adapter|
    adapter.record(name.to_sym, normalized)
  end

  nil
end

.event_name(name) ⇒ Object



101
102
103
104
105
106
# File 'lib/activejob/temporal/observability.rb', line 101

# Returns the notification name for +name+, suffixed with the event namespace.
#
# Names that already end in ".#{EVENT_NAMESPACE}" are returned as-is so the
# suffix is never applied twice.
#
# @param name [#to_s] bare or already-namespaced event name
# @return [String] the namespaced event name
def event_name(name)
  suffix = ".#{EVENT_NAMESPACE}"
  label = name.to_s

  label.end_with?(suffix) ? label : "#{label}#{suffix}"
end

.inject_trace_context(payload, attributes = {}) ⇒ Object



52
53
54
55
56
57
58
59
60
# File 'lib/activejob/temporal/observability.rb', line 52

# Stores the current trace context inside the payload's observability section.
#
# The context comes from +trace_context_for_enqueue+. When it is empty the
# payload is returned untouched. Otherwise the existing observability hash
# (symbol or string key) is copied, the context is written under the
# "trace_context" string key, and the result is assigned to
# +payload[:observability]+. The payload is mutated in place.
#
# @param payload [Hash] job payload to annotate
# @param attributes [Hash] forwarded to +trace_context_for_enqueue+
# @return [Hash] the same +payload+ object
def inject_trace_context(payload, attributes = {})
  trace_context = trace_context_for_enqueue(attributes)
  return payload if trace_context.empty?

  observability = payload[:observability] || payload["observability"] || {}
  # trace_context_from_payload checks the :trace_context symbol key before the
  # "trace_context" string key, so a leftover symbol-keyed entry would shadow
  # the context injected here. Drop it so the fresh context is the one read back.
  observability = observability.reject { |key, _value| key == :trace_context }
  payload[:observability] = observability.merge("trace_context" => trace_context)
  payload
end

.instrument(name, payload = {}) ⇒ Object



35
36
37
38
39
40
41
# File 'lib/activejob/temporal/observability.rb', line 35

# Instruments a block of work as a namespaced notification event.
#
# The payload is normalized once and the block is run inside
# ActiveSupport::Notifications.instrument, wrapped by +instrument_adapters+,
# which receives the symbolized event name, the normalized payload and the
# caller's block.
#
# The block parameter is named rather than anonymous (+&+) because it is
# forwarded from inside another block, which Ruby 3.3.0 rejects as a
# SyntaxError; a named block parses on every Ruby version.
#
# @param name [#to_s, #to_sym] event name
# @param payload [Hash] event attributes, passed through +normalize_payload+
# @yield the work to instrument
# @return [Object] whatever ActiveSupport::Notifications.instrument returns
def instrument(name, payload = {}, &block)
  event_payload = normalize_payload(payload)

  ActiveSupport::Notifications.instrument(event_name(name), event_payload) do
    instrument_adapters(name.to_sym, event_payload, &block)
  end
end

.register_adapter(name, adapter_class) ⇒ Object



18
19
20
# File 'lib/activejob/temporal/observability.rb', line 18

# Registers +adapter_class+ in the adapter registry under +name+.
#
# An existing registration under the same name is replaced.
#
# @param name [#to_sym] adapter identifier; stored as a symbol
# @param adapter_class [Object] the adapter to associate with the name
# @return [Object] the +adapter_class+ that was stored
def register_adapter(name, adapter_class)
  registry = adapter_registry
  registry[name.to_sym] = adapter_class
end

.reset! ⇒ Object



93
94
95
# File 'lib/activejob/temporal/observability.rb', line 93

# Resets the observability configuration by delegating to its own +reset!+.
#
# @return [Object] whatever the configuration's +reset!+ returns
def reset!
  current = configuration
  current.reset!
end

.retry_attempt? ⇒ Boolean

Returns:

  • (Boolean)


88
89
90
91
# File 'lib/activejob/temporal/observability.rb', line 88

# Reports whether the current activity execution is a retry.
#
# Reads +:attempt+ from +activity_context_attributes+ and treats any attempt
# number above 1 as a retry. A missing attempt yields +false+ (not +nil+), so
# the method always returns a real Boolean as documented.
#
# @return [Boolean] true when the attempt number is greater than 1
def retry_attempt?
  attempt = activity_context_attributes[:attempt]
  return false unless attempt

  attempt.to_i > 1
end

.trace_context_for_enqueue(payload = {}) ⇒ Object



43
44
45
46
47
48
49
50
# File 'lib/activejob/temporal/observability.rb', line 43

# Collects the trace context each active adapter wants attached at enqueue time.
#
# Adapters that do not respond to +trace_context_for_enqueue+ are skipped, as
# are adapters whose context is nil/false or empty. The remaining contexts are
# keyed by the adapter's name converted to a string.
#
# @param payload [Hash] passed to each adapter's +trace_context_for_enqueue+
# @return [Hash{String => Object}] non-empty contexts keyed by adapter name
def trace_context_for_enqueue(payload = {})
  collected = {}

  active_adapters.each do |adapter|
    next unless adapter.respond_to?(:trace_context_for_enqueue)

    propagated = adapter.trace_context_for_enqueue(payload)
    next unless propagated
    next if propagated.empty?

    collected[adapter.name.to_s] = propagated
  end

  collected
end

.trace_context_from_payload(payload) ⇒ Object



62
63
64
65
# File 'lib/activejob/temporal/observability.rb', line 62

# Extracts the trace context stored in a payload's observability section.
#
# Both the section and the context are looked up under the symbol key first
# and the string key second. A missing section or context yields an empty hash.
#
# @param payload [Hash] job payload, symbol- or string-keyed
# @return [Object] the stored trace context, or +{}+ when none is present
def trace_context_from_payload(payload)
  section = payload[:observability]
  section = payload["observability"] unless section
  return {} unless section

  context = section[:trace_context] || section["trace_context"]
  context || {}
end