Module: Deimos::KafkaListener

Defined in:
lib/deimos/instrumentation.rb

Overview

This module listens to events published by RubyKafka.

Class Method Summary collapse

Class Method Details

.send_produce_error(event) ⇒ void

This method returns an undefined value.

Listens for any exceptions that happen during publishing and re-publishes as a Deimos event.

Parameters:

  • event (ActiveSupport::Notifications::Event)


50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/deimos/instrumentation.rb', line 50

def self.send_produce_error(event)
  exception = event.payload[:exception_object]
  Rails.logger.info("Exception: #{exception}")
  return unless exception

  if exception.respond_to?(:failed_messages)
    messages = exception.failed_messages
    messages.group_by(&:topic).each do |topic, batch|
      producer = Deimos::Producer.descendants.find { |c| c.topic == topic }
      next if batch.empty? || !producer

      decoder = Deimos.schema_backend(schema: producer.config[:schema],
                                      namespace: producer.config[:namespace])
      payloads = batch.map { |m| decoder.decode(m.value) }

      Deimos.config.metrics&.increment(
        'publish_error',
        tags: %W(topic:#{topic}),
        by: payloads.size
      )
      Deimos.instrument(
        'produce_error',
        producer: producer,
        topic: topic,
        exception_object: exception,
        payloads: payloads
      )
  end
  else
    Rails.logger.info(Deimos.config.metrics)
    Deimos.config.metrics&.increment(
      'publish_error',
      by: event.payload[:message_count] || 1
    )
  end
end