Module: Deimos::KafkaListener
- Defined in:
- lib/deimos/instrumentation.rb
Overview
This module listens to events published by RubyKafka.
Class Method Summary
- .send_produce_error(event) ⇒ Object
Listens for any exceptions that happen during publishing and re-publishes as a Deimos event.
Class Method Details
.send_produce_error(event) ⇒ Object
Listens for any exceptions that happen during publishing and re-publishes as a Deimos event.
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/deimos/instrumentation.rb', line 43
# Listens for any exceptions that happen during publishing and re-publishes
# them as a Deimos event.
#
# Returns early (nil) unless the event payload holds an :exception_object
# that responds to #failed_messages. Otherwise the failed messages are
# grouped by topic; for every topic that has a matching Deimos::Producer
# descendant, the message values are decoded, a 'publish_error' metric is
# incremented, and a 'produce_error' event is instrumented.
#
# @param event [#payload] notification whose payload hash is read for
#   :exception_object (presumably an ActiveSupport notification event --
#   confirm against the subscriber that calls this)
def self.send_produce_error(event)
  exception = event.payload[:exception_object]
  return if !exception || !exception.respond_to?(:failed_messages)

  messages = exception.failed_messages
  messages.group_by(&:topic).each do |topic, batch|
    # Topics without a registered producer cannot be decoded, so skip them.
    producer = Deimos::Producer.descendants.find { |c| c.topic == topic }
    next if batch.empty? || !producer

    decoder = Deimos.schema_backend(schema: producer.config[:schema],
                                    namespace: producer.config[:namespace])
    payloads = batch.map { |m| decoder.decode(m.value) }

    # Metrics are optional; safe navigation skips the call when unset.
    Deimos.config.metrics&.increment(
      'publish_error',
      tags: %W(topic:#{topic}),
      by: payloads.size
    )
    Deimos.instrument(
      'produce_error',
      producer: producer,
      topic: topic,
      exception_object: exception,
      payloads: payloads
    )
  end
end