Class: Karafka::Connection::Client
- Inherits:
- Object
- Includes:
- Karafka::Core::Helpers::Time
- Defined in:
- lib/karafka/connection/client.rb
Overview
An abstraction layer on top of the rdkafka consumer.
It is thread-safe and provides some safety measures so that we do not end up operating on a closed consumer instance, as doing so crashes the Ruby VM process.
Instance Attribute Summary
-
#id ⇒ String
readonly
Id of the client.
-
#name ⇒ String
readonly
Underlying consumer name.
-
#rebalance_manager ⇒ Object
readonly
Returns the value of attribute rebalance_manager.
-
#subscription_group ⇒ Karafka::Routing::SubscriptionGroup
readonly
Subscription group to which this client belongs.
Instance Method Summary
-
#assignment ⇒ Rdkafka::Consumer::TopicPartitionList
Current active assignment.
-
#assignment_lost? ⇒ Boolean
True if our current assignment has been lost involuntarily.
-
#batch_poll ⇒ Karafka::Connection::MessagesBuffer
Fetches messages within boundaries defined by the settings (time, size, topics, etc).
-
#commit_offsets(async: true) ⇒ Boolean
Commits the offset on a current consumer in a non-blocking or blocking way.
-
#commit_offsets! ⇒ Object
Commits offset in a synchronous way.
-
#committed(tpl = nil) ⇒ Rdkafka::Consumer::TopicPartitionList
Return the current committed offset per partition for this consumer group.
-
#consumer_group_metadata_pointer ⇒ FFI::Pointer
Returns pointer to the consumer group metadata.
-
#events_poll(timeout = 0) ⇒ Object
Triggers the rdkafka main queue events by consuming this queue.
-
#initialize(subscription_group, batch_poll_breaker) ⇒ Karafka::Connection::Client
constructor
Creates a new consumer instance.
-
#mark_as_consumed(message, metadata = nil) ⇒ Boolean
Marks given message as consumed.
-
#mark_as_consumed!(message, metadata = nil) ⇒ Boolean
Marks a given message as consumed and commits the offsets in a blocking way.
-
#pause(topic, partition, offset = nil) ⇒ Object
Pauses given partition and moves back to last successful offset processed.
-
#ping ⇒ Object
Runs a single poll on the main queue and the consumer queue, ignoring all potential errors. This is used as a keep-alive in the shutdown stage, and any errors that happen here are irrelevant from the shutdown process perspective.
-
#reset ⇒ Object
Closes and resets the client completely.
-
#resume(topic, partition) ⇒ Object
Resumes processing of a given topic partition after it was paused.
-
#seek(message) ⇒ Object
Seek to a particular message.
-
#stop ⇒ Object
Gracefully stops topic consumption.
-
#store_offset(message, offset_metadata = nil) ⇒ Object
Stores offset for a given partition of a given topic based on the provided message.
Constructor Details
#initialize(subscription_group, batch_poll_breaker) ⇒ Karafka::Connection::Client
Creates a new consumer instance.
    # File 'lib/karafka/connection/client.rb', line 44
    def initialize(subscription_group, batch_poll_breaker)
      @id = SecureRandom.hex(6)
      # Name is set when we build consumer
      @name = ''
      @closed = false
      @subscription_group = subscription_group
      @buffer = RawMessagesBuffer.new
      @tick_interval = ::Karafka::App.config.internal.tick_interval
      @rebalance_manager = RebalanceManager.new(@subscription_group.id)
      @rebalance_callback = Instrumentation::Callbacks::Rebalance.new(@subscription_group)

      @interval_runner = Helpers::IntervalRunner.new do
        events_poll
        # events poller returns nil when not running often enough, hence we don't use the
        # boolean to be explicit
        batch_poll_breaker.call ? :run : :stop
      end

      # There are few operations that can happen in parallel from the listener threads as well
      # as from the workers. They are not fully thread-safe because they may be composed out of
      # few calls to Kafka or out of few internal state changes. That is why we mutex them.
      # It mostly revolves around pausing and resuming.
      @mutex = Mutex.new
      # We need to keep track of what we have paused for resuming
      # In case we lose partition, we still need to resume it, otherwise it won't be fetched
      # again if we get reassigned to it later on. We need to keep them as after revocation we
      # no longer may be able to fetch them from Kafka. We could build them but it is easier
      # to just keep them here and use if needed when cannot be obtained
      @paused_tpls = Hash.new { |h, k| h[k] = {} }
    end
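The constructor comment notes that the events poller returns nil when it does not run often enough, which is why the block answers with `:run` or `:stop` instead of a boolean. A minimal sketch of that throttling idea follows. `SketchIntervalRunner` is a hypothetical stand-in written for illustration; it is not Karafka's `Helpers::IntervalRunner`, whose internals are not shown on this page.

```ruby
# Hypothetical stand-in for a throttled runner; not Karafka's actual implementation.
class SketchIntervalRunner
  # @param interval_ms [Integer] minimum number of milliseconds between two runs
  def initialize(interval_ms, &block)
    @interval_ms = interval_ms
    @block = block
    @last_run_at = nil
  end

  # Runs the block only when enough time has passed; returns nil otherwise.
  def call
    now = monotonic_ms
    return nil if @last_run_at && (now - @last_run_at) < @interval_ms

    @last_run_at = now
    @block.call
  end

  # Forgets the last run so the next call executes immediately.
  def reset
    @last_run_at = nil
  end

  private

  def monotonic_ms
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
  end
end

runner = SketchIntervalRunner.new(60_000) { :run }

first = runner.call   # executes the block
second = runner.call  # throttled, so the block is skipped and nil is returned
```

Because a throttled call yields nil, a caller that compares the result with `:stop` treats "did not run" and "keep running" the same way, which a plain `true`/`false` return could not express as clearly.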
Instance Attribute Details
#id ⇒ String (readonly)
Returns id of the client.
    # File 'lib/karafka/connection/client.rb', line 24
    def id
      @id
    end
#name ⇒ String (readonly)
Consumer name may change in case we regenerate it
Returns underlying consumer name.
    # File 'lib/karafka/connection/client.rb', line 21
    def name
      @name
    end
#rebalance_manager ⇒ Object (readonly)
Returns the value of attribute rebalance_manager.
    # File 'lib/karafka/connection/client.rb', line 13
    def rebalance_manager
      @rebalance_manager
    end
#subscription_group ⇒ Karafka::Routing::SubscriptionGroup (readonly)
Returns subscription group to which this client belongs.
    # File 'lib/karafka/connection/client.rb', line 17
    def subscription_group
      @subscription_group
    end
Instance Method Details
#assignment ⇒ Rdkafka::Consumer::TopicPartitionList
Returns current active assignment.
    # File 'lib/karafka/connection/client.rb', line 148
    def assignment
      kafka.assignment
    end
#assignment_lost? ⇒ Boolean
Returns true if our current assignment has been lost involuntarily.
    # File 'lib/karafka/connection/client.rb', line 143
    def assignment_lost?
      kafka.assignment_lost?
    end
#batch_poll ⇒ Karafka::Connection::MessagesBuffer
This method should not be executed from multiple threads at the same time.
Fetches messages within boundaries defined by the settings (time, size, topics, etc).
Also periodically runs the events polling to trigger events callbacks.
    # File 'lib/karafka/connection/client.rb', line 82
    def batch_poll
      time_poll = TimeTrackers::Poll.new(@subscription_group.max_wait_time)

      @buffer.clear
      @rebalance_manager.clear

      events_poll

      loop do
        time_poll.start

        # Don't fetch more messages if we do not have any time left
        break if time_poll.exceeded?
        # Don't fetch more messages if we've fetched max that we've wanted
        break if @buffer.size >= @subscription_group.

        # Fetch message within our time boundaries
        response = poll(time_poll.remaining)

        # Put a message to the buffer if there is one
        @buffer << response if response && response != :tick_time

        # Upon polling rebalance manager might have been updated.
        # If partition revocation happens, we need to remove messages from revoked partitions
        # as well as ensure we do not have duplicated due to the offset reset for partitions
        # that we got assigned
        #
        # We also do early break, so the information about rebalance is used as soon as possible
        if @rebalance_manager.changed?
          # Since rebalances do not occur often, we can run events polling as well without
          # any throttling
          events_poll

          break
        end

        # If we were signaled from the outside to break the loop, we should
        break if @interval_runner.call == :stop

        # Track time spent on all of the processing and polling
        time_poll.checkpoint

        # Finally once we've (potentially) removed revoked, etc, if no messages were returned
        # and it was not an early poll exit, we can break.
        # Worth keeping in mind, that the rebalance manager might have been updated despite no
        # messages being returned during a poll
        break unless response
      end

      @buffer
    end
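The loop in `#batch_poll` stops on whichever limit is reached first: the time budget, the buffer size, or an empty poll. A simplified sketch of that shape is shown here under stated assumptions. `bounded_poll`, its keyword arguments, and the lambda-based source are hypothetical names introduced for illustration; the sketch leaves out rebalance handling, tick-time responses and events polling.

```ruby
# Hypothetical helper: collects items until the time or size limit is reached.
# `source` is any object responding to #call(remaining_ms) that returns an item or nil.
def bounded_poll(source, max_wait_ms:, max_items:)
  buffer = []
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) + max_wait_ms

  loop do
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
    # Stop when the time budget is used up
    break if remaining <= 0
    # Stop when we have collected as many items as we wanted
    break if buffer.size >= max_items

    item = source.call(remaining)
    # Stop early when nothing more is available
    break unless item

    buffer << item
  end

  buffer
end

queue = [1, 2, 3, 4, 5]
fake_source = ->(_remaining_ms) { queue.shift }

size_limited = bounded_poll(fake_source, max_wait_ms: 1_000, max_items: 3)
drained = bounded_poll(fake_source, max_wait_ms: 1_000, max_items: 10)
```

The first call stops on the size limit, the second stops because the fake source runs dry well before the time budget expires.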
#commit_offsets(async: true) ⇒ Boolean
This will commit all the offsets for the whole consumer. To get granular control over where the offset should be for particular topic partitions, use `store_offset` to store new offsets only when you want them to be flushed.
When called with `async: true`, this method may return `true` despite an involuntary partition revocation, because it does not check `assignment_lost?`. It returns only the result of the commit operation itself.
Commits the offset on a current consumer in a non-blocking or blocking way.
    # File 'lib/karafka/connection/client.rb', line 165
    def commit_offsets(async: true)
      internal_commit_offsets(async: async)
    end
#commit_offsets! ⇒ Object
Commits offset in a synchronous way.
    # File 'lib/karafka/connection/client.rb', line 172
    def commit_offsets!
      commit_offsets(async: false)
    end
#committed(tpl = nil) ⇒ Rdkafka::Consumer::TopicPartitionList
It is recommended to use this only on rebalances, to get positions with metadata when working with metadata, as this call is synchronous.
Returns the current committed offset per partition for this consumer group. The offset field of each requested partition will be set either to the stored offset or to -1001 if there was no stored offset for that partition.
    # File 'lib/karafka/connection/client.rb', line 386
    def committed(tpl = nil)
      @wrapped_kafka.committed(tpl)
    end
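Since `#committed` reports -1001 for partitions without a stored offset, callers usually need to tell such partitions apart from those with a real offset. A small sketch of that check follows, using a plain hash of partition numbers to offsets rather than a real `Rdkafka::Consumer::TopicPartitionList`. The helper name `split_committed` and the sample data are assumptions made for illustration.

```ruby
# Hypothetical helper: splits partition => offset pairs into committed and missing ones.
# The default sentinel is the -1001 value described in the documentation above.
def split_committed(offsets_by_partition, sentinel: -1001)
  offsets_by_partition
    .partition { |_partition, offset| offset != sentinel }
    .map(&:to_h)
end

# Illustrative data only; a real list would come from #committed
committed, missing = split_committed({ 0 => 42, 1 => -1001, 2 => 7 })
```

Here `committed` holds partitions 0 and 2, while `missing` holds partition 1, which has no stored offset yet.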
#consumer_group_metadata_pointer ⇒ FFI::Pointer
Returns pointer to the consumer group metadata. It is used only in the context of exactly-once semantics in transactions, which is why it is never remapped to Ruby.
    # File 'lib/karafka/connection/client.rb', line 373
    def consumer_group_metadata_pointer
      kafka.
    end
#events_poll(timeout = 0) ⇒ Object
It is non-blocking when the timeout is 0 and will not wait if the queue is empty. It costs up to 2ms when no callbacks are triggered.
Triggers the rdkafka main queue events by consuming this queue. This is not the consumer consumption queue but the one with:
- error callbacks
- stats callbacks
- OAUTHBEARER token refresh callbacks
    # File 'lib/karafka/connection/client.rb', line 366
    def events_poll(timeout = 0)
      kafka.events_poll(timeout)
    end
#mark_as_consumed(message, metadata = nil) ⇒ Boolean
This method won’t trigger automatic offset commits, relying instead on the offset checkpointing trigger that happens with each batch processed. It will, however, check the `librdkafka` assignment ownership to increase accuracy for involuntary revocations.
Marks given message as consumed.
    # File 'lib/karafka/connection/client.rb', line 314
    def mark_as_consumed(message, metadata = nil)
      store_offset(message, metadata) && !assignment_lost?
    end
#mark_as_consumed!(message, metadata = nil) ⇒ Boolean
Marks a given message as consumed and commits the offsets in a blocking way.
    # File 'lib/karafka/connection/client.rb', line 323
    def mark_as_consumed!(message, metadata = nil)
      return false unless mark_as_consumed(message, metadata)

      commit_offsets!
    end
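The return values of `#mark_as_consumed` and `#mark_as_consumed!` depend on both the offset store result and the assignment state. The sketch below mimics only that decision logic with a hypothetical `SketchClient`. It never talks to Kafka, and its `store_offset` and `commit_offsets!` are stubs assumed to always succeed.

```ruby
# Hypothetical stand-in that mimics only the return-value logic of the two methods.
class SketchClient
  attr_reader :stored, :commits

  def initialize(assignment_lost: false)
    @assignment_lost = assignment_lost
    @stored = []
    @commits = 0
  end

  def assignment_lost?
    @assignment_lost
  end

  # Stub: pretends that storing the offset always succeeds
  def store_offset(message, _metadata = nil)
    @stored << message
    true
  end

  # Stub: counts blocking commits instead of performing them
  def commit_offsets!
    @commits += 1
    true
  end

  def mark_as_consumed(message, metadata = nil)
    store_offset(message, metadata) && !assignment_lost?
  end

  def mark_as_consumed!(message, metadata = nil)
    return false unless mark_as_consumed(message, metadata)

    commit_offsets!
  end
end

healthy = SketchClient.new
revoked = SketchClient.new(assignment_lost: true)

healthy_result = healthy.mark_as_consumed!(:message_a)
revoked_result = revoked.mark_as_consumed!(:message_a)
```

In this sketch the revoked client still stores the offset locally but reports `false` and skips the blocking commit, which matches the short-circuit in the listing above.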
#pause(topic, partition, offset = nil) ⇒ Object
This will pause indefinitely and requires a manual `#resume`.
When `#internal_seek` is not involved (when offset is `nil`), we will not purge the librdkafka buffers and will continue from the last cursor offset.
Pauses given partition and moves back to last successful offset processed.
    # File 'lib/karafka/connection/client.rb', line 196
    def pause(topic, partition, offset = nil)
      @mutex.synchronize do
        # Do not pause if the client got closed, would not change anything
        return if @closed

        internal_commit_offsets(async: true)

        # Here we do not use our cached tpls because we should not try to pause something we do
        # not own anymore.
        tpl = topic_partition_list(topic, partition)

        return unless tpl

        Karafka.monitor.instrument(
          'client.pause',
          caller: self,
          subscription_group: @subscription_group,
          topic: topic,
          partition: partition,
          offset: offset
        )

        @paused_tpls[topic][partition] = tpl

        kafka.pause(tpl)

        # If offset is not provided, will pause where it finished.
        # This makes librdkafka not purge buffers and can provide significant network savings
        # when we just want to pause before further processing without changing the offsets
        return unless offset

        pause_msg = Messages::Seek.new(topic, partition, offset)

        internal_seek(pause_msg)
      end
    end
#ping ⇒ Object
Runs a single poll on the main queue and the consumer queue, ignoring all potential errors. This is used as a keep-alive in the shutdown stage, and any errors that happen here are irrelevant from the shutdown process perspective.
This is used only to trigger rebalance callbacks and other callbacks.
    # File 'lib/karafka/connection/client.rb', line 349
    def ping
      events_poll(100)
      poll(100)
    rescue Rdkafka::RdkafkaError
      nil
    end
#reset ⇒ Object
Closes and resets the client completely.
    # File 'lib/karafka/connection/client.rb', line 330
    def reset
      Karafka.monitor.instrument(
        'client.reset',
        caller: self,
        subscription_group: @subscription_group
      ) do
        close

        @interval_runner.reset
        @closed = false
        @paused_tpls.clear
      end
    end
#resume(topic, partition) ⇒ Object
Resumes processing of a given topic partition after it was paused.
    # File 'lib/karafka/connection/client.rb', line 237
    def resume(topic, partition)
      @mutex.synchronize do
        return if @closed

        # We now commit offsets on rebalances, thus we can do it async just to make sure
        internal_commit_offsets(async: true)

        # If we were not able, let's try to reuse the one we have (if we have)
        tpl = topic_partition_list(topic, partition) || @paused_tpls[topic][partition]

        return unless tpl

        # If we did not have it, it means we never paused this partition, thus no resume should
        # happen in the first place
        return unless @paused_tpls[topic].delete(partition)

        Karafka.monitor.instrument(
          'client.resume',
          caller: self,
          subscription_group: @subscription_group,
          topic: topic,
          partition: partition
        )

        kafka.resume(tpl)
      end
    end
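`#pause` and `#resume` share the `@paused_tpls` bookkeeping: pausing remembers the topic partition list, and resuming proceeds only if an entry can be deleted. A sketch of that bookkeeping in isolation is given below. `PausedRegistry` and the symbol used in place of a topic partition list are hypothetical, and no Kafka calls are made.

```ruby
# Hypothetical registry mirroring the paused-partitions bookkeeping; no Kafka calls involved.
class PausedRegistry
  def initialize
    @mutex = Mutex.new
    # Nested hash that creates the per-topic hash on first access
    @paused = Hash.new { |hash, topic| hash[topic] = {} }
  end

  # Remembers what was paused so it can be resumed later
  def pause(topic, partition, tpl)
    @mutex.synchronize { @paused[topic][partition] = tpl }
  end

  # Returns the remembered value, or nil when the partition was never paused
  def resume(topic, partition)
    @mutex.synchronize { @paused[topic].delete(partition) }
  end
end

registry = PausedRegistry.new
registry.pause('events', 0, :tpl_for_events_0)

resumed = registry.resume('events', 0)
resumed_again = registry.resume('events', 0)
never_paused = registry.resume('other', 3)
```

Using the result of `delete` as the guard makes a second resume, or a resume of a partition that was never paused, a no-op, as in the `return unless @paused_tpls[topic].delete(partition)` line of the listing.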
#seek(message) ⇒ Object
Please note that if you are seeking to a time offset, getting the offset is blocking.
Seek to a particular message. The next poll on the topic/partition will return the message at the given offset.
    # File 'lib/karafka/connection/client.rb', line 182
    def seek(message)
      @mutex.synchronize { internal_seek(message) }
    end
#stop ⇒ Object
Gracefully stops topic consumption.
    # File 'lib/karafka/connection/client.rb', line 266
    def stop
      # librdkafka has several constant issues when shutting down during rebalance. This is
      # an issue that gets back every few versions of librdkafka in a limited scope, for example
      # for cooperative-sticky or in a general scope. This is why we unsubscribe and wait until
      # we no longer have any assignments. That way librdkafka consumer shutdown should never
      # happen with rebalance associated with the given consumer instance. Since we do not want
      # to wait forever, we also impose a limit on how long should we wait. This prioritizes
      # shutdown stability over endless wait.
      #
      # The `@unsubscribing` ensures that when there would be a direct close attempt, it
      # won't get into this loop again. This can happen when supervision decides it should close
      # things faster
      #
      # @see https://github.com/confluentinc/librdkafka/issues/4792
      # @see https://github.com/confluentinc/librdkafka/issues/4527
      if unsubscribe?
        @unsubscribing = true

        # Give 50% of time for the final close before we reach the forceful
        max_wait = ::Karafka::App.config.shutdown_timeout * COOP_UNSUBSCRIBE_FACTOR
        used = 0
        stopped_at = monotonic_now

        unsubscribe

        until assignment.empty?
          used += monotonic_now - stopped_at
          stopped_at = monotonic_now

          break if used >= max_wait

          sleep(0.1)

          ping
        end
      end

      close
    end
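The wait in `#stop` is a bounded loop: it keeps checking a condition but gives up once the accumulated monotonic time reaches a limit. The same pattern can be sketched without Kafka as follows. `wait_until`, its parameters, and the array that stands in for an assignment are hypothetical and exist only to illustrate the technique.

```ruby
# Hypothetical helper: checks `done` until it returns true or the time budget runs out.
# Returns true when the condition was met, false on timeout.
def wait_until(max_wait_ms:, step_s: 0.01, &done)
  used = 0.0
  checked_at = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)

  until done.call
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
    used += now - checked_at
    checked_at = now

    # Give up instead of waiting forever
    return false if used >= max_wait_ms

    sleep(step_s)
  end

  true
end

remaining_partitions = [0, 1, 2]
# Each check drops one partition, emulating an assignment that empties over time
unsubscribed = wait_until(max_wait_ms: 1_000) do
  remaining_partitions.shift
  remaining_partitions.empty?
end
# A condition that never becomes true hits the time limit instead
timed_out = wait_until(max_wait_ms: 30) { false }
```

A monotonic clock is used so that wall-clock adjustments during shutdown cannot shorten or extend the wait.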
#store_offset(message, offset_metadata = nil) ⇒ Object
Stores offset for a given partition of a given topic based on the provided message.
    # File 'lib/karafka/connection/client.rb', line 138
    def store_offset(message, offset_metadata = nil)
      internal_store_offset(message, offset_metadata)
    end