Module: Deimos::KafkaListener

Defined in:
lib/deimos/instrumentation.rb

Overview

This module listens to events published by RubyKafka.

Class Method Summary collapse

Class Method Details

.send_produce_error(event) ⇒ Object

Listens for any exceptions that happen during publishing and re-publishes as a Deimos event.

Parameters:

  • event (ActiveSupport::Notification)


43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/deimos/instrumentation.rb', line 43

def self.send_produce_error(event)
  exception = event.payload[:exception_object]
  return if !exception || !exception.respond_to?(:failed_messages)

  messages = exception.failed_messages
  messages.group_by(&:topic).each do |topic, batch|
    producer = Deimos::Producer.descendants.find { |c| c.topic == topic }
    next if batch.empty? || !producer

    decoder = Deimos.schema_backend(schema: producer.config[:schema],
                                    namespace: producer.config[:namespace])
    payloads = batch.map { |m| decoder.decode(m.value) }

    Deimos.config.metrics&.increment(
      'publish_error',
      tags: %W(topic:#{topic}),
      by: payloads.size
    )
    Deimos.instrument(
      'produce_error',
      producer: producer,
      topic: topic,
      exception_object: exception,
      payloads: payloads
    )
  end
end