Module: WaterDrop::Patches::Rdkafka::Producer
- Defined in:
- lib/waterdrop/patches/rdkafka/producer.rb
Overview
Rdkafka::Producer patches
Instance Method Summary collapse
- #initialize(*args) ⇒ Object
-
#inner_kafka ⇒ FFI::Pointer
Pointer to the raw librdkafka.
-
#name ⇒ String
Adds a method that allows us to get the native kafka producer name.
-
#now ⇒ Float
Current clock time.
-
#partition_count(topic) ⇒ Integer
This patch makes sure we cache the partition count for a given topic for given time This prevents us in case someone uses `partition_key` from querying for the count with each message.
Instance Method Details
#initialize(*args) ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 16 def initialize(*args) super @_partitions_count_cache = Concurrent::Hash.new do |cache, topic| = ::Rdkafka::Metadata.new(inner_kafka, topic).topics&.first cache[topic] = [ now, ? [:partition_count] : nil ] end end |
#inner_kafka ⇒ FFI::Pointer
Returns pointer to the raw librdkafka.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 56 def inner_kafka unless @_inner_kafka version = ::Gem::Version.new(::Rdkafka::VERSION) if version < ::Gem::Version.new('0.12.0') @_inner_kafka = @native_kafka elsif version < ::Gem::Version.new('0.13.0.beta.1') @_inner_kafka = @client.native else @_inner_kafka = @native_kafka.inner end end @_inner_kafka end |
#name ⇒ String
Adds a method that allows us to get the native kafka producer name
In between rdkafka versions, there are internal changes that force us to add some extra magic to support all the versions.
35 36 37 |
# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 35 def name @_name ||= ::Rdkafka::Bindings.rd_kafka_name(inner_kafka) end |
#now ⇒ Float
Returns current clock time.
73 74 75 |
# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 73 def now ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) * 1_000 end |
#partition_count(topic) ⇒ Integer
This patch makes sure we cache the partition count for a given topic for given time This prevents us in case someone uses `partition_key` from querying for the count with each message. Instead we query once every 30 seconds at most
45 46 47 48 49 50 51 52 53 |
# File 'lib/waterdrop/patches/rdkafka/producer.rb', line 45 def partition_count(topic) closed_producer_check(__method__) @_partitions_count_cache.delete_if do |_, cached| now - cached.first > PARTITIONS_COUNT_TTL end @_partitions_count_cache[topic].last end |