Class: Karafka::Pro::Iterator

Inherits:
Object
Defined in:
lib/karafka/pro/iterator.rb,
lib/karafka/pro/iterator/expander.rb,
lib/karafka/pro/iterator/tpl_builder.rb

Overview

Topic iterator allows you to iterate over topic/partition data and perform lookups for information that you need.

It supports early stops upon finding the requested data and allows for seeking till the end. It also allows signaling that a given message should be the last one from a certain partition while iteration continues over the other partitions.

It does not create a consumer group and does not have any offset management until the first consumer offset marking happens. So it can be used for quick seeks as well as iterative, repetitive data fetching from rake tasks, etc.
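As a hedged sketch of that workflow (assumes Karafka Pro is loaded and a Kafka cluster is reachable; the topic name and key are illustrative):

```ruby
# Quick lookup: scan a topic for a record with a given key and stop
# the whole iteration as soon as it is found. No consumer group is
# created unless offsets are explicitly marked.
iterator = Karafka::Pro::Iterator.new(['users_events'])

found = nil

iterator.each do |message, itr|
  next unless message.key == 'user-42'

  found = message
  itr.stop
end
```

Using `itr.stop` rather than `break` ensures any asynchronously stored offsets are flushed before the iteration ends (see the note on `#stop` below).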

Defined Under Namespace

Classes: Expander, TplBuilder

Instance Method Summary

Constructor Details

#initialize(topics, settings: { "auto.offset.reset": "beginning" }, yield_nil: false, max_wait_time: 200) ⇒ Iterator

Note:

It is worth keeping in mind that this API also needs to operate within `max.poll.interval.ms` limitations on each iteration

Note:

In case of a never-ending iterator, you need to set `enable.partition.eof` to `false` so we don't stop polling data even when reaching the end (the end at a given moment)

A simple API that allows iterating over topic/partition data without having to subscribe and deal with rebalances. This API allows for multi-partition streaming and is optimized for data lookups. It allows explicitly stopping iteration over any partition during the iteration process, enabling optimized lookups.

Parameters:

  • topics (Array<String>, Hash)

    a list of strings if we want to subscribe to multiple topics and all of their partitions, or a hash where keys are the topics and values are hashes with partitions and their initial offsets.

  • settings (Hash) (defaults to: { "auto.offset.reset": "beginning" })

    extra settings for the consumer. Please keep in mind that if overwritten, you may want to include `auto.offset.reset` to match your case.

  • yield_nil (Boolean) (defaults to: false)

    should we also yield `nil` values when polling returns nothing. Useful in particular for long-living iterators.

  • max_wait_time (Integer) (defaults to: 200)

    maximum wait in ms when the iterator did not receive any messages
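The `topics` argument therefore accepts two shapes; a small sketch based on the parameter description above (topic names and offsets are illustrative):

```ruby
# All partitions of the listed topics, starting from the earliest
# available offsets (the default "auto.offset.reset" is "beginning")
topics_simple = %w[orders payments]

# Explicit partitions with initial offsets: start partition 0 of
# "orders" at offset 100 and partition 1 at offset 0
topics_with_offsets = {
  'orders' => { 0 => 100, 1 => 0 }
}
```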



# File 'lib/karafka/pro/iterator.rb', line 63

def initialize(
  topics,
  settings: { "auto.offset.reset": "beginning" },
  yield_nil: false,
  max_wait_time: 200
)
  @topics_with_partitions = Expander.new.call(topics)

  @routing_topics = @topics_with_partitions.to_h do |name, _|
    [name, Karafka::Routing::Router.find_or_initialize_by_name(name)]
  end

  @total_partitions = @topics_with_partitions.map(&:last).sum(&:count)

  @stopped_partitions = 0

  @settings = settings
  @yield_nil = yield_nil
  @max_wait_time = max_wait_time
end

Instance Method Details

#each ⇒ Object

Iterates over requested topic partitions and yields the results together with the iterator itself. The iterator instance is yielded because one can run `stop_partition` to stop iterating over part of the data. This is useful for scenarios where we are looking for some information in all the partitions, but once we find it, the given partition's data is no longer needed and would only eat up resources.



# File 'lib/karafka/pro/iterator.rb', line 89

def each
  ::Karafka::Admin.with_consumer(@settings) do |consumer|
    tpl = TplBuilder.new(consumer, @topics_with_partitions).call
    consumer.assign(tpl)

    # We need this for self-referenced APIs like pausing
    @current_consumer = consumer

    # Stream data until we reach the end of all the partitions or until the end user
    # indicates that they are done
    until done?
      message = poll

      # Skip nils if not explicitly required
      next if message.nil? && !@yield_nil

      if message
        @current_message = build_message(message)

        yield(@current_message, self)
      else
        yield(nil, self)
      end
    end

    @current_consumer.commit_offsets(async: false) if @stored_offsets
    @current_message = nil
    @current_consumer = nil
  end

  # Reset so we can use the same iterator again if needed
  @stopped_partitions = 0
end
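A hedged usage sketch of `#each` with per-partition early stops (requires a reachable Kafka cluster; the topic name and lookup condition are illustrative, and raw string payloads are assumed):

```ruby
# Look for a specific marker in every partition of a topic; once a
# partition yields the match, stop that partition only and keep
# iterating over the remaining ones.
iterator = Karafka::Pro::Iterator.new(['system_events'])

matches = []

iterator.each do |message, itr|
  next unless message.raw_payload.include?('deployment-finished')

  matches << [message.partition, message.offset]
  # This partition served its purpose; release its resources while
  # the other partitions continue streaming
  itr.stop_current_partition
end
```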

#mark_as_consumed(message) ⇒ Object

Marks given message as consumed.

Parameters:

  • message (Karafka::Messages::Message)

    message we want to mark as consumed

# File 'lib/karafka/pro/iterator.rb', line 160

def mark_as_consumed(message)
  @current_consumer.store_offset(message, nil)
  @stored_offsets = true
end

#mark_as_consumed!(message) ⇒ Object

Marks given message as consumed and commits offsets

Parameters:

  • message (Karafka::Messages::Message)

    message we want to mark as consumed

# File 'lib/karafka/pro/iterator.rb', line 168

def mark_as_consumed!(message)
  mark_as_consumed(message)
  @current_consumer.commit_offsets(async: false)
end
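A sketch combining iteration with offset marking, so a later run can resume where this one left off (assumes a Kafka cluster; the topic name and `process` helper are hypothetical):

```ruby
iterator = Karafka::Pro::Iterator.new(['audit_log'])

iterator.each do |message, itr|
  process(message) # hypothetical processing method

  # Store the offset locally; stored offsets are committed in one go
  # when the iteration finishes. Use #mark_as_consumed! instead to
  # commit synchronously right away.
  itr.mark_as_consumed(message)
end
```

Note that, per the overview, marking offsets is what opts the iterator into offset management in the first place.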

#stop ⇒ Object

Note:

`break` can also be used, but in such cases offsets stored asynchronously will not be flushed to Kafka. This is why `#stop` is the recommended method.

Stops the whole iteration



# File 'lib/karafka/pro/iterator.rb', line 153

def stop
  @stopped = true
end

#stop_current_partition ⇒ Object

Stops the partition we’re currently yielded into



# File 'lib/karafka/pro/iterator.rb', line 124

def stop_current_partition
  stop_partition(
    @current_message.topic,
    @current_message.partition
  )
end

#stop_partition(name, partition) ⇒ Object

Stops processing of a given partition. We expect the topic and partition to be provided explicitly because of the scenario where, during a multi-partition iteration, we want to stop a different partition than the one currently yielded.

We pause it forever and no longer work with it.

Parameters:

  • name (String)

    topic name of which partition we want to stop

  • partition (Integer)

    partition we want to stop processing



# File 'lib/karafka/pro/iterator.rb', line 140

def stop_partition(name, partition)
  @stopped_partitions += 1

  @current_consumer.pause(
    Rdkafka::Consumer::TopicPartitionList.new(
      name => [Rdkafka::Consumer::Partition.new(partition, 0)]
    )
  )
end
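Because `#stop_partition` takes an explicit topic and partition, data from one partition can trigger stopping another; a hedged sketch (the topic name, control message, and cross-partition logic are illustrative):

```ruby
iterator = Karafka::Pro::Iterator.new(['metrics'])

iterator.each do |message, itr|
  # Hypothetical: a control message on partition 0 tells us that
  # partition 5 holds no further relevant data, so we stop it even
  # though it is not the partition currently yielded
  if message.partition.zero? && message.raw_payload == 'partition-5-done'
    itr.stop_partition('metrics', 5)
  end
end
```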