Class: Sbmt::KafkaConsumer::Instrumentation::OpenTelemetryTracer

Inherits:
Tracer
  • Object
show all
Defined in:
lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb

Constant Summary collapse

# Event ids that #trace routes to #handle_common_event.
CONSUMED_EVENTS =
%w[
  consumer.process_message
  consumer.mark_as_consumed
].freeze

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Tracer

#initialize

Constructor Details

This class inherits a constructor from Sbmt::KafkaConsumer::Instrumentation::Tracer

Class Attribute Details

.enabled=(value) ⇒ Object (writeonly)

Sets the attribute enabled

Parameters:

  • value

    the value to set the attribute enabled to.



21
22
23
# File 'lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb', line 21

# Stores the given value in @enabled exactly as passed (no boolean coercion
# happens here).
#
# @param value [Object] the new flag value
# @return [Object] the value that was assigned
def enabled=(value)
  @enabled = value
end

Class Method Details

.enabled? ⇒ Boolean

Returns:

  • (Boolean)


17
18
19
# File 'lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb', line 17

# Reports whether the enabled flag is set.
#
# @return [Boolean] true when @enabled holds a truthy value; false when it is
#   nil, false, or was never assigned
def enabled?
  @enabled ? true : false
end

Instance Method Details

#handle_common_event(&block) ⇒ Object



86
87
88
89
90
91
92
93
94
# File 'lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb', line 86

# Routes a generic consumer event to the inbox handler or the single-message
# handler, depending on whether the payload carries an inbox name.
#
# When tracing is disabled the block is run directly, without any handler.
#
# @yield the instrumented work
# @return [Object] the result of the block or of the chosen handler
def handle_common_event(&block)
  return yield unless enabled?
  return handle_inbox_consumed_one(&block) if @payload[:inbox_name].present?

  handle_consumed_one(&block)
end

#handle_consumed_batch ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb', line 51

# Runs the block inside a "consume batch" consumer span.
#
# A context is extracted from the headers of every message in the payload;
# each one that yields a valid span context is attached to the new span as
# a link. When tracing is disabled the block is run directly.
#
# @yield the batch processing work
# @return [Object] the result of the block, or whatever the tracer's in_span
#   returns when tracing is enabled
def handle_consumed_batch
  return yield unless enabled?

  owner = @payload[:caller]
  batch = @payload[:messages]

  span_links = batch.each_with_object([]) do |msg, collected|
    extracted = ::OpenTelemetry.propagation.extract(
      msg.headers,
      getter: ::OpenTelemetry::Context::Propagation.text_map_getter
    )
    upstream = ::OpenTelemetry::Trace.current_span(extracted).context
    # Messages without a valid span context contribute no link.
    collected << ::OpenTelemetry::Trace::Link.new(upstream) if upstream.valid?
  end

  tracer.in_span(
    "consume batch",
    links: span_links,
    attributes: batch_attrs(owner, batch),
    kind: :consumer
  ) { yield }
end

#handle_consumed_one ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb', line 34

# Runs the block inside a "consume <topic>" consumer span for one message.
#
# The context extracted from the message headers is made current for the
# duration of the span. When that context carries a valid span context, it
# is additionally attached to the new span as a link. When tracing is
# disabled the block is run directly.
#
# @yield the message processing work
# @return [Object] the result of the block, or whatever the surrounding
#   with_current / in_span calls return when tracing is enabled
def handle_consumed_one
  return yield unless enabled?

  owner = @payload[:caller]
  msg = @payload[:message]

  extracted = ::OpenTelemetry.propagation.extract(
    msg.headers,
    getter: ::OpenTelemetry::Context::Propagation.text_map_getter
  )
  upstream = ::OpenTelemetry::Trace.current_span(extracted).context
  # links stays nil (not an empty array) when there is no valid upstream span.
  span_links = upstream.valid? ? [::OpenTelemetry::Trace::Link.new(upstream)] : nil

  ::OpenTelemetry::Context.with_current(extracted) do
    tracer.in_span(
      "consume #{msg.topic}",
      links: span_links,
      attributes: consumer_attrs(owner, msg),
      kind: :consumer
    ) { yield }
  end
end

#handle_error ⇒ Object



96
97
98
99
100
101
102
103
# File 'lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb', line 96

# Flags the currently active span with an error status, then runs the block.
#
# When tracing is disabled the block is run directly and no span is touched.
#
# @yield the wrapped error handling work
# @return [Object] the result of the block
def handle_error
  return yield unless enabled?

  # Root-anchored (::) like every other handler in this class, so lexical
  # constant lookup inside the enclosing namespace can never pick up a
  # different OpenTelemetry constant.
  current_span = ::OpenTelemetry::Trace.current_span
  current_span&.status = ::OpenTelemetry::Trace::Status.error

  yield
end

#handle_inbox_consumed_one ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb', line 68

# Runs the block inside an "inbox <inbox_name> process" consumer span.
#
# The span attributes are built from the payload's :inbox_name, :event_name
# and :status entries; entries that are nil are left out. When tracing is
# disabled the block is run directly.
#
# @yield the inbox processing work
# @return [Object] the result of the block, or whatever the tracer's in_span
#   returns when tracing is enabled
def handle_inbox_consumed_one
  return yield unless enabled?

  inbox = @payload[:inbox_name]

  span_attrs = {
    "inbox.inbox_name" => inbox,
    "inbox.event_name" => @payload[:event_name],
    "inbox.status" => @payload[:status]
  }.compact

  tracer.in_span("inbox #{inbox} process", attributes: span_attrs, kind: :consumer) { yield }
end

#trace(&block) ⇒ Object



24
25
26
27
28
29
30
31
32
# File 'lib/sbmt/kafka_consumer/instrumentation/open_telemetry_tracer.rb', line 24

# Dispatches the block to the handler that matches @event_id.
#
# Event ids without a dedicated handler simply run the block.
#
# @yield the instrumented work
# @return [Object] the result of the selected handler, or of the block itself
def trace(&block)
  case @event_id
  when "consumer.consumed_one" then handle_consumed_one(&block)
  when "consumer.consumed_batch" then handle_consumed_batch(&block)
  when "consumer.inbox.consumed_one" then handle_inbox_consumed_one(&block)
  when *CONSUMED_EVENTS then handle_common_event(&block)
  when "error.occurred" then handle_error(&block)
  else yield
  end
end