Module: WaterDrop::Patches::Rdkafka::Producer

Defined in:
lib/waterdrop/patches/rdkafka/producer.rb

Overview

Rdkafka::Producer patches

Instance Method Summary

Instance Method Details

#initialize(*args) ⇒ Object

Parameters:

  • args (Object)

    arguments accepted by the original rdkafka producer



# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 16

def initialize(*args)
  super

  @_partitions_count_cache = Concurrent::Hash.new do |cache, topic|
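    # Fetch metadata for the topic; nil when the topic is not present
    topic_metadata = ::Rdkafka::Metadata.new(inner_kafka, topic).topics&.first

    # Store the fetch time together with the partition count
    cache[topic] = [
      now,
      topic_metadata ? topic_metadata[:partition_count] : nil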
    ]
  end
end
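
The default block above runs only when a topic is missing from the hash, so repeated reads reuse the stored pair. A minimal sketch of that behaviour, assuming a plain Hash in place of Concurrent::Hash and a hypothetical lookup lambda in place of the Rdkafka::Metadata call (PartitionCacheSketch is an illustrative name, not part of WaterDrop):

```ruby
module PartitionCacheSketch
  # Builds a hash whose default block runs the lookup once per missing topic
  # and stores [fetch time in ms, partition count or nil].
  def self.build(lookup)
    Hash.new do |cache, topic|
      cache[topic] = [
        Process.clock_gettime(Process::CLOCK_MONOTONIC) * 1_000,
        lookup.call(topic)
      ]
    end
  end
end

# Hypothetical lookup: only the 'events' topic exists and it has 6 partitions
calls = []
lookup = lambda do |topic|
  calls << topic
  topic == 'events' ? 6 : nil
end

cache = PartitionCacheSketch.build(lookup)
cache['events'].last # first read triggers the lookup
cache['events'].last # second read is served from the hash
```

An unknown topic is stored with a nil count, which mirrors the nil branch in the block above.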

#inner_kafka ⇒ FFI::Pointer

Returns a pointer to the raw librdkafka handle.

Returns:

  • (FFI::Pointer)

    pointer to the raw librdkafka handle



# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 56

def inner_kafka
  unless @_inner_kafka
    version = ::Gem::Version.new(::Rdkafka::VERSION)

    if version < ::Gem::Version.new('0.12.0')
      @_inner_kafka = @native_kafka
    elsif version < ::Gem::Version.new('0.13.0.beta.1')
      @_inner_kafka = @client.native
    else
      @_inner_kafka = @native_kafka.inner
    end
  end

  @_inner_kafka
end
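
The branch taken above depends on how Gem::Version orders release and prerelease versions. The sketch below isolates that comparison; inner_kafka_source is a hypothetical helper that returns a symbol naming the branch, and is not part of WaterDrop or rdkafka:

```ruby
require 'rubygems'

# Hypothetical helper: reports which branch of the version check would run
# for a given rdkafka version string.
def inner_kafka_source(version_string)
  version = Gem::Version.new(version_string)

  if version < Gem::Version.new('0.12.0')
    :native_kafka        # corresponds to @native_kafka
  elsif version < Gem::Version.new('0.13.0.beta.1')
    :client_native       # corresponds to @client.native
  else
    :native_kafka_inner  # corresponds to @native_kafka.inner
  end
end

inner_kafka_source('0.11.1')
inner_kafka_source('0.12.1')
inner_kafka_source('0.13.0')
```

Because a prerelease such as 0.13.0.beta.1 sorts before 0.13.0, both the beta and the final 0.13.0 release fall into the last branch.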

#name ⇒ String

Adds a method that returns the name of the native Kafka producer.

rdkafka's internals changed between versions, so the name is read through #inner_kafka, which resolves the native handle for every supported version.

Returns:

  • (String)

    producer instance name



# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 35

def name
  @_name ||= ::Rdkafka::Bindings.rd_kafka_name(inner_kafka)
end

#now ⇒ Float

Returns the current monotonic clock time in milliseconds.

Returns:

  • (Float)

    current monotonic clock time in milliseconds



# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 73

def now
  ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) * 1_000
end
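
Because the value comes from the monotonic clock, it is only meaningful as a difference between two readings, not as a wall-clock timestamp. A minimal sketch of measuring elapsed milliseconds this way (monotonic_ms is an illustrative name for the same expression):

```ruby
# Same expression as #now: monotonic seconds as a Float, scaled to milliseconds
def monotonic_ms
  Process.clock_gettime(Process::CLOCK_MONOTONIC) * 1_000
end

started_at = monotonic_ms
sleep(0.01)
elapsed_ms = monotonic_ms - started_at # roughly 10 ms, never negative
```

The monotonic clock is not affected by system time adjustments, which makes it a reasonable choice for expiring cache entries.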

#partition_count(topic) ⇒ Integer

This patch caches the partition count for a given topic for a limited time. When someone uses `partition_key`, this prevents querying for the count with each message. Instead, the count is queried at most once every 30 seconds.

Parameters:

  • topic (String)

    topic name

Returns:

  • (Integer)

    partition count for a given topic



# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 45

def partition_count(topic)
  closed_producer_check(__method__)

  @_partitions_count_cache.delete_if do |_, cached|
    now - cached.first > PARTITIONS_COUNT_TTL
  end

  @_partitions_count_cache[topic].last
end
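
The expiry logic above can be sketched in isolation. This is a simplified illustration, not the WaterDrop implementation: TtlPartitionCache, its injectable clock and lookup lambdas, and the TTL_MS value of 30_000 (inferred from the "30 seconds" in the description and the millisecond clock) are all assumptions.

```ruby
# Simplified sketch of a TTL-based partition count cache.
class TtlPartitionCache
  TTL_MS = 30_000 # assumed value, matching "once every 30 seconds"

  # clock:  callable returning the current time in milliseconds
  # lookup: callable returning the partition count for a topic
  def initialize(clock:, lookup:)
    @clock = clock
    @cache = Hash.new do |cache, topic|
      cache[topic] = [@clock.call, lookup.call(topic)]
    end
  end

  def partition_count(topic)
    # Drop entries older than the TTL so the next read refetches them
    @cache.delete_if { |_, cached| @clock.call - cached.first > TTL_MS }
    @cache[topic].last
  end
end

# Usage with a fake clock so expiry can be observed without waiting
time = 0
lookups = 0
cache = TtlPartitionCache.new(
  clock: -> { time },
  lookup: ->(_topic) { lookups += 1; 3 }
)

cache.partition_count('events') # performs the lookup
time = 30_001
cache.partition_count('events') # entry expired, lookup runs again
```

Injecting the clock keeps the sketch testable; the method above reads the time from #now instead.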