Class: Rdkafka::Consumer

Inherits:
Object
  • Object
show all
Includes:
Enumerable, Helpers::OAuth, Helpers::Time
Defined in:
lib/rdkafka/consumer.rb,
lib/rdkafka/consumer/headers.rb,
lib/rdkafka/consumer/message.rb,
lib/rdkafka/consumer/partition.rb,
lib/rdkafka/consumer/topic_partition_list.rb

Overview

A consumer of Kafka messages. It uses the high-level consumer approach where the Kafka brokers automatically assign partitions and load balance partitions over consumers that have the same `:"group.id"` set in their configuration.

To create a consumer set up a Config and call consumer on that. It is mandatory to set `:"group.id"` in the configuration.

Consumer implements `Enumerable`, so you can use `each` to consume messages, or for example `each_slice` to consume batches of messages.

Defined Under Namespace

Modules: Headers Classes: Message, Partition, TopicPartitionList

Instance Method Summary collapse

Methods included from Helpers::OAuth

#oauthbearer_set_token, #oauthbearer_set_token_failure

Methods included from Helpers::Time

#monotonic_now, #monotonic_now_ms

Constructor Details

#initialize(native_kafka) ⇒ Consumer

Returns a new instance of Consumer.

Parameters:

  • native_kafka (NativeKafka)

    wrapper around the native Kafka consumer handle



20
21
22
23
24
25
# File 'lib/rdkafka/consumer.rb', line 20

# Creates a new Consumer wrapping an already-configured native handle.
#
# @param native_kafka [NativeKafka] wrapper around the native Kafka consumer handle
def initialize(native_kafka)
  @native_kafka = native_kafka

  # Makes sure, that native kafka gets closed before it gets GCed by Ruby
  ObjectSpace.define_finalizer(self, native_kafka.finalizer)
end

Instance Method Details

#assign(list) ⇒ Object

Atomic assignment of partitions to consume

Parameters:

Raises:



325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
# File 'lib/rdkafka/consumer.rb', line 325

# Atomic assignment of partitions to consume.
#
# @param list [TopicPartitionList] partitions to assign to this consumer
# @return [nil]
# @raise [TypeError] when list is not a TopicPartitionList
# @raise [RdkafkaError] when the native assign call fails
def assign(list)
  closed_consumer_check(__method__)

  unless list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be a TopicPartitionList")
  end

  # Native TPL is owned by us and must be destroyed; the ensure below
  # guarantees that even when the assign call raises.
  tpl = list.to_native_tpl

  begin
    response = @native_kafka.with_inner do |inner|
      Rdkafka::Bindings.rd_kafka_assign(inner, tpl)
    end
    if response != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
      raise Rdkafka::RdkafkaError.new(response, "Error assigning '#{list.to_h}'")
    end
  ensure
    Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
  end
end

#assignment ⇒ TopicPartitionList

Returns the current partition assignment.

Returns:

Raises:



350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
# File 'lib/rdkafka/consumer.rb', line 350

# Returns the current partition assignment.
#
# @return [TopicPartitionList, nil] the current assignment, or nil when
#   librdkafka returned a NULL list pointer
# @raise [RdkafkaError] when fetching the assignment fails
def assignment
  closed_consumer_check(__method__)

  # Out-parameter: librdkafka writes a TPL pointer into this slot
  ptr = FFI::MemoryPointer.new(:pointer)
  response = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_assignment(inner, ptr)
  end
  if response != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
    raise Rdkafka::RdkafkaError.new(response)
  end

  tpl = ptr.read_pointer

  if !tpl.null?
    begin
      # Copy into a Ruby-side list, then release the native list
      Rdkafka::Consumer::TopicPartitionList.from_native_tpl(tpl)
    ensure
      Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy tpl
    end
  end
ensure
  ptr&.free
end

#assignment_lost? ⇒ Boolean

Returns true if our current assignment has been lost involuntarily.

Returns:

  • (Boolean)

    true if our current assignment has been lost involuntarily.



375
376
377
378
379
380
381
# File 'lib/rdkafka/consumer.rb', line 375

# Whether the current assignment was lost involuntarily (e.g. due to a
# rebalance error or session timeout).
#
# @return [Boolean] true when the assignment has been lost
def assignment_lost?
  closed_consumer_check(__method__)

  @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_assignment_lost(inner) != 0
  end
end

#close ⇒ nil

Close this consumer

Returns:

  • (nil)


180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/rdkafka/consumer.rb', line 180

# Close this consumer: unregister the GC finalizer, close the native
# consumer handle, destroy the consumer queue if one was created, and
# release the wrapper.
#
# Idempotent: returns immediately if already closed.
#
# @return [nil]
def close
  return if closed?
  ObjectSpace.undefine_finalizer(self)

  @native_kafka.synchronize do |inner|
    Rdkafka::Bindings.rd_kafka_consumer_close(inner)

    # Release the dedicated consumer queue (if any) before the handle goes away
    if @consumer_queue
      Rdkafka::Bindings.rd_kafka_queue_destroy(@consumer_queue)
      @consumer_queue = nil
    end
  end

  @native_kafka.close
end

#closed? ⇒ Boolean

Whether this consumer has closed

Returns:

  • (Boolean)


197
198
199
# File 'lib/rdkafka/consumer.rb', line 197

# Whether this consumer has been closed.
#
# @return [Boolean]
def closed?
  @native_kafka.closed?
end

#cluster_id ⇒ String?

Returns the ClusterId as reported in broker metadata.

Returns:

  • (String, nil)


513
514
515
516
517
518
# File 'lib/rdkafka/consumer.rb', line 513

# Returns the ClusterId as reported in broker metadata.
#
# @return [String, nil]
def cluster_id
  closed_consumer_check(__method__)

  @native_kafka.with_inner { |inner| Rdkafka::Bindings.rd_kafka_clusterid(inner) }
