Class: Karafka::Pro::Iterator
- Inherits: Object
- Defined in:
- lib/karafka/pro/iterator.rb,
lib/karafka/pro/iterator/expander.rb,
lib/karafka/pro/iterator/tpl_builder.rb
Overview
Topic iterator allows you to iterate over topic/partition data and perform lookups for information that you need.
It supports early stops upon finding the requested data and allows for seeking till the end. It also allows signaling that a given message should be the last one from a certain partition while iteration continues over the other partitions.
It does not create a consumer group and does not have any offset management until the first consumer offset marking happens, so it can be used for quick seeks as well as iterative, repetitive data fetching from rake tasks, etc.
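As a quick illustration of the overview above, a minimal usage sketch. The topic name `events`, the sampling limit, and the `locate` helper are hypothetical assumptions, and running the iterator itself requires the Karafka Pro gem plus a reachable Kafka cluster, hence the `defined?` guard:

```ruby
# Pure helper (hypothetical): human-readable location of a message
locate = ->(topic, partition, offset) { "#{topic}/#{partition}@#{offset}" }

seen = 0

# Guarded so the sketch is a no-op when Karafka Pro is not loaded
if defined?(Karafka::Pro::Iterator)
  iterator = Karafka::Pro::Iterator.new('events')

  iterator.each do |message, iter|
    seen += 1
    puts locate.call(message.topic, message.partition, message.offset)

    # Early stop once we have sampled enough data
    iter.stop if seen >= 100
  end
end
```

No consumer group is created here, so repeated runs of such a script always start from the configured initial offsets unless offsets are explicitly marked.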
Defined Under Namespace
Classes: Expander, TplBuilder
Instance Method Summary collapse
-
#each ⇒ Object
Iterates over requested topic partitions and yields the results together with the iterator itself. The Iterator instance is yielded because one can run `stop_partition` to stop iterating over part of the data.
-
#initialize(topics, settings: { "auto.offset.reset": "beginning" }, yield_nil: false, max_wait_time: 200) ⇒ Iterator
constructor
A simple API allowing iteration over topic/partition data without having to subscribe and deal with rebalances.
-
#mark_as_consumed(message) ⇒ Object
Marks the given message as consumed.
-
#mark_as_consumed!(message) ⇒ Object
Marks the given message as consumed and commits offsets.
-
#stop ⇒ Object
Stops all the iterating.
-
#stop_current_partition ⇒ Object
Stops the partition we’re currently yielded into.
-
#stop_partition(name, partition) ⇒ Object
Stops processing of a given partition. We expect the partition to be provided because of a scenario where there is multi-partition iteration and we want to stop a different partition than the one that is currently yielded.
Constructor Details
#initialize(topics, settings: { "auto.offset.reset": "beginning" }, yield_nil: false, max_wait_time: 200) ⇒ Iterator
It is worth keeping in mind that this API also needs to operate within the `max.poll.interval.ms` limitations on each iteration.
In case of a never-ending iterator, you need to set `enable.partition.eof` to `false` so we do not stop polling data even when reaching the end (the end at a given moment).
A simple API allowing iteration over topic/partition data without having to subscribe and deal with rebalances. This API allows for multi-partition streaming and is optimized for data lookups. It allows for explicitly stopping iteration over any partition during the iteration process, enabling optimized lookups.
# File 'lib/karafka/pro/iterator.rb', line 63

def initialize(
  topics,
  settings: { "auto.offset.reset": "beginning" },
  yield_nil: false,
  max_wait_time: 200
)
  @topics_with_partitions = Expander.new.call(topics)

  @routing_topics = @topics_with_partitions.to_h do |name, _|
    [name, Karafka::Routing::Router.find_or_initialize_by_name(name)]
  end

  @total_partitions = @topics_with_partitions.map(&:last).sum(&:count)
  @stopped_partitions = 0

  @settings = settings
  @yield_nil = yield_nil
  @max_wait_time = max_wait_time
end
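The `topics` argument is normalized by `Expander` into a topics-with-partitions map. Below is a hedged sketch of the specification shapes the iterator accepts; the topic names are hypothetical, and the negative-offset form meaning "last N messages" is an assumption worth verifying against your Karafka version:

```ruby
topics_all        = 'events'                     # whole topic, all partitions
topics_many       = %w[events logs]              # several whole topics
topics_partitions = { 'events' => [0, 2] }       # only partitions 0 and 2
topics_offsets    = { 'events' => { 0 => 100 } } # partition 0, starting at offset 100
topics_recent     = { 'events' => { 0 => -10 } } # partition 0, last 10 messages (assumed)

# Guarded: requires the Karafka Pro gem and a reachable Kafka cluster
if defined?(Karafka::Pro::Iterator)
  iterator = Karafka::Pro::Iterator.new(
    topics_offsets,
    settings: { 'auto.offset.reset': 'beginning' },
    yield_nil: false,
    max_wait_time: 200
  )
end
```

The keyword defaults mirror the constructor signature: start from the beginning of each partition, skip `nil` polls, and wait up to 200 ms per poll.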
Instance Method Details
#each ⇒ Object
Iterates over requested topic partitions and yields the results together with the iterator itself. The Iterator instance is yielded because one can run `stop_partition` to stop iterating over part of the data. It is useful for scenarios where we are looking for some information in all the partitions, but once we find it, a given partition's data is no longer needed and would only eat up resources.
# File 'lib/karafka/pro/iterator.rb', line 89

def each
  ::Karafka::Admin.with_consumer(@settings) do |consumer|
    tpl = TplBuilder.new(consumer, @topics_with_partitions).call
    consumer.assign(tpl)

    # We need this for self-referenced APIs like pausing
    @current_consumer = consumer

    # Stream data until we reach the end of all the partitions or until the end user
    # indicates that they are done
    until done?
      message = poll

      # Skip nils if not explicitly required
      next if message.nil? && !@yield_nil

      if message
        @current_message = message

        yield(@current_message, self)
      else
        yield(nil, self)
      end
    end

    @current_consumer.commit_offsets(async: false) if @stored_offsets

    @current_message = nil
    @current_consumer = nil
  end

  # Reset so we can use the same iterator again if needed
  @stopped_partitions = 0
end
#mark_as_consumed(message) ⇒ Object
Marks the given message as consumed.
# File 'lib/karafka/pro/iterator.rb', line 160

def mark_as_consumed(message)
  @current_consumer.store_offset(message, nil)
  @stored_offsets = true
end
#mark_as_consumed!(message) ⇒ Object
Marks the given message as consumed and commits offsets.
# File 'lib/karafka/pro/iterator.rb', line 168

def mark_as_consumed!(message)
  mark_as_consumed(message)
  @current_consumer.commit_offsets(async: false)
end
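A sketch contrasting the two marking methods: `#mark_as_consumed` only stores the offset locally (flushed when iteration finishes), while `#mark_as_consumed!` also commits synchronously. The topic name and the checkpoint cadence are hypothetical assumptions:

```ruby
CHECKPOINT_EVERY = 1_000

# Pure helper: commit synchronously only on every Nth offset
checkpoint = ->(offset) { (offset % CHECKPOINT_EVERY).zero? }

# Guarded: requires the Karafka Pro gem and a reachable Kafka cluster
if defined?(Karafka::Pro::Iterator)
  iterator = Karafka::Pro::Iterator.new('events')

  iterator.each do |message, iter|
    # Store the offset locally; flushed once when iteration finishes
    iter.mark_as_consumed(message)

    # Force a synchronous commit at checkpoints
    iter.mark_as_consumed!(message) if checkpoint.call(message.offset)
  end
end
```

Since the iterator has no offset management until the first marking, skipping both calls keeps runs fully stateless.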
#stop ⇒ Object
`break` can also be used, but in such a case offsets stored asynchronously will not be flushed to Kafka. This is why `#stop` is the recommended method.
Stops all the iterating.
# File 'lib/karafka/pro/iterator.rb', line 153

def stop
  @stopped = true
end
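To illustrate why `#stop` is preferred over `break`: with `break`, offsets stored via `#mark_as_consumed` are never flushed, while `#stop` lets `#each` finish its loop and commit them. A sketch where the topic name and offset limit are hypothetical:

```ruby
STOP_AT_OFFSET = 10_000

# Pure helper deciding when we are done
done_at = ->(offset) { offset >= STOP_AT_OFFSET }

# Guarded: requires the Karafka Pro gem and a reachable Kafka cluster
if defined?(Karafka::Pro::Iterator)
  iterator = Karafka::Pro::Iterator.new('events')

  iterator.each do |message, iter|
    iter.mark_as_consumed(message)

    # `break` here would skip the final offsets flush;
    # `iter.stop` ends iteration cleanly and commits stored offsets
    iter.stop if done_at.call(message.offset)
  end
end
```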
#stop_current_partition ⇒ Object
Stops the partition we're currently yielded into.
# File 'lib/karafka/pro/iterator.rb', line 124

def stop_current_partition
  stop_partition(
    @current_message.topic,
    @current_message.partition
  )
end
#stop_partition(name, partition) ⇒ Object
Stops processing of a given partition. We expect the partition to be provided because of a scenario where there is multi-partition iteration and we want to stop a different partition than the one that is currently yielded.
We pause it forever and no longer work with it.
# File 'lib/karafka/pro/iterator.rb', line 140

def stop_partition(name, partition)
  @stopped_partitions += 1

  @current_consumer.pause(
    Rdkafka::Consumer::TopicPartitionList.new(
      name => [Rdkafka::Consumer::Partition.new(partition, 0)]
    )
  )
end
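A sketch of a multi-partition lookup where each partition is stopped independently once its target record is found. The topic name, partition list, and `NEEDLE` payload marker are hypothetical assumptions:

```ruby
NEEDLE = 'order-1234'

# Pure helper: does the raw payload contain what we are looking for?
match = ->(payload) { payload.include?(NEEDLE) }

# Guarded: requires the Karafka Pro gem and a reachable Kafka cluster
if defined?(Karafka::Pro::Iterator)
  found = {} # partition => offset of the match

  iterator = Karafka::Pro::Iterator.new({ 'events' => [0, 1, 2] })

  iterator.each do |message, iter|
    next unless match.call(message.raw_payload)

    found[message.partition] = message.offset

    # Pause only this partition; the others keep streaming
    iter.stop_partition(message.topic, message.partition)
  end
end
```

Because stopped partitions are paused forever, iteration ends naturally once every partition is either stopped or exhausted.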