Module: WaterDrop::Clients::Rdkafka

Defined in:
lib/waterdrop/clients/rdkafka.rb

Overview

Default Rdkafka client. Since we use the ::Rdkafka::Producer under the hood, this is just a module that aligns with client building API for the convenience.

Class Method Summary collapse

Class Method Details

.new(producer) ⇒ Object

Note:

We overwrite this that way, because we do not care

Parameters:



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/waterdrop/clients/rdkafka.rb', line 13

def new(producer)
  monitor = producer.config.monitor
  kafka_config, statistics_enabled = prepare_statistics(
    producer.config.kafka.to_h,
    monitor
  )

  client = build_rdkafka_client(producer, kafka_config)

  register_instrumentation_callbacks(
    producer,
    client,
    monitor,
    statistics_enabled: statistics_enabled
  )

  # This callback is not global and is per client, thus we do not have to wrap it with a
  # callbacks manager to make it work
  client.delivery_callback = Instrumentation::Callbacks::Delivery.new(
    producer.id,
    producer.transactional?,
    monitor
  )

  subscribe_oauth_listener(producer, monitor)
  activate_client(producer, client, kafka_config)

  client
end