Module: WaterDrop::Clients::Rdkafka

Defined in:
lib/waterdrop/clients/rdkafka.rb

Overview

Default Rdkafka client. Since we use the ::Rdkafka::Producer under the hood, this is just a module that aligns with client building API for the convenience.

Class Method Summary collapse

Class Method Details

.new(producer) ⇒ Object

Note:

We overwrite this that way, because we do not care

Parameters:



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/waterdrop/clients/rdkafka.rb', line 13

def new(producer)
  config = producer.config.kafka.to_h

  client = ::Rdkafka::Config.new(config).producer

  # This callback is not global and is per client, thus we do not have to wrap it with a
  # callbacks manager to make it work
  client.delivery_callback = Instrumentation::Callbacks::Delivery.new(
    producer.id,
    producer.transactional?,
    producer.config.monitor
  )

  # Switch to the transactional mode if user provided the transactional id
  client.init_transactions if config.key?(:'transactional.id')

  client
end