Class: Rdkafka::Producer

Inherits:
Object
Includes:
Helpers::OAuth, Helpers::Time
Defined in:
lib/rdkafka/producer.rb,
lib/rdkafka/producer/delivery_handle.rb,
lib/rdkafka/producer/delivery_report.rb,
lib/rdkafka/producer/partitions_count_cache.rb

Overview

A producer for Kafka messages. To create a producer, set up a Config and call producer on that.
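For example, a minimal setup might look like the sketch below (the broker address and settings are illustrative assumptions, not defaults):

```ruby
require "rdkafka"

# "bootstrap.servers" must point at a reachable cluster
config = Rdkafka::Config.new(
  "bootstrap.servers" => "localhost:9092",
  "client.id"         => "example-producer"
)

producer = config.producer
```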

Defined Under Namespace

Classes: DeliveryHandle, DeliveryReport, PartitionsCountCache, TopicHandleCreationError

Constant Summary collapse

@@partitions_count_cache =
PartitionsCountCache.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Helpers::OAuth

#oauthbearer_set_token, #oauthbearer_set_token_failure

Methods included from Helpers::Time

#monotonic_now, #monotonic_now_ms

Constructor Details

#initialize(native_kafka, partitioner) ⇒ Producer

Returns a new instance of Producer.

Parameters:

  • native_kafka (NativeKafka)
  • partitioner (String, nil)

name of the partitioner we want to use or nil to use the "consistent_random" default



# File 'lib/rdkafka/producer.rb', line 55

def initialize(native_kafka, partitioner)
  @topics_refs_map = {}
  @topics_configs = {}
  @native_kafka = native_kafka
  @partitioner = partitioner || "consistent_random"

  # Makes sure that native kafka gets closed before it gets GCed by Ruby
  ObjectSpace.define_finalizer(self, native_kafka.finalizer)
end

Instance Attribute Details

#delivery_callback ⇒ Proc?

Returns the current delivery callback; by default this is nil.

Returns:

  • (Proc, nil)


# File 'lib/rdkafka/producer.rb', line 43

def delivery_callback
  @delivery_callback
end
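As a sketch, a one-argument callback could be attached via the corresponding writer (the report fields printed here are assumptions about the delivery report, shown for illustration):

```ruby
# Invoked from the polling thread for every delivery report
producer.delivery_callback = lambda do |report|
  puts "delivered to partition #{report.partition} at offset #{report.offset}"
end
```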

#delivery_callback_arity ⇒ Integer? (readonly)

Returns the number of arguments accepted by the callback; by default this is nil.

Returns:

  • (Integer, nil)


# File 'lib/rdkafka/producer.rb', line 49

def delivery_callback_arity
  @delivery_callback_arity
end

Class Method Details

.partitions_count_cache ⇒ Rdkafka::Producer::PartitionsCountCache

Note:

It is critical to remember that not all users may have statistics callbacks enabled, hence we should not assume that this cache is always updated from the stats.

Global (process-wide) partitions cache. We use it to store the number of topic partitions, either from the librdkafka statistics (if enabled) or via direct inline calls every now and then. Since the partitions count can only grow and should be the same for all consumers and producers, we can use a global cache as long as we ensure that updates only move up.



# File 'lib/rdkafka/producer.rb', line 20

def self.partitions_count_cache
  @@partitions_count_cache
end

.partitions_count_cache=(partitions_count_cache) ⇒ Object

Parameters:



# File 'lib/rdkafka/producer.rb', line 25

def self.partitions_count_cache=(partitions_count_cache)
  @@partitions_count_cache = partitions_count_cache
end

Instance Method Details

#abort_transaction(timeout_ms = -1) ⇒ true

Abort the current transaction

Parameters:

  • timeout_ms (Integer) (defaults to: -1)

    Timeout in milliseconds (-1 for infinite)

Returns:

  • (true)

    Returns true on success

Raises:



# File 'lib/rdkafka/producer.rb', line 206

def abort_transaction(timeout_ms = -1)
  closed_producer_check(__method__)

  @native_kafka.with_inner do |inner|
    response_ptr = Rdkafka::Bindings.rd_kafka_abort_transaction(inner, timeout_ms)
    Rdkafka::RdkafkaError.validate!(response_ptr, client_ptr: inner) || true
  end
end

#arity(callback) ⇒ Integer

Figures out the arity of a given block/method

Parameters:

  • callback (#call, Proc)

Returns:

  • (Integer)

    arity of the provided block/method



# File 'lib/rdkafka/producer.rb', line 595

def arity(callback)
  return callback.arity if callback.respond_to?(:arity)

  callback.method(:call).arity
end
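The same arity detection can be exercised with plain Ruby callables; the standalone re-implementation below is for illustration only (the helper and class names are made up):

```ruby
# Standalone version of the arity logic above: prefer #arity when the
# callable exposes it (Proc/lambda), otherwise inspect its #call method.
def callback_arity(callback)
  return callback.arity if callback.respond_to?(:arity)

  callback.method(:call).arity
end

# An object that responds to #call but has no #arity of its own
class TwoArgCallback
  def call(report, handle); end
end

callback_arity(proc {})            # => 0
callback_arity(->(report) {})      # => 1
callback_arity(TwoArgCallback.new) # => 2
```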

#begin_transaction ⇒ true

Begin a new transaction. Requires #init_transactions to have been called first.

Returns:

  • (true)

    Returns true on success

Raises:



# File 'lib/rdkafka/producer.rb', line 176

def begin_transaction
  closed_producer_check(__method__)

  @native_kafka.with_inner do |inner|
    response_ptr = Rdkafka::Bindings.rd_kafka_begin_transaction(inner)

    Rdkafka::RdkafkaError.validate!(response_ptr, client_ptr: inner) || true
  end
end

#call_delivery_callback(delivery_report, delivery_handle) ⇒ Object

Calls (if registered) the delivery callback

Parameters:



# File 'lib/rdkafka/producer.rb', line 578

def call_delivery_callback(delivery_report, delivery_handle)
  return unless @delivery_callback

  case @delivery_callback_arity
  when 0
    @delivery_callback.call
  when 1
    @delivery_callback.call(delivery_report)
  else
    @delivery_callback.call(delivery_report, delivery_handle)
  end
end
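The arity-based dispatch can be demonstrated with stand-in lambdas (the :report and :handle symbols are placeholders, not real delivery objects; the dispatch helper is an illustrative re-implementation):

```ruby
# Mirrors the dispatch above: how many arguments the callback receives
# depends on its arity.
def dispatch(callback, report, handle)
  case callback.arity
  when 0 then callback.call
  when 1 then callback.call(report)
  else        callback.call(report, handle)
  end
end

seen = []
dispatch(->() { seen << :none },      :report, :handle)
dispatch(->(r) { seen << r },         :report, :handle)
dispatch(->(r, h) { seen << [r, h] }, :report, :handle)

seen # => [:none, :report, [:report, :handle]]
```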

#close ⇒ Object

Close this producer and wait for the internal poll queue to empty.



# File 'lib/rdkafka/producer.rb', line 243

def close
  return if closed?
  ObjectSpace.undefine_finalizer(self)

  @native_kafka.close do
    # We need to remove the topics references objects before we destroy the producer,
    # otherwise they would leak out
    @topics_refs_map.each_value do |refs|
      refs.each_value do |ref|
        Rdkafka::Bindings.rd_kafka_topic_destroy(ref)
      end
    end
  end

  @topics_refs_map.clear
end

#closed? ⇒ Boolean

Whether this producer has been closed

Returns:

  • (Boolean)


# File 'lib/rdkafka/producer.rb', line 261

def closed?
  @native_kafka.closed?
end

#commit_transaction(timeout_ms = -1) ⇒ true

Commit the current transaction

Parameters:

  • timeout_ms (Integer) (defaults to: -1)

    Timeout in milliseconds (-1 for infinite)

Returns:

  • (true)

    Returns true on success

Raises:



# File 'lib/rdkafka/producer.rb', line 191

def commit_transaction(timeout_ms = -1)
  closed_producer_check(__method__)

  @native_kafka.with_inner do |inner|
    response_ptr = Rdkafka::Bindings.rd_kafka_commit_transaction(inner, timeout_ms)

    Rdkafka::RdkafkaError.validate!(response_ptr, client_ptr: inner) || true
  end
end

#enable_background_queue_io_events(fd, payload = "\x01") ⇒ nil

Enable IO event notifications for background events

Parameters:

  • fd (Integer)

    file descriptor to signal (from IO.pipe or eventfd)

  • payload (String) (defaults to: "\x01")

    data to write to fd (default: "\x01")

Returns:

  • (nil)

Raises:



# File 'lib/rdkafka/producer.rb', line 141

def enable_background_queue_io_events(fd, payload = "\x01")
  @native_kafka.enable_background_queue_io_events(fd, payload)
end

#enable_queue_io_events(fd, payload = "\x01") ⇒ nil

Enable IO event notifications for fiber scheduler integration. When delivery confirmations arrive, librdkafka will write to your FD.

Parameters:

  • fd (Integer)

    file descriptor to signal (from IO.pipe or eventfd)

  • payload (String) (defaults to: "\x01")

    data to write to fd (default: "\x01")

Returns:

  • (nil)

Raises:



# File 'lib/rdkafka/producer.rb', line 132

def enable_queue_io_events(fd, payload = "\x01")
  @native_kafka.enable_main_queue_io_events(fd, payload)
end
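A possible wiring with IO.pipe might look like the sketch below (the produce/poll calls and topic name are illustrative, and availability of this API depends on the rdkafka version):

```ruby
reader, writer = IO.pipe

# librdkafka writes the payload byte to the pipe whenever events are ready
producer.enable_queue_io_events(writer.fileno)

producer.produce(topic: "events", payload: "hello")

# Block until signaled instead of busy-polling, then drain the events
IO.select([reader])
reader.read_nonblock(1)
producer.poll_drain_nb
```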

#flush(timeout_ms = Defaults::PRODUCER_FLUSH_TIMEOUT_MS) ⇒ Boolean

Note:

We raise an exception for other errors because based on the librdkafka docs, there should be no other errors.

Note:

For `timed_out` we do not raise an error to keep it backwards compatible

Wait until all outstanding producer requests are completed, with the given timeout in milliseconds. Call this before closing a producer to ensure delivery of all messages.

Parameters:

  • timeout_ms (Integer) (defaults to: Defaults::PRODUCER_FLUSH_TIMEOUT_MS)

    how long we should wait for the flush of all messages

Returns:

  • (Boolean)

    true if no more data and all was flushed, false in case there are still outgoing messages after the timeout



# File 'lib/rdkafka/producer.rb', line 276

def flush(timeout_ms = Defaults::PRODUCER_FLUSH_TIMEOUT_MS)
  closed_producer_check(__method__)

  error = @native_kafka.with_inner do |inner|
    response = Rdkafka::Bindings.rd_kafka_flush(inner, timeout_ms)
    Rdkafka::RdkafkaError.build(response)
  end

  # Early skip not to build the error message
  return true unless error
  return false if error.code == :timed_out

  raise(error)
end
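A typical shutdown sequence flushes before closing; the sketch below uses an illustrative topic and timeout:

```ruby
producer.produce(topic: "events", payload: "last message")

# false here means messages were still outstanding after 5 seconds
flushed = producer.flush(5_000)
producer.close
```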

#init_transactions ⇒ true

Initialize transactions for the producer. Must be called once before any transactional operations.

Returns:

  • (true)

    Returns true on success

Raises:



# File 'lib/rdkafka/producer.rb', line 161

def init_transactions
  closed_producer_check(__method__)

  @native_kafka.with_inner do |inner|
    response_ptr = Rdkafka::Bindings.rd_kafka_init_transactions(inner, -1)

    Rdkafka::RdkafkaError.validate!(response_ptr, client_ptr: inner) || true
  end
end
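A typical transactional flow, assuming 'transactional.id' was set in the producer configuration (topic and payload are illustrative):

```ruby
producer.init_transactions

begin
  producer.begin_transaction
  producer.produce(topic: "events", payload: "in-transaction").wait
  producer.commit_transaction
rescue Rdkafka::RdkafkaError
  # Roll back everything produced since begin_transaction
  producer.abort_transaction
  raise
end
```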

#name ⇒ String

Returns producer name.

Returns:

  • (String)

    producer name



# File 'lib/rdkafka/producer.rb', line 119

def name
  @name ||= @native_kafka.with_inner do |inner|
    ::Rdkafka::Bindings.rd_kafka_name(inner)
  end
end

#partition_count(topic) ⇒ Integer

Note:

If 'allow.auto.create.topics' is set to true in the broker, the topic will be auto-created after returning nil.

Note:

We cache the partition count for a given topic for a given time. If statistics are enabled for any producer or consumer, they will take precedence over per-instance fetching.

This prevents us from querying for the count with each message when someone uses `partition_key`. Instead we query at most once every 30 seconds if we have a valid partition count, or every 5 seconds in case we were not able to obtain the number of partitions.

Partition count for a given topic.

Parameters:

  • topic (String)

    The topic name.

Returns:

  • (Integer)

    partition count for a given topic or `RD_KAFKA_PARTITION_UA` (-1) if it could not be obtained.



# File 'lib/rdkafka/producer.rb', line 394

def partition_count(topic)
  closed_producer_check(__method__)

  self.class.partitions_count_cache.get(topic) do
    topic_metadata = nil

    @native_kafka.with_inner do |inner|
      topic_metadata = ::Rdkafka::Metadata.new(inner, topic).topics&.first
    end

    topic_metadata ? topic_metadata[:partition_count] : Rdkafka::Bindings::RD_KAFKA_PARTITION_UA
  end
rescue Rdkafka::RdkafkaError => e
  # If the topic does not exist, it will be created or if not allowed another error will be
  # raised. We here return RD_KAFKA_PARTITION_UA so this can happen without early error
  # happening on metadata discovery.
  return Rdkafka::Bindings::RD_KAFKA_PARTITION_UA if e.code == :unknown_topic_or_part

  raise(e)
end

#poll_drain_nb(timeout_ms = 100) ⇒ Boolean

Note:

This method holds the inner lock for up to `timeout_ms`. Other producer operations (produce, close, etc.) will wait until this method returns.

Note:

This method is thread-safe as it uses @native_kafka.with_inner synchronization

Drains the producer's event queue by continuously polling until empty or the time limit is reached.

This method is useful when you need to ensure delivery callbacks are processed within a bounded time, particularly when polling multiple producers from a single thread where fair scheduling is required to prevent starvation.

Uses non-blocking polls internally (no GVL release) for efficiency. The method holds a single `with_inner` lock for the duration, minimizing per-poll overhead when processing many events.

Examples:

Basic usage - drain for up to 100ms

fully_drained = producer.poll_drain_nb

Round-robin polling multiple producers fairly

producers.each do |producer|
  fully_drained = producer.poll_drain_nb(10)
  # If false, this producer has more pending events
end

Parameters:

  • timeout_ms (Integer) (defaults to: 100)

    maximum time to spend draining in milliseconds (default: 100)

Returns:

  • (Boolean)

    true if no more events to process, false if stopped due to time limit

Raises:



# File 'lib/rdkafka/producer.rb', line 367

def poll_drain_nb(timeout_ms = 100)
  closed_producer_check(__method__)

  @native_kafka.with_inner do |inner|
    deadline = monotonic_now_ms + timeout_ms

    loop do
      break true if Rdkafka::Bindings.rd_kafka_poll_nb(inner, 0).zero?
      break false if monotonic_now_ms >= deadline
    end
  end
end

#produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil, label: nil, topic_config: EMPTY_HASH, partitioner: @partitioner) ⇒ DeliveryHandle

Produces a message to a Kafka topic. The message is added to rdkafka's queue; call wait on the returned delivery handle to make sure it is delivered.

When no partition is specified the underlying Kafka library picks a partition based on the key. If no key is specified, a random partition will be used. When a timestamp is provided this is used instead of the auto-generated timestamp.

Parameters:

  • topic (String)

    The topic to produce to

  • payload (String, nil) (defaults to: nil)
  • key (String, nil) (defaults to: nil)
  • partition (Integer, nil) (defaults to: nil)

    Optional partition to produce to

  • partition_key (String, nil) (defaults to: nil)

    Optional partition key based on which partition assignment can happen

  • timestamp (Time, Integer, nil) (defaults to: nil)

    Optional timestamp of this message. Integer timestamp is in milliseconds since Jan 1 1970.

  • headers (Hash{String => String, Array<String>}) (defaults to: nil)

    Optional message headers. Values can be either a single string or an array of strings to support duplicate headers per KIP-82

  • label (Object, nil) (defaults to: nil)

    a label that can be assigned when producing a message that will be part of the delivery handle and the delivery report

  • topic_config (Hash) (defaults to: EMPTY_HASH)

    topic config for a given message dispatch. Allows sending messages to topics with different configurations

  • partitioner (String) (defaults to: @partitioner)

    name of the partitioner to use

Returns:

  • (DeliveryHandle)

    Delivery handle that can be used to wait for the result of producing this message

Raises:

  • (RdkafkaError)

    When adding the message to rdkafka's queue failed



# File 'lib/rdkafka/producer.rb', line 434

def produce(
  topic:,
  payload: nil,
  key: nil,
  partition: nil,
  partition_key: nil,
  timestamp: nil,
  headers: nil,
  label: nil,
  topic_config: EMPTY_HASH,
  partitioner: @partitioner
)
  closed_producer_check(__method__)

  # Start by checking and converting the input

  # Get payload length
  payload_size = if payload.nil?
    0
  else
    payload.bytesize
  end

  # Get key length
  key_size = if key.nil?
    0
  else
    key.bytesize
  end

  topic_config_hash = topic_config.hash

  # Checks if we have the rdkafka topic reference object ready. It saves us on object
  # allocation and allows to use custom config on demand.
  set_topic_config(topic, topic_config, topic_config_hash) unless @topics_refs_map.dig(topic, topic_config_hash)
  topic_ref = @topics_refs_map.dig(topic, topic_config_hash)

  if partition_key
    partition_count = partition_count(topic)

    # Check if there are no overrides for the partitioner and use the default one only when
    # no per-topic is present.
    selected_partitioner = @topics_configs.dig(topic, topic_config_hash, :partitioner) || partitioner

    # If the topic is not present, set to -1
    if partition_count.positive?
      partition = Rdkafka::Bindings.partitioner(
        topic_ref,
        partition_key,
        partition_count,
        selected_partitioner
      )
    end
  end

  # If partition is nil, use RD_KAFKA_PARTITION_UA to let librdkafka set the partition randomly or
  # based on the key when present.
  partition ||= Rdkafka::Bindings::RD_KAFKA_PARTITION_UA

  # If timestamp is nil use 0 and let Kafka set one. If an integer or time
  # use it.
  raw_timestamp = if timestamp.nil?
    0
  elsif timestamp.is_a?(Integer)
    timestamp
  elsif timestamp.is_a?(Time)
    (timestamp.to_i * 1000) + (timestamp.usec / 1000)
  else
    raise TypeError.new("Timestamp has to be nil, an Integer or a Time")
  end

  delivery_handle = DeliveryHandle.new
  delivery_handle.label = label
  delivery_handle.topic = topic
  delivery_handle[:pending] = true
  delivery_handle[:response] = Rdkafka::Bindings::RD_KAFKA_PARTITION_UA
  delivery_handle[:partition] = Rdkafka::Bindings::RD_KAFKA_PARTITION_UA
  delivery_handle[:offset] = Rdkafka::Bindings::RD_KAFKA_PARTITION_UA
  DeliveryHandle.register(delivery_handle)

  args = [
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_RKT, :pointer, topic_ref,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_MSGFLAGS, :int, Rdkafka::Bindings::RD_KAFKA_MSG_F_COPY,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_VALUE, :buffer_in, payload, :size_t, payload_size,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_KEY, :buffer_in, key, :size_t, key_size,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_PARTITION, :int32, partition,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TIMESTAMP, :int64, raw_timestamp,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_OPAQUE, :pointer, delivery_handle
  ]

  if headers && !headers.empty?
    headers.each do |key0, value0|
      key = key0.to_s
      case value0
      when Array
        # Handle array of values per KIP-82
        value0.each do |v|
          value = v.to_s
          args.push(
            :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_HEADER,
            :string, key,
            :pointer, value,
            :size_t, value.bytesize
          )
        end
      else
        # Handle single value
        value = value0.to_s
        args.push(
          :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_HEADER,
          :string, key,
          :pointer, value,
          :size_t, value.bytesize
        )
      end
    end
  end

  args.push(:int, Rdkafka::Bindings::RD_KAFKA_VTYPE_END)

  # Produce the message
  response = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_producev(
      inner,
      *args
    )
  end

  # Raise error if the produce call was not successful
  if response != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
    DeliveryHandle.remove(delivery_handle.to_ptr.address)

    @native_kafka.with_inner do |inner|
      Rdkafka::RdkafkaError.validate!(response, client_ptr: inner)
    end
  end

  delivery_handle
end
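A sketch of common calls (topic, values, and the printed fields are illustrative):

```ruby
handle = producer.produce(
  topic:     "events",
  payload:   "payload body",
  key:       "user-42",
  timestamp: Time.now,
  headers:   { "trace-id" => "abc", "tags" => ["a", "b"] } # array value per KIP-82
)

# Block until librdkafka confirms delivery (raises on failure)
report = handle.wait(max_wait_timeout: 5)
puts report.partition
puts report.offset
```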

#purgeObject

Purges the outgoing queue and releases all resources.

Useful when closing the producer with outgoing messages to unstable clusters, or when waiting cannot continue for any other reason. This purges both the queue and all the in-flight requests and updates the delivery handles' statuses so they can be materialized into `purge_queue` errors.



# File 'lib/rdkafka/producer.rb', line 297

def purge
  closed_producer_check(__method__)

  @native_kafka.with_inner do |inner|
    response = Bindings.rd_kafka_purge(
      inner,
      Bindings::RD_KAFKA_PURGE_F_QUEUE | Bindings::RD_KAFKA_PURGE_F_INFLIGHT
    )

    Rdkafka::RdkafkaError.validate!(response, client_ptr: inner)
  end

  # Wait for the purge to affect everything
  sleep(Defaults::PRODUCER_PURGE_SLEEP_INTERVAL_MS / 1_000.0) until flush(Defaults::PRODUCER_PURGE_FLUSH_TIMEOUT_MS)

  true
end

#queue_size ⇒ Integer Also known as: queue_length

Note:

This method is thread-safe as it uses the @native_kafka.with_inner synchronization

Returns the number of messages and requests waiting to be sent to the broker as well as delivery reports queued for the application.

This provides visibility into the producer's internal queue depth, useful for:

  • Monitoring producer backpressure

  • Implementing custom flow control

  • Debugging message delivery issues

  • Graceful shutdown logic (wait until queue is empty)

Examples:

producer.queue_size #=> 42

Returns:

  • (Integer)

    the number of messages in the queue

Raises:



# File 'lib/rdkafka/producer.rb', line 331

def queue_size
  closed_producer_check(__method__)

  @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_outq_len(inner)
  end
end

#send_offsets_to_transaction(consumer, tpl, timeout_ms = Defaults::PRODUCER_SEND_OFFSETS_TIMEOUT_MS) ⇒ Object

Note:

Use only in the context of an active transaction

Sends provided offsets of a consumer to the transaction for collective commit

Parameters:

  • consumer (Consumer)

    consumer that owns the given tpls

  • tpl (Consumer::TopicPartitionList)
  • timeout_ms (Integer) (defaults to: Defaults::PRODUCER_SEND_OFFSETS_TIMEOUT_MS)

    offsets send timeout



# File 'lib/rdkafka/producer.rb', line 221

def send_offsets_to_transaction(consumer, tpl, timeout_ms = Defaults::PRODUCER_SEND_OFFSETS_TIMEOUT_MS)
  closed_producer_check(__method__)

  return if tpl.empty?

  cgmetadata = consumer.consumer_group_metadata_pointer
  native_tpl = tpl.to_native_tpl

  @native_kafka.with_inner do |inner|
    response_ptr = Bindings.rd_kafka_send_offsets_to_transaction(inner, native_tpl, cgmetadata, timeout_ms)

    Rdkafka::RdkafkaError.validate!(response_ptr, client_ptr: inner)
  end
ensure
  if cgmetadata && !cgmetadata.null?
    Bindings.rd_kafka_consumer_group_metadata_destroy(cgmetadata)
  end

  Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(native_tpl) unless native_tpl.nil?
end

#set_topic_config(topic, config, config_hash) ⇒ Object

Note:

It is not allowed to re-set the same topic config twice because of the underlying librdkafka caching

Sets an alternative set of configuration details that can be used per topic

Parameters:

  • topic (String)

    The topic name

  • config (Hash)

    config we want to use on a per-topic basis

  • config_hash (Integer)

    hash of the config. We expect it here instead of computing it, because it is already computed during the retrieval attempt in the `#produce` flow.



# File 'lib/rdkafka/producer.rb', line 73

def set_topic_config(topic, config, config_hash)
  # Ensure lock on topic reference just in case
  @native_kafka.with_inner do |inner|
    @topics_refs_map[topic] ||= {}
    @topics_configs[topic] ||= {}

    return if @topics_configs[topic].key?(config_hash)

    # If config is empty, we create an empty reference that will be used with defaults
    rd_topic_config = if config.empty?
      nil
    else
      Rdkafka::Bindings.rd_kafka_topic_conf_new.tap do |topic_config|
        config.each do |key, value|
          error_buffer = FFI::MemoryPointer.new(:char, 256)
          result = Rdkafka::Bindings.rd_kafka_topic_conf_set(
            topic_config,
            key.to_s,
            value.to_s,
            error_buffer,
            256
          )

          unless result == :config_ok
            raise Config::ConfigError.new(error_buffer.read_string)
          end
        end
      end
    end

    topic_handle = Bindings.rd_kafka_topic_new(inner, topic, rd_topic_config)

    raise TopicHandleCreationError.new("Error creating topic handle for topic #{topic}") if topic_handle.null?

    @topics_configs[topic][config_hash] = config
    @topics_refs_map[topic][config_hash] = topic_handle
  end
end

#start ⇒ Object

Note:

Not needed unless explicit start was disabled

Starts the native Kafka polling thread and kicks off the init polling



# File 'lib/rdkafka/producer.rb', line 114

def start
  @native_kafka.start
end