Class: Sbmt::KafkaConsumer::Instrumentation::OpenTelemetryTracer
- Inherits:
-
Tracer
- Object
- Tracer
- Sbmt::KafkaConsumer::Instrumentation::OpenTelemetryTracer
show all
- Defined in:
- lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb
Constant Summary
collapse
- CONSUMED_EVENTS =
%w[
consumer.process_message
consumer.mark_as_consumed
].freeze
Class Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Methods inherited from Tracer
#initialize
Class Attribute Details
.enabled=(value) ⇒ Object
Sets the attribute enabled
21
22
23
|
# File 'lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb', line 21
def enabled=(value)
@enabled = value
end
|
Class Method Details
.enabled? ⇒ Boolean
17
18
19
|
# File 'lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb', line 17
def enabled?
!!@enabled
end
|
Instance Method Details
#handle_common_event(&block) ⇒ Object
86
87
88
89
90
91
92
93
94
|
# File 'lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb', line 86
def handle_common_event(&block)
return yield unless enabled?
if @payload[:inbox_name].present?
handle_inbox_consumed_one(&block)
else
handle_consumed_one(&block)
end
end
|
#handle_consumed_batch ⇒ Object
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
# File 'lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb', line 51
def handle_consumed_batch
return yield unless enabled?
consumer = @payload[:caller]
messages = @payload[:messages]
links = messages.filter_map do |m|
parent_context = ::OpenTelemetry.propagation.(m., getter: ::OpenTelemetry::Context::Propagation.text_map_getter)
span_context = ::OpenTelemetry::Trace.current_span(parent_context).context
::OpenTelemetry::Trace::Link.new(span_context) if span_context.valid?
end
tracer.in_span("consume batch", links: links, attributes: batch_attrs(consumer, messages), kind: :consumer) do
yield
end
end
|
#handle_consumed_one ⇒ Object
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
# File 'lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb', line 34
def handle_consumed_one
return yield unless enabled?
consumer = @payload[:caller]
message = @payload[:message]
parent_context = ::OpenTelemetry.propagation.(message., getter: ::OpenTelemetry::Context::Propagation.text_map_getter)
span_context = ::OpenTelemetry::Trace.current_span(parent_context).context
links = [::OpenTelemetry::Trace::Link.new(span_context)] if span_context.valid?
::OpenTelemetry::Context.with_current(parent_context) do
tracer.in_span("consume #{message.topic}", links: links, attributes: consumer_attrs(consumer, message), kind: :consumer) do
yield
end
end
end
|
#handle_error ⇒ Object
96
97
98
99
100
101
102
103
|
# File 'lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb', line 96
def handle_error
return yield unless enabled?
current_span = OpenTelemetry::Trace.current_span
current_span&.status = OpenTelemetry::Trace::Status.error
yield
end
|
#handle_inbox_consumed_one ⇒ Object
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
# File 'lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb', line 68
def handle_inbox_consumed_one
return yield unless enabled?
inbox_name = @payload[:inbox_name]
event_name = @payload[:event_name]
status = @payload[:status]
inbox_attributes = {
"inbox.inbox_name" => inbox_name,
"inbox.event_name" => event_name,
"inbox.status" => status
}.compact
tracer.in_span("inbox #{inbox_name} process", attributes: inbox_attributes, kind: :consumer) do
yield
end
end
|
#trace(&block) ⇒ Object
24
25
26
27
28
29
30
31
32
|
# File 'lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb', line 24
def trace(&block)
return handle_consumed_one(&block) if @event_id == "consumer.consumed_one"
return handle_consumed_batch(&block) if @event_id == "consumer.consumed_batch"
return handle_inbox_consumed_one(&block) if @event_id == "consumer.inbox.consumed_one"
return handle_common_event(&block) if CONSUMED_EVENTS.include?(@event_id)
return handle_error(&block) if @event_id == "error.occurred"
yield
end
|