end

#commit(list = nil, async = false) ⇒ nil

Manually commit the current offsets of this consumer.

To use this set `enable.auto.commit` to `false` to disable automatic triggering of commits.

If `enable.auto.offset.store` is set to `true` the offset of the last consumed message for every partition is used. If set to `false` you can use #store_offset to indicate when a message has been fully processed.

Parameters:

  • list (TopicPartitionList, nil) (defaults to: nil)

    The topic with partitions to commit

  • async (Boolean) (defaults to: false)

    Whether to commit async or wait for the commit to finish

Returns:

  • (nil)

Raises:



655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
# File 'lib/rdkafka/consumer.rb', line 655

# Manually commit the current offsets of this consumer.
#
# @param list [TopicPartitionList, nil] partitions to commit, or nil to
#   commit the offsets of the current assignment
# @param async [Boolean] whether to commit asynchronously or wait for the
#   commit to finish
# @return [nil]
# @raise [TypeError] when list is neither nil nor a TopicPartitionList
# @raise [RdkafkaError] when the commit fails
def commit(list = nil, async = false)
  closed_consumer_check(__method__)

  if !list.nil? && !list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be nil or a TopicPartitionList")
  end

  # nil list → pass NULL to librdkafka, committing the current assignment
  tpl = list&.to_native_tpl

  begin
    response = @native_kafka.with_inner do |inner|
      Rdkafka::Bindings.rd_kafka_commit(inner, tpl, async)
    end
    if response != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
      raise Rdkafka::RdkafkaError.new(response)
    end
  ensure
    Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) if tpl
  end
end

#committed(list = nil, timeout_ms = Defaults::CONSUMER_COMMITTED_TIMEOUT_MS) ⇒ TopicPartitionList

Return the current committed offset per partition for this consumer group. The offset field of each requested partition will either be set to stored offset or to -1001 in case there was no stored offset for that partition.

Parameters:

  • list (TopicPartitionList, nil) (defaults to: nil)

    The topic with partitions to get the offsets for or nil to use the current subscription.

  • timeout_ms (Integer) (defaults to: Defaults::CONSUMER_COMMITTED_TIMEOUT_MS)

    The timeout for fetching this information.

Returns:

Raises:

  • (RdkafkaError)

    When getting the committed positions fails.



392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
# File 'lib/rdkafka/consumer.rb', line 392

# Return the committed offset per partition for this consumer group.
# Partitions without a stored offset report -1001.
#
# @param list [TopicPartitionList, nil] partitions to query, or nil to use
#   the current assignment
# @param timeout_ms [Integer] timeout for fetching this information
# @return [TopicPartitionList]
# @raise [TypeError] when list is neither nil nor a TopicPartitionList
# @raise [RdkafkaError] when getting the committed positions fails
def committed(list = nil, timeout_ms = Defaults::CONSUMER_COMMITTED_TIMEOUT_MS)
  closed_consumer_check(__method__)

  if list.nil?
    list = assignment
  elsif !list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be nil or a TopicPartitionList")
  end

  tpl = list.to_native_tpl

  begin
    response = @native_kafka.with_inner do |inner|
      Rdkafka::Bindings.rd_kafka_committed(inner, tpl, timeout_ms)
    end
    if response != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
      raise Rdkafka::RdkafkaError.new(response)
    end
    # librdkafka filled the TPL with offsets in place; copy to Ruby list
    TopicPartitionList.from_native_tpl(tpl)
  ensure
    Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
  end
end

#consumer_group_metadata_pointer ⇒ Object

Note:

This pointer needs to be removed with `#rd_kafka_consumer_group_metadata_destroy`

Returns pointer to the consumer group metadata. It is used only in the context of exactly-once-semantics in transactions, this is why it is never remapped to Ruby

This API is not usable by itself from Ruby



969
970
971
972
973
974
975
# File 'lib/rdkafka/consumer.rb', line 969

# Returns a native pointer to the consumer group metadata. It is used only
# in the context of exactly-once-semantics in transactions, which is why it
# is never remapped to a Ruby object.
#
# @note The returned pointer must be released with
#   `rd_kafka_consumer_group_metadata_destroy`.
# @return [FFI::Pointer] pointer to the native consumer group metadata
def consumer_group_metadata_pointer
  closed_consumer_check(__method__)

  @native_kafka.with_inner do |inner|
    Bindings.rd_kafka_consumer_group_metadata(inner)
  end
end

#each(timeout_ms: Defaults::CONSUMER_POLL_TIMEOUT_MS) {|message| ... } ⇒ nil

Poll for new messages and yield for each received one. Iteration will end when the consumer is closed.

If `enable.partition.eof` is turned on in the config this will raise an error when an eof is reached, so you probably want to disable that when using this method of iteration.

Parameters:

  • timeout_ms (Integer) (defaults to: Defaults::CONSUMER_POLL_TIMEOUT_MS)

    Timeout for each poll iteration

Yield Parameters:

  • message (Message)

    Received message

Returns:

  • (nil)

Raises:



923
924
925
926
927
928
929
930
931
932
933
934
# File 'lib/rdkafka/consumer.rb', line 923

# Poll for new messages and yield each received one. Iteration ends when
# the consumer is closed; nil polls on an open consumer simply retry.
#
# @param timeout_ms [Integer] timeout for each poll iteration
# @yieldparam message [Message] received message
# @return [nil]
def each(timeout_ms: Defaults::CONSUMER_POLL_TIMEOUT_MS)
  loop do
    fetched = poll(timeout_ms)

    if fetched
      yield(fetched)
    else
      # No message within the timeout: stop only once the consumer is closed
      break if closed?
    end
  end
end

#each_batch(max_items: 100, bytes_threshold: Float::INFINITY, timeout_ms: 250, yield_on_error: false, &block) ⇒ Object

Deprecated.

This method has been removed due to data consistency concerns

Parameters:

  • max_items (Integer) (defaults to: 100)

    unused

  • bytes_threshold (Numeric) (defaults to: Float::INFINITY)

    unused

  • timeout_ms (Integer) (defaults to: 250)

    unused

  • yield_on_error (Boolean) (defaults to: false)

    unused

  • block (Proc)

    unused block

Raises:

  • (NotImplementedError)

    Always raises as this method is no longer supported



943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
# File 'lib/rdkafka/consumer.rb', line 943

def each_batch(max_items: 100, bytes_threshold: Float::INFINITY, timeout_ms: 250, yield_on_error: false, &block)
  raise NotImplementedError, <<~ERROR
    `each_batch` has been removed due to data consistency concerns.

    This method was removed because it did not properly handle partition reassignments,
    which could lead to processing messages from partitions that were no longer owned
    by this consumer, resulting in duplicate message processing and data inconsistencies.

    Recommended alternatives:

    1. Implement your own batching logic using rebalance callbacks to properly handle
       partition revocations and ensure message processing correctness.

    2. Use a high-level batching library that supports proper partition reassignment
       handling out of the box (such as the Karafka framework).
  ERROR
end

#enable_background_queue_io_events(fd, payload = "\x01") ⇒ nil

Enable IO event notifications for background events

Parameters:

  • fd (Integer)

    file descriptor to signal (from IO.pipe or eventfd)

  • payload (String) (defaults to: "\x01")

    data to write to fd (default: "\x01")

Returns:

  • (nil)

Raises:



77
78
79
# File 'lib/rdkafka/consumer.rb', line 77

# Enable IO event notifications for background events.
#
# @param fd [Integer] file descriptor to signal (from IO.pipe or eventfd)
# @param payload [String] data to write to fd (default: "\x01")
# @return [nil]
def enable_background_queue_io_events(fd, payload = "\x01")
  @native_kafka.enable_background_queue_io_events(fd, payload)
end

#enable_queue_io_events(fd, payload = "\x01") ⇒ nil

Enable IO event notifications for fiber scheduler integration When the consumer queue has messages, librdkafka will write to your FD

Examples:

Using with fiber scheduler

consumer = config.consumer
consumer.subscribe("topic")

# Create notification FD
signal_r, signal_w = IO.pipe

# Enable librdkafka to signal when messages arrive
consumer.enable_queue_io_events(signal_w.fileno)

# Monitor with select/poll
loop do
  readable, = IO.select([signal_r], nil, nil, timeout)
  if readable
    signal_r.read_nonblock(1024) rescue nil  # Drain signal
    while msg = consumer.poll(0)
      process(msg)
    end
  end
end

Parameters:

  • fd (Integer)

    file descriptor to signal (from IO.pipe or eventfd)

  • payload (String) (defaults to: "\x01")

    data to write to fd (default: "\x01")

Returns:

  • (nil)

Raises:



68
69
70
# File 'lib/rdkafka/consumer.rb', line 68

# Enable IO event notifications for fiber scheduler integration. When the
# consumer queue has messages, librdkafka will write to the given FD.
#
# @param fd [Integer] file descriptor to signal (from IO.pipe or eventfd)
# @param payload [String] data to write to fd (default: "\x01")
# @return [nil]
def enable_queue_io_events(fd, payload = "\x01")
  @native_kafka.enable_main_queue_io_events(fd, payload)
end

#events_poll(timeout_ms = Defaults::CONSUMER_EVENTS_POLL_TIMEOUT_MS) ⇒ Object

Note:

This method technically should be called ‘#poll` and the current `#poll` should be called `#consumer_poll` though we keep the current naming convention to make it backward compatible.

Polls the main rdkafka queue (not the consumer one). Do NOT use it if ‘consumer_poll_set`

was set to `true`.

Events will cause application-provided callbacks to be called.

Events (in the context of the consumer):

- error callbacks
- stats callbacks
- any other callbacks supported by librdkafka that are not part of the consumer_poll, that
  would have a callback configured and activated.

This method needs to be called at regular intervals to serve any queued callbacks waiting to be called. When in use, does NOT replace ‘#poll` but needs to run complementary with it.

Parameters:

  • timeout_ms (Integer) (defaults to: Defaults::CONSUMER_EVENTS_POLL_TIMEOUT_MS)

    poll timeout. If set to 0 will run async, when set to -1 will block until any events available.



767
768
769
770
771
# File 'lib/rdkafka/consumer.rb', line 767

# Polls the main rdkafka queue (not the consumer one) so that queued
# application callbacks (errors, stats, ...) get served. Do NOT use it if
# `consumer_poll_set` was set to `true`.
#
# @param timeout_ms [Integer] poll timeout; 0 runs async, -1 blocks until
#   any events are available
def events_poll(timeout_ms = Defaults::CONSUMER_EVENTS_POLL_TIMEOUT_MS)
  @native_kafka.with_inner { |inner| Rdkafka::Bindings.rd_kafka_poll(inner, timeout_ms) }
end

#events_poll_nb(timeout_ms = 0) ⇒ Integer

Polls the main rdkafka queue without releasing the GVL (Global VM Lock).

This is more efficient than regular events_poll for non-blocking poll(0) calls, particularly useful in fiber scheduler contexts where GVL release/reacquire overhead is wasteful since we don’t expect to wait.

Parameters:

  • timeout_ms (Integer) (defaults to: 0)

    poll timeout (default: 0 for non-blocking)

Returns:

  • (Integer)

    the number of events served

See Also:



783
784
785
786
787
# File 'lib/rdkafka/consumer.rb', line 783

# Polls the main rdkafka queue without releasing the GVL. More efficient
# than #events_poll for non-blocking poll(0) calls, e.g. under a fiber
# scheduler where GVL release/reacquire overhead is wasted.
#
# @param timeout_ms [Integer] poll timeout (default: 0 for non-blocking)
# @return [Integer] the number of events served
def events_poll_nb(timeout_ms = 0)
  @native_kafka.with_inner { |inner| Rdkafka::Bindings.rd_kafka_poll_nb(inner, timeout_ms) }
end

#events_poll_nb_each {|count| ... } ⇒ nil

Note:

This method holds the inner lock until the queue is empty or ‘:stop` is returned. Other consumer operations will wait until this method returns.

Note:

This method is thread-safe as it uses @native_kafka.with_inner synchronization

Note:

Do NOT use this if ‘consumer_poll_set` was set to `true`

Polls for events in a non-blocking loop, yielding the count after each iteration.

This method processes events (stats, errors, etc.) in a single GVL/mutex session, which is more efficient than repeated individual polls. It uses non-blocking polls internally (no GVL release between polls).

Yields the count of events processed after each poll iteration, allowing the caller to implement timeout or other termination logic by returning ‘:stop`.

Examples:

Drain all pending events

consumer.events_poll_nb_each { |_count| }

With timeout control

deadline = monotonic_now + timeout_ms
consumer.events_poll_nb_each do |_count|
  :stop if monotonic_now >= deadline
end

Yields:

  • (count)

    Called after each poll iteration

Yield Parameters:

  • count (Integer)

    Number of events processed in this iteration

Yield Returns:

  • (Symbol, Object)

    Return ‘:stop` to break the loop, any other value continues

Returns:

  • (nil)

Raises:



109
110
111
112
113
114
115
116
117
118
119
# File 'lib/rdkafka/consumer.rb', line 109

# Polls for events in a non-blocking loop within a single inner-lock
# session, yielding the served-event count after each iteration.
#
# Stops when a poll serves zero events or the block returns :stop. Note
# that the inner lock is held for the whole loop.
#
# @yieldparam count [Integer] number of events processed in this iteration
# @yieldreturn [Symbol, Object] :stop breaks the loop, anything else continues
# @return [nil]
def events_poll_nb_each
  closed_consumer_check(__method__)

  @native_kafka.with_inner do |inner|
    loop do
      count = Rdkafka::Bindings.rd_kafka_poll_nb(inner, 0)
      break if count.zero?
      break if yield(count) == :stop
    end
  end
end

#lag(topic_partition_list, watermark_timeout_ms = Defaults::CONSUMER_LAG_TIMEOUT_MS) ⇒ Hash{String => Hash{Integer => Integer}}

Calculate the consumer lag per partition for the provided topic partition list. You can get a suitable list by calling #committed or #position (TODO). It is also possible to create one yourself, in this case you have to provide a list that already contains all the partitions you need the lag for.

Parameters:

  • topic_partition_list (TopicPartitionList)

    The list to calculate lag for.

  • watermark_timeout_ms (Integer) (defaults to: Defaults::CONSUMER_LAG_TIMEOUT_MS)

    The timeout for each query watermark call.

Returns:

  • (Hash{String => Hash{Integer => Integer}})

    A hash containing all topics with the lag per partition

Raises:



489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
# File 'lib/rdkafka/consumer.rb', line 489

# Calculate the consumer lag per partition for the provided topic
# partition list. Partitions whose offset is nil are skipped; their topic
# still appears in the result (possibly with an empty hash).
#
# @param topic_partition_list [TopicPartitionList] the list to calculate lag for
# @param watermark_timeout_ms [Integer] timeout for each watermark query
# @return [Hash{String => Hash{Integer => Integer}}] lag per topic and partition
def lag(topic_partition_list, watermark_timeout_ms = Defaults::CONSUMER_LAG_TIMEOUT_MS)
  topic_partition_list.to_h.each_with_object({}) do |(topic, partitions), result|
    # Compare each partition's offset against the broker's high watermark
    result[topic] = partitions.each_with_object({}) do |partition, lags|
      current_offset = partition.offset
      next if current_offset.nil?

      _low, high = query_watermark_offsets(
        topic,
        partition.partition,
        watermark_timeout_ms
      )
      lags[partition.partition] = high - current_offset
    end
  end
end

#member_id ⇒ String?

Returns this client’s broker-assigned group member id

This currently requires the high-level KafkaConsumer

Returns:

  • (String, nil)


525
526
527
528
529
530
# File 'lib/rdkafka/consumer.rb', line 525

# Returns this client's broker-assigned group member id.
#
# @return [String, nil]
def member_id
  closed_consumer_check(__method__)

  @native_kafka.with_inner { |inner| Rdkafka::Bindings.rd_kafka_memberid(inner) }
end

#name ⇒ String

Returns consumer name.

Returns:

  • (String)

    consumer name



34
35
36
37
38
# File 'lib/rdkafka/consumer.rb', line 34

# Returns the consumer name, memoized after the first native lookup.
#
# @return [String] consumer name
def name
  @name ||= @native_kafka.with_inner { |inner| ::Rdkafka::Bindings.rd_kafka_name(inner) }
end

#offsets_for_times(list, timeout_ms = Defaults::CONSUMER_OFFSETS_FOR_TIMES_TIMEOUT_MS) ⇒ TopicPartitionList

Lookup offset for the given partitions by timestamp.

Parameters:

  • list (TopicPartitionList)

    The TopicPartitionList with timestamps instead of offsets

  • timeout_ms (Integer) (defaults to: Defaults::CONSUMER_OFFSETS_FOR_TIMES_TIMEOUT_MS)

    timeout in milliseconds for the operation

Returns:

Raises:

  • (RdKafkaError)

    When the OffsetForTimes lookup fails



616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
# File 'lib/rdkafka/consumer.rb', line 616

# Lookup offsets for the given partitions by timestamp.
#
# @param list [TopicPartitionList] list with timestamps instead of offsets
# @param timeout_ms [Integer] timeout in milliseconds for the operation
# @return [TopicPartitionList] list with the resolved offsets
# @raise [TypeError] when list is not a TopicPartitionList
# @raise [RdkafkaError] when the OffsetForTimes lookup fails
def offsets_for_times(list, timeout_ms = Defaults::CONSUMER_OFFSETS_FOR_TIMES_TIMEOUT_MS)
  closed_consumer_check(__method__)

  if !list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be a TopicPartitionList")
  end

  tpl = list.to_native_tpl

  response = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_offsets_for_times(
      inner,
      tpl,
      timeout_ms # timeout
    )
  end

  if response != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
    raise Rdkafka::RdkafkaError.new(response)
  end

  # librdkafka replaced the timestamps with offsets in place
  TopicPartitionList.from_native_tpl(tpl)
ensure
  Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) if tpl
end

#pause(list) ⇒ nil

Pause producing or consumption for the provided list of partitions

Parameters:

Returns:

  • (nil)

Raises:



247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/rdkafka/consumer.rb', line 247

# Pause consumption for the provided list of partitions.
#
# @param list [TopicPartitionList] partitions to pause
# @return [nil]
# @raise [TypeError] when list is not a TopicPartitionList
# @raise [RdkafkaTopicPartitionListError] when pausing fails; carries the
#   affected list
def pause(list)
  closed_consumer_check(__method__)

  unless list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be a TopicPartitionList")
  end

  tpl = list.to_native_tpl

  begin
    response = @native_kafka.with_inner do |inner|
      Rdkafka::Bindings.rd_kafka_pause_partitions(inner, tpl)
    end

    if response != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
      # Re-read the TPL so the error carries per-partition state
      list = TopicPartitionList.from_native_tpl(tpl)
      raise Rdkafka::RdkafkaTopicPartitionListError.new(response, list, "Error pausing '#{list.to_h}'")
    end
  ensure
    Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
  end
end

#poll(timeout_ms) ⇒ Message?

Poll for the next message on one of the subscribed topics

Parameters:

  • timeout_ms (Integer)

    Timeout of this poll

Returns:

  • (Message, nil)

    A message or nil if there was no new message within the timeout

Raises:



681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
# File 'lib/rdkafka/consumer.rb', line 681

# Poll for the next message on one of the subscribed topics.
#
# @param timeout_ms [Integer] timeout of this poll
# @return [Message, nil] a message, or nil if none arrived within the timeout
# @raise [RdkafkaError] when the polled message carries an error
def poll(timeout_ms)
  closed_consumer_check(__method__)

  message_ptr = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_consumer_poll(inner, timeout_ms)
  end
  if message_ptr.null?
    nil
  else
    # Create struct wrapper
    native_message = Rdkafka::Bindings::Message.new(message_ptr)
    # Raise error if needed
    if native_message[:err] != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
      raise Rdkafka::RdkafkaError.new(native_message[:err])
    end
    # Create a message to pass out
    Rdkafka::Consumer::Message.new(native_message)
  end
ensure
  # Clean up rdkafka message if there is one; runs on both the normal and
  # the error path so the native message is never leaked
  if message_ptr && !message_ptr.null?
    Rdkafka::Bindings.rd_kafka_message_destroy(message_ptr)
  end
end

#poll_batch(timeout_ms, max_items: 100) ⇒ Array<Message>

Poll for a batch of messages from the consumer queue in a single FFI call.

This is more efficient than calling #poll in a loop because it crosses the FFI boundary only once to fetch up to ‘max_items` messages.

The timeout controls how long to wait for the first message. Once any message is available, librdkafka fills the buffer with whatever is immediately ready and returns without further waiting.

Parameters:

  • timeout_ms (Integer)

    Timeout waiting for the first message (-1 for infinite)

  • max_items (Integer) (defaults to: 100)

    Maximum number of messages to return per call

Returns:

  • (Array<Message>)

    Array of messages (empty if none available within timeout)

Raises:



803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
# File 'lib/rdkafka/consumer.rb', line 803

# Poll a batch of up to max_items messages from the consumer queue in a
# single FFI call. The timeout only applies to waiting for the first
# message; once anything is available the call returns what is ready.
#
# @param timeout_ms [Integer] timeout waiting for the first message (-1 for infinite)
# @param max_items [Integer] maximum number of messages to return per call
# @return [Array<Message>] messages (empty if none available within timeout)
# @raise [RdkafkaError] when a message in the batch carries an error
def poll_batch(timeout_ms, max_items: 100)
  closed_consumer_check(__method__)

  # Native buffer sized to hold max_items message pointers
  buffer = batch_buffer(max_items)
  messages = []

  count = @native_kafka.with_inner do |_inner|
    Rdkafka::Bindings.rd_kafka_consume_batch_queue(
      consumer_queue,
      timeout_ms,
      buffer,
      max_items
    )
  end

  # count <= 0 means timed out (or error) with nothing to wrap
  return messages if count <= 0

  i = 0
  begin
    while i < count
      ptr = buffer.get_pointer(i * FFI::Pointer.size)

      if ptr.null?
        i += 1
        next
      end

      native_message = Rdkafka::Bindings::Message.new(ptr)

      if native_message[:err] != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
        raise Rdkafka::RdkafkaError.new(native_message[:err])
      end

      messages << Rdkafka::Consumer::Message.new(native_message)
      Rdkafka::Bindings.rd_kafka_message_destroy(ptr)
      i += 1
    end
  ensure
    # If wrapping raised mid-batch, destroy the remaining native messages
    # (including the one that raised) so none are leaked
    while i < count
      ptr = buffer.get_pointer(i * FFI::Pointer.size)
      Rdkafka::Bindings.rd_kafka_message_destroy(ptr) unless ptr.null?
      i += 1
    end
  end

  messages
end

#poll_batch_nb(timeout_ms = 0, max_items: 100) ⇒ Array<Message>

Note:

Since the GVL is not released, a non-zero timeout_ms will block all Ruby threads/fibers for the duration. Use #poll_batch if you need a blocking wait.

Poll for a batch of messages without releasing the GVL (Global VM Lock).

This is more efficient than #poll_batch for non-blocking poll(0) calls, particularly useful in fiber scheduler contexts where GVL release/reacquire overhead is wasteful since we don’t expect to wait.

Parameters:

  • timeout_ms (Integer) (defaults to: 0)

    Timeout waiting for the first message (default: 0 for non-blocking)

  • max_items (Integer) (defaults to: 100)

    Maximum number of messages to return per call

Returns:

  • (Array<Message>)

    Array of messages (empty if none available within timeout)

Raises:



865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
# File 'lib/rdkafka/consumer.rb', line 865

# Poll a batch of messages without releasing the GVL. Intended for
# non-blocking poll(0) calls, e.g. in fiber scheduler contexts.
#
# @note A non-zero timeout_ms will block all Ruby threads/fibers for the
#   duration since the GVL is held; use #poll_batch for blocking waits.
# @param timeout_ms [Integer] timeout waiting for the first message (default: 0)
# @param max_items [Integer] maximum number of messages to return per call
# @return [Array<Message>] messages (empty if none available within timeout)
# @raise [RdkafkaError] when a message in the batch carries an error
def poll_batch_nb(timeout_ms = 0, max_items: 100)
  closed_consumer_check(__method__)

  # Native buffer sized to hold max_items message pointers
  buffer = batch_buffer(max_items)
  messages = []

  count = @native_kafka.with_inner do |_inner|
    Rdkafka::Bindings.rd_kafka_consume_batch_queue_nb(
      consumer_queue,
      timeout_ms,
      buffer,
      max_items
    )
  end

  # count <= 0 means timed out (or error) with nothing to wrap
  return messages if count <= 0

  i = 0
  begin
    while i < count
      ptr = buffer.get_pointer(i * FFI::Pointer.size)

      if ptr.null?
        i += 1
        next
      end

      native_message = Rdkafka::Bindings::Message.new(ptr)

      if native_message[:err] != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
        raise Rdkafka::RdkafkaError.new(native_message[:err])
      end

      messages << Rdkafka::Consumer::Message.new(native_message)
      Rdkafka::Bindings.rd_kafka_message_destroy(ptr)
      i += 1
    end
  ensure
    # If wrapping raised mid-batch, destroy the remaining native messages
    # (including the one that raised) so none are leaked
    while i < count
      ptr = buffer.get_pointer(i * FFI::Pointer.size)
      Rdkafka::Bindings.rd_kafka_message_destroy(ptr) unless ptr.null?
      i += 1
    end
  end

  messages
end

#poll_nb(timeout_ms = 0) ⇒ Message?

Poll for the next message without releasing the GVL (Global VM Lock).

This is more efficient than regular polling for non-blocking poll(0) calls, particularly useful in fiber scheduler contexts where GVL release/reacquire overhead is wasteful since we don’t expect to wait.

Examples:

Using with fiber scheduler

# After receiving IO notification that messages are available
while msg = consumer.poll_nb
  process(msg)
end

Parameters:

  • timeout_ms (Integer) (defaults to: 0)

    Timeout of this poll (default: 0 for non-blocking)

Returns:

  • (Message, nil)

    A message or nil if there was no new message within the timeout

Raises:



721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
# File 'lib/rdkafka/consumer.rb', line 721

# Poll for the next message without releasing the GVL. Intended for
# non-blocking poll(0) calls, e.g. after an IO notification that messages
# are available.
#
# @param timeout_ms [Integer] timeout of this poll (default: 0 for non-blocking)
# @return [Message, nil] a message, or nil if none arrived within the timeout
# @raise [RdkafkaError] when the polled message carries an error
def poll_nb(timeout_ms = 0)
  closed_consumer_check(__method__)

  message_ptr = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_consumer_poll_nb(inner, timeout_ms)
  end

  if message_ptr.null?
    nil
  else
    # Create struct wrapper
    native_message = Rdkafka::Bindings::Message.new(message_ptr)
    # Raise error if needed
    if native_message[:err] != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
      raise Rdkafka::RdkafkaError.new(native_message[:err])
    end
    # Create a message to pass out
    Rdkafka::Consumer::Message.new(native_message)
  end
ensure
  # Clean up rdkafka message if there is one; runs on both the normal and
  # the error path so the native message is never leaked
  if message_ptr && !message_ptr.null?
    Rdkafka::Bindings.rd_kafka_message_destroy(message_ptr)
  end
end

#poll_nb_each {|message| ... } ⇒ nil

Note:

This method uses ‘rd_kafka_consumer_poll` to fetch messages, unlike `events_poll_nb_each` which uses `rd_kafka_poll` for event callbacks (delivery reports, statistics, etc.). For consumers, use this method to receive messages and `events_poll_nb_each` for processing background events.

Note:

This method holds the inner lock for the duration. Other consumer operations will wait until this method returns.

Note:

Timeout/max_messages logic should be implemented by the caller

Polls for messages in a non-blocking loop, yielding each message to the caller.

This method processes messages in a single GVL/mutex session until the queue is empty or the caller returns ‘:stop`. It handles the message pointer lifecycle internally, ensuring proper cleanup via `rd_kafka_message_destroy`.

Examples:

Process messages until queue is empty

consumer.poll_nb_each do |message|
  process(message)
end

Process with early termination

count = 0
consumer.poll_nb_each do |message|
  process(message)
  count += 1
  :stop if count >= 10
end

Yields:

  • (message)

    Called for each message received

Yield Parameters:

Yield Returns:

  • (Symbol, Object)

    Return ‘:stop` to break the loop, any other value continues

Returns:

  • (nil)

Raises:



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/rdkafka/consumer.rb', line 154

# Polls for messages in a non-blocking loop within a single inner-lock
# session, yielding each message to the caller. Stops when the queue is
# empty or the block returns :stop. Native message pointers are always
# destroyed, even when the block raises.
#
# @note The inner lock is held for the duration of the loop.
# @yieldparam message [Message] received message
# @yieldreturn [Symbol, Object] :stop breaks the loop, anything else continues
# @return [nil]
# @raise [RdkafkaError] when a polled message carries an error
def poll_nb_each
  closed_consumer_check(__method__)

  @native_kafka.with_inner do |inner|
    loop do
      message_ptr = Rdkafka::Bindings.rd_kafka_consumer_poll_nb(inner, 0)
      break if message_ptr.null?

      begin
        native_message = Rdkafka::Bindings::Message.new(message_ptr)

        if native_message[:err] != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
          raise Rdkafka::RdkafkaError.new(native_message[:err])
        end

        result = yield Consumer::Message.new(native_message)
        break if result == :stop
      ensure
        # Destroy the native message no matter how this iteration exits
        Rdkafka::Bindings.rd_kafka_message_destroy(message_ptr)
      end
    end
  end
end

#position(list = nil) ⇒ TopicPartitionList

Return the current positions (offsets) for topics and partitions. The offset field of each requested partition will be set to the offset of the last consumed message + 1, or nil in case there was no previous message.

Parameters:

  • list (TopicPartitionList, nil) (defaults to: nil)

    The topic with partitions to get the offsets for or nil to use the current subscription.

Returns:

Raises:



424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
# File 'lib/rdkafka/consumer.rb', line 424

# Return the current positions (offsets) for topics and partitions. The
# offset field of each requested partition is set to the offset of the
# last consumed message + 1, or nil if nothing was consumed yet.
#
# @param list [TopicPartitionList, nil] partitions to query, or nil to use
#   the current assignment
# @return [TopicPartitionList]
# @raise [TypeError] when list is neither nil nor a TopicPartitionList
# @raise [RdkafkaError] when fetching the positions fails
def position(list = nil)
  # Guard against use after close, consistent with #committed, #assignment
  # and the other accessors in this class
  closed_consumer_check(__method__)

  if list.nil?
    list = assignment
  elsif !list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be nil or a TopicPartitionList")
  end

  tpl = list.to_native_tpl

  response = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_position(inner, tpl)
  end

  if response != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
    raise Rdkafka::RdkafkaError.new(response)
  end

  TopicPartitionList.from_native_tpl(tpl)
ensure
  Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) if tpl
end

#query_watermark_offsets(topic, partition, timeout_ms = Defaults::CONSUMER_QUERY_WATERMARK_TIMEOUT_MS) ⇒ Integer

Query broker for low (oldest/beginning) and high (newest/end) offsets for a partition.

Parameters:

  • topic (String)

    The topic to query

  • partition (Integer)

    The partition to query

  • timeout_ms (Integer) (defaults to: Defaults::CONSUMER_QUERY_WATERMARK_TIMEOUT_MS)

    The timeout for querying the broker

Returns:

  • (Integer)

    The low and high watermark

Raises:



453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
# File 'lib/rdkafka/consumer.rb', line 453

# Query the broker for the low (oldest/beginning) and high (newest/end)
# offsets of a partition.
#
# @param topic [String] the topic to query
# @param partition [Integer] the partition to query
# @param timeout_ms [Integer] the timeout for querying the broker
# @return [Array(Integer, Integer)] the low and high watermark offsets
# @raise [RdkafkaError] when querying the watermark offsets fails
def query_watermark_offsets(topic, partition, timeout_ms = Defaults::CONSUMER_QUERY_WATERMARK_TIMEOUT_MS)
  closed_consumer_check(__method__)

  # Out-parameters the native call fills with int64 offsets
  low = FFI::MemoryPointer.new(:int64, 1)
  high = FFI::MemoryPointer.new(:int64, 1)

  response = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_query_watermark_offsets(
      inner,
      topic,
      partition,
      low,
      high,
      timeout_ms
    )
  end
  if response != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
    raise Rdkafka::RdkafkaError.new(response, "Error querying watermark offsets for partition #{partition} of #{topic}")
  end

  [low.read_array_of_int64(1).first, high.read_array_of_int64(1).first]
ensure
  low&.free
  high&.free
end

#resume(list) ⇒ nil

Resumes producing consumption for the provided list of partitions

Parameters:

Returns:

  • (nil)

Raises:



275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# File 'lib/rdkafka/consumer.rb', line 275

# Resume consumption for the provided list of partitions.
#
# @param list [TopicPartitionList] partitions to resume
# @return [nil]
# @raise [TypeError] when list is not a TopicPartitionList
# @raise [RdkafkaError] when resuming fails
def resume(list)
  closed_consumer_check(__method__)

  unless list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be a TopicPartitionList")
  end

  tpl = list.to_native_tpl

  begin
    response = @native_kafka.with_inner do |inner|
      Rdkafka::Bindings.rd_kafka_resume_partitions(inner, tpl)
    end
    if response != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
      raise Rdkafka::RdkafkaError.new(response, "Error resume '#{list.to_h}'")
    end
  ensure
    Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
  end
end

#seek(message) ⇒ nil

Seek to a particular message. The next poll on the topic/partition will return the message at the given offset.

Parameters:

Returns:

  • (nil)

Raises:



570
571
572
# File 'lib/rdkafka/consumer.rb', line 570

# Seeks to a particular message. The next poll on the message's
# topic/partition will return the record at the given offset.
#
# @param message [Message] the message whose position to seek back to
# @return [nil]
# @raise [RdkafkaError] when seeking fails
def seek(message)
  topic = message.topic
  partition = message.partition
  offset = message.offset

  seek_by(topic, partition, offset)
end

#seek_by(topic, partition, offset) ⇒ nil

Seek to a particular message by providing the topic, partition and offset. The next poll on the topic/partition will return the message at the given offset.

Parameters:

  • topic (String)

    The topic in which to seek

  • partition (Integer)

    The partition number to seek

  • offset (Integer)

    The partition offset to seek to

Returns:

  • (nil)

Raises:



583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
# File 'lib/rdkafka/consumer.rb', line 583

# Seeks to a particular message by providing the topic, partition and offset.
# The next poll on the topic/partition will return the message at the given
# offset.
#
# @param topic [String] the topic in which to seek
# @param partition [Integer] the partition number to seek
# @param offset [Integer] the partition offset to seek to
# @return [nil]
# @raise [RdkafkaError] when seeking fails
def seek_by(topic, partition, offset)
  closed_consumer_check(__method__)

  # rd_kafka_seek is one of the few calls that does not support
  # a string as the topic, so create a native topic handle for it.
  native_topic = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_topic_new(
      inner,
      topic,
      nil
    )
  end
  response = Rdkafka::Bindings.rd_kafka_seek(
    native_topic,
    partition,
    offset,
    Defaults::CONSUMER_SEEK_TIMEOUT_MS
  )
  if response != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
    raise Rdkafka::RdkafkaError.new(response)
  end
ensure
  # Destroy the native topic handle even when the seek call raised
  if native_topic && !native_topic.null?
    Rdkafka::Bindings.rd_kafka_topic_destroy(native_topic)
  end
end

#startObject

Note:

Not needed to run unless explicit start was disabled

Starts the native Kafka polling thread and kicks off the init polling



29
30
31
# File 'lib/rdkafka/consumer.rb', line 29

# Starts the native Kafka polling thread and kicks off the init polling.
#
# @note Not needed to run unless explicit start was disabled in the config.
def start
  @native_kafka.start
end

#store_offset(message) ⇒ nil

Store offset of a message to be used in the next commit of this consumer

When using this ‘enable.auto.offset.store` should be set to `false` in the config.

Parameters:

Returns:

  • (nil)

Raises:



539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
# File 'lib/rdkafka/consumer.rb', line 539

# Stores the offset of a message so it is used in the next commit of this
# consumer. When relying on this, `enable.auto.offset.store` should be set
# to `false` in the config.
#
# @param message [Message] the message whose offset to store
# @return [nil]
# @raise [RdkafkaError] when storing the offset fails
def store_offset(message)
  closed_consumer_check(__method__)

  # The next commit should start one past the consumed message
  offsets = TopicPartitionList.new
  offsets.add_topic_and_partitions_with_offsets(
    message.topic,
    message.partition => message.offset + 1
  )
  tpl = offsets.to_native_tpl

  status = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_offsets_store(inner, tpl)
  end

  unless status == Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
    raise Rdkafka::RdkafkaError.new(status)
  end
ensure
  Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) if tpl
end

#subscribe(*topics) ⇒ nil

Subscribes to one or more topics letting Kafka handle partition assignments.

Parameters:

  • topics (Array<String>)

    One or more topic names

Returns:

  • (nil)

Raises:



206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/rdkafka/consumer.rb', line 206

# Subscribes to one or more topics, letting Kafka handle partition
# assignments.
#
# @param topics [Array<String>] one or more topic names
# @return [nil]
# @raise [RdkafkaError] when subscribing fails
def subscribe(*topics)
  closed_consumer_check(__method__)

  # Build a native topic partition list; partitions stay unassigned so the
  # broker-side group protocol can balance them.
  tpl = Rdkafka::Bindings.rd_kafka_topic_partition_list_new(topics.length)

  topics.each do |name|
    Rdkafka::Bindings.rd_kafka_topic_partition_list_add(tpl, name, Rdkafka::Bindings::RD_KAFKA_PARTITION_UA)
  end

  result = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_subscribe(inner, tpl)
  end

  unless result == Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
    raise Rdkafka::RdkafkaError.new(result, "Error subscribing to '#{topics.join(", ")}'")
  end
ensure
  Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) unless tpl.nil?
end

#subscriptionTopicPartitionList

Returns the current subscription to topics and partitions

Returns:

Raises:



300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
# File 'lib/rdkafka/consumer.rb', line 300

# Returns the current subscription to topics and partitions.
#
# @return [TopicPartitionList]
# @raise [RdkafkaError] when fetching the subscription fails
def subscription
  closed_consumer_check(__method__)

  # Out-parameter that receives a pointer to the native list
  list_ptr = FFI::MemoryPointer.new(:pointer)
  result = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_subscription(inner, list_ptr)
  end

  unless result == Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
    raise Rdkafka::RdkafkaError.new(result)
  end

  native_list = list_ptr.read_pointer

  begin
    Rdkafka::Consumer::TopicPartitionList.from_native_tpl(native_list)
  ensure
    # The returned native list is owned by us and must be destroyed
    Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(native_list)
  end
end

#unsubscribenil

Unsubscribe from all subscribed topics.

Returns:

  • (nil)

Raises:



231
232
233
234
235
236
237
238
239
240
# File 'lib/rdkafka/consumer.rb', line 231

# Unsubscribes from all subscribed topics.
#
# @return [nil]
# @raise [RdkafkaError] when unsubscribing fails
def unsubscribe
  closed_consumer_check(__method__)

  result = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_unsubscribe(inner)
  end

  unless result == Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
    raise Rdkafka::RdkafkaError.new(result)
  end
end