Class: Karafka::Connection::Client

Inherits:
Object
Defined in:
lib/karafka/connection/client.rb

Overview

An abstraction layer on top of the rdkafka consumer.

It is thread-safe and provides safety measures so we do not end up operating on a closed consumer instance, which would crash the Ruby VM process.

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(subscription_group, batch_poll_breaker) ⇒ Karafka::Connection::Client

Creates a new consumer instance.

Parameters:

  • subscription_group (Karafka::Routing::SubscriptionGroup)

    subscription group with all the configuration details needed for us to create a client

  • batch_poll_breaker (Proc)

proc that, when evaluated to false, will cause the batch poll loop to finish early. This improves shutdown and dynamic multiplexing, as it allows us to break early on long polls.



# File 'lib/karafka/connection/client.rb', line 37

def initialize(subscription_group, batch_poll_breaker)
  @id = SecureRandom.hex(6)
  # Name is set when we build consumer
  @name = ''
  @closed = false
  @subscription_group = subscription_group
  @buffer = RawMessagesBuffer.new
  @tick_interval = ::Karafka::App.config.internal.tick_interval
  @rebalance_manager = RebalanceManager.new(@subscription_group.id)
  @rebalance_callback = Instrumentation::Callbacks::Rebalance.new(@subscription_group)

  @interval_runner = Helpers::IntervalRunner.new do
    events_poll
    # events poller returns nil when not running often enough, hence we don't use the
    # boolean to be explicit
    batch_poll_breaker.call ? :run : :stop
  end

  # A few operations can happen in parallel from the listener threads as well
  # as from the workers. They are not fully thread-safe because they may be composed of
  # several calls to Kafka or of several internal state changes. That is why we mutex them.
  # It mostly revolves around pausing and resuming.
  @mutex = Mutex.new
  # We need to keep track of what we have paused for resuming.
  # In case we lose a partition, we still need to resume it, otherwise it won't be fetched
  # again if we get reassigned to it later on. We need to keep them, as after revocation we
  # may no longer be able to fetch them from Kafka. We could rebuild them, but it is easier
  # to just keep them here and use them when they cannot be obtained.
  @paused_tpls = Hash.new { |h, k| h[k] = {} }
end
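The `@paused_tpls` default block above auto-vivifies a nested hash: looking up a topic creates its inner partition hash on demand. A minimal pure-Ruby sketch of that structure (the `:tpl_for_partition_0` value is a stand-in placeholder, not a real topic partition list):

```ruby
# Sketch of the @paused_tpls structure: a hash whose default block
# auto-creates an inner hash per topic, so partitions can be stored
# without explicit initialization.
paused_tpls = Hash.new { |h, k| h[k] = {} }

paused_tpls['events'][0] = :tpl_for_partition_0

# Reading an unknown topic still yields an (empty) inner hash
puts paused_tpls['orders'].empty? # => true
# Deleting returns the stored value, or nil when the partition was never paused
puts paused_tpls['events'].delete(0).inspect # => :tpl_for_partition_0
```

This is why `resume` can safely call `@paused_tpls[topic].delete(partition)` without first checking whether the topic key exists.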

Instance Attribute Details

#idString (readonly)

Returns id of the client.

Returns:

  • (String)

    id of the client



# File 'lib/karafka/connection/client.rb', line 22

def id
  @id
end

#nameString (readonly)

Note:

Consumer name may change in case we regenerate it

Returns underlying consumer name.

Returns:

  • (String)

    underlying consumer name



# File 'lib/karafka/connection/client.rb', line 19

def name
  @name
end

#rebalance_managerObject (readonly)

Returns the value of attribute rebalance_manager.



# File 'lib/karafka/connection/client.rb', line 11

def rebalance_manager
  @rebalance_manager
end

#subscription_groupKarafka::Routing::SubscriptionGroup (readonly)

Returns the subscription group to which this client belongs.

Returns:



# File 'lib/karafka/connection/client.rb', line 15

def subscription_group
  @subscription_group
end

Instance Method Details

#assignmentRdkafka::Consumer::TopicPartitionList

Returns current active assignment.

Returns:

  • (Rdkafka::Consumer::TopicPartitionList)

    current active assignment



# File 'lib/karafka/connection/client.rb', line 141

def assignment
  kafka.assignment
end

#assignment_lost?Boolean

Returns true if our current assignment has been lost involuntarily.

Returns:

  • (Boolean)

    true if our current assignment has been lost involuntarily.



# File 'lib/karafka/connection/client.rb', line 136

def assignment_lost?
  kafka.assignment_lost?
end

#batch_pollKarafka::Connection::MessagesBuffer

Note:

This method should not be executed from many threads at the same time

Fetches messages within boundaries defined by the settings (time, size, topics, etc).

Also periodically runs the events polling to trigger events callbacks.

Returns:



# File 'lib/karafka/connection/client.rb', line 75

def batch_poll
  time_poll = TimeTrackers::Poll.new(@subscription_group.max_wait_time)

  @buffer.clear
  @rebalance_manager.clear

  events_poll

  loop do
    time_poll.start

    # Don't fetch more messages if we do not have any time left
    break if time_poll.exceeded?
    # Don't fetch more messages if we've fetched max that we've wanted
    break if @buffer.size >= @subscription_group.max_messages

    # Fetch message within our time boundaries
    response = poll(time_poll.remaining)

    # Put a message to the buffer if there is one
    @buffer << response if response && response != :tick_time

    # Upon polling, the rebalance manager might have been updated.
    # If partition revocation happens, we need to remove messages from revoked partitions
    # as well as ensure we do not have duplicates due to the offset reset for partitions
    # that we got assigned
    #
    # We also do early break, so the information about rebalance is used as soon as possible
    if @rebalance_manager.changed?
      # Since rebalances do not occur often, we can run events polling as well without
      # any throttling
      events_poll
      remove_revoked_and_duplicated_messages
      break
    end

    # If we were signaled from the outside to break the loop, we do so
    break if @interval_runner.call == :stop

    # Track time spent on all of the processing and polling
    time_poll.checkpoint

    # Finally, once we've (potentially) removed revoked messages, etc., if no messages were
    # returned and it was not an early poll exit, we can break.
    # Worth keeping in mind that the rebalance manager might have been updated despite no
    # messages being returned during a poll
    break unless response
  end

  @buffer
end
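The loop above bounds each batch by both elapsed time and message count, and stops early when a poll returns nothing. A minimal pure-Ruby sketch of that control flow, with a stubbed message source and hypothetical limits standing in for the subscription group settings (not the real Karafka internals):

```ruby
# Sketch of batch_poll's boundary logic. `responses` stands in for
# successive poll results; max_wait_ms and max_messages are hypothetical
# stand-ins for the subscription group configuration.
def batch_poll_sketch(responses, max_wait_ms: 1000, max_messages: 3)
  buffer = []
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + (max_wait_ms / 1000.0)

  loop do
    # Don't fetch more messages if we have no time left
    break if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    # Don't fetch more messages if we've already fetched the max we wanted
    break if buffer.size >= max_messages

    response = responses.shift # stands in for poll(time_poll.remaining)
    buffer << response if response

    # No message returned (and no rebalance): nothing more to fetch this round
    break unless response
  end

  buffer
end

batch_poll_sketch([:m1, :m2, :m3, :m4]) # stops at max_messages => [:m1, :m2, :m3]
```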

#commit_offsets(async: true) ⇒ Boolean

Note:

This will commit all the offsets for the whole consumer. In order to achieve granular control over where the offset should be for particular topic partitions, `store_offset` should be used to only store new offsets when we want them to be flushed.

Note:

For async, this method may return `true` despite involuntary partition revocation, as it does not resolve `lost_assignment?`. It returns only the commit operation result.

Commits the offset on a current consumer in a non-blocking or blocking way.

Parameters:

  • async (Boolean) (defaults to: true)

    should the commit happen async or sync (async by default)

Returns:

  • (Boolean)

    true if committing was successful. It may not be, when we no longer own the given partition.



# File 'lib/karafka/connection/client.rb', line 158

def commit_offsets(async: true)
  internal_commit_offsets(async: async)
end

#commit_offsets!Object

Commits offset in a synchronous way.

See Also:

  • #commit_offsets for more details


# File 'lib/karafka/connection/client.rb', line 165

def commit_offsets!
  commit_offsets(async: false)
end

#committed(tpl = nil) ⇒ Rdkafka::Consumer::TopicPartitionList

Note:

It is recommended to use this only during rebalances to get positions with metadata, as this call is synchronous.

Returns the current committed offsets per partition for this consumer group. The offset field of each requested partition will either be set to the stored offset or to -1001 if there was no stored offset for that partition.

Parameters:

  • tpl (Rdkafka::Consumer::TopicPartitionList) (defaults to: nil)

    for which we want to get committed

Returns:

  • (Rdkafka::Consumer::TopicPartitionList)

Raises:

  • (Rdkafka::RdkafkaError)

    When getting the committed positions fails.



# File 'lib/karafka/connection/client.rb', line 357

def committed(tpl = nil)
  @wrapped_kafka.committed(tpl)
end

#consumer_group_metadata_pointerFFI::Pointer

Returns a pointer to the consumer group metadata. It is used only in the context of exactly-once semantics in transactions, which is why it is never remapped to Ruby.

Returns:

  • (FFI::Pointer)


# File 'lib/karafka/connection/client.rb', line 344

def consumer_group_metadata_pointer
  kafka.consumer_group_metadata_pointer
end

#events_poll(timeout = 0) ⇒ Object

Note:

It is non-blocking when the timeout is 0 and will not wait if the queue is empty. It costs up to 2ms when no callbacks are triggered.

Triggers the rdkafka main queue events by consuming this queue. This is not the consumer consumption queue but the one with:

- error callbacks
- stats callbacks
- OAUTHBEARER token refresh callbacks

Parameters:

  • timeout (Integer) (defaults to: 0)

    number of milliseconds to wait on events or 0 not to wait.



# File 'lib/karafka/connection/client.rb', line 337

def events_poll(timeout = 0)
  kafka.events_poll(timeout)
end

#mark_as_consumed(message, metadata = nil) ⇒ Boolean

Note:

This method won't trigger automatic offset commits, relying instead on the offset check-pointing trigger that happens with each batch processed. It will, however, check the `librdkafka` assignment ownership to increase accuracy for involuntary revocations.

Marks given message as consumed.

Parameters:

  • message (Karafka::Messages::Message)

    message that we want to mark as processed

  • metadata (String, nil) (defaults to: nil)

    offset storage metadata or nil if none

Returns:

  • (Boolean)

    true if successful. False if we no longer own given partition



# File 'lib/karafka/connection/client.rb', line 285

def mark_as_consumed(message, metadata = nil)
  store_offset(message, metadata) && !assignment_lost?
end

#mark_as_consumed!(message, metadata = nil) ⇒ Boolean

Marks a given message as consumed and commits the offsets in a blocking way.

Parameters:

  • message (Karafka::Messages::Message)

    message that we want to mark as processed

  • metadata (String, nil) (defaults to: nil)

    offset storage metadata or nil if none

Returns:

  • (Boolean)

    true if successful. False if we no longer own given partition



# File 'lib/karafka/connection/client.rb', line 294

def mark_as_consumed!(message, metadata = nil)
  return false unless mark_as_consumed(message, metadata)

  commit_offsets!
end

#pause(topic, partition, offset = nil) ⇒ Object

Note:

This will pause indefinitely and requires a manual `#resume`

Note:

When `#internal_seek` is not involved (when offset is `nil`) we will not purge the librdkafka buffers and will continue from the last cursor offset

Pauses given partition and moves back to last successful offset processed.

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition

  • offset (Integer, nil) (defaults to: nil)

    offset of the message on which we want to pause (this message will be reprocessed after getting back to processing) or nil if we want to pause and resume from the consecutive offset (+1 from the last message passed to us by librdkafka)



# File 'lib/karafka/connection/client.rb', line 189

def pause(topic, partition, offset = nil)
  @mutex.synchronize do
    # Do not pause if the client got closed, would not change anything
    return if @closed

    internal_commit_offsets(async: true)

    # Here we do not use our cached tpls because we should not try to pause something we do
    # not own anymore.
    tpl = topic_partition_list(topic, partition)

    return unless tpl

    Karafka.monitor.instrument(
      'client.pause',
      caller: self,
      subscription_group: @subscription_group,
      topic: topic,
      partition: partition,
      offset: offset
    )

    @paused_tpls[topic][partition] = tpl

    kafka.pause(tpl)

    # If offset is not provided, will pause where it finished.
    # This makes librdkafka not purge buffers and can provide significant network savings
    # when we just want to pause before further processing without changing the offsets
    return unless offset

    pause_msg = Messages::Seek.new(topic, partition, offset)

    internal_seek(pause_msg)
  end
end
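The pause/resume pair hinges on the `@paused_tpls` bookkeeping: pause records the tpl so resume can reuse it even after a revocation, and resume is a no-op for partitions that were never paused. A minimal pure-Ruby sketch of just that bookkeeping (`PausedTracker` is a hypothetical name; the real client also commits offsets and talks to librdkafka under the same mutex):

```ruby
# Sketch of the pause/resume bookkeeping. Pause stores the tpl per
# topic/partition under a mutex; resume deletes and returns it, yielding
# nil when the partition was never paused (so the caller can bail out).
class PausedTracker
  def initialize
    @mutex = Mutex.new
    @paused_tpls = Hash.new { |h, k| h[k] = {} }
  end

  def pause(topic, partition, tpl)
    @mutex.synchronize { @paused_tpls[topic][partition] = tpl }
  end

  # Returns the stored tpl when the partition was paused, nil otherwise
  def resume(topic, partition)
    @mutex.synchronize { @paused_tpls[topic].delete(partition) }
  end
end

tracker = PausedTracker.new
tracker.pause('events', 0, :tpl0)
tracker.resume('events', 0) # => :tpl0 (resumed)
tracker.resume('events', 0) # => nil (already resumed, no-op)
```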

#pingObject

Runs a single poll on the main queue and the consumer queue, ignoring all potential errors. This is used as a keep-alive in the shutdown stage; any errors that happen here are irrelevant from the shutdown process perspective.

This is used only to trigger rebalance callbacks and other callbacks



# File 'lib/karafka/connection/client.rb', line 320

def ping
  events_poll(100)
  poll(100)
rescue Rdkafka::RdkafkaError
  nil
end

#resetObject

Closes and resets the client completely.



# File 'lib/karafka/connection/client.rb', line 301

def reset
  Karafka.monitor.instrument(
    'client.reset',
    caller: self,
    subscription_group: @subscription_group
  ) do
    close

    @interval_runner.reset
    @closed = false
    @paused_tpls.clear
  end
end

#resume(topic, partition) ⇒ Object

Resumes processing of a given topic partition after it was paused.

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition



# File 'lib/karafka/connection/client.rb', line 230

def resume(topic, partition)
  @mutex.synchronize do
    return if @closed

    # We now commit offsets on rebalances, thus we can do it async just to make sure
    internal_commit_offsets(async: true)

    # If we are not able to build one, let's try to reuse the one we have cached (if any)
    tpl = topic_partition_list(topic, partition) || @paused_tpls[topic][partition]

    return unless tpl

    # If we did not have it, it means we never paused this partition, thus no resume should
    # happen in the first place
    return unless @paused_tpls[topic].delete(partition)

    Karafka.monitor.instrument(
      'client.resume',
      caller: self,
      subscription_group: @subscription_group,
      topic: topic,
      partition: partition
    )

    kafka.resume(tpl)
  end
end

#seek(message) ⇒ Object

Note:

Please note that if you are seeking to a time offset, getting the offset is blocking.

Seek to a particular message. The next poll on the topic/partition will return the message at the given offset.

Parameters:



# File 'lib/karafka/connection/client.rb', line 175

def seek(message)
  @mutex.synchronize { internal_seek(message) }
end

#stopObject

Gracefully stops topic consumption.



# File 'lib/karafka/connection/client.rb', line 259

def stop
  # In case of cooperative-sticky, there is a bug in librdkafka that may hang it.
  # To mitigate it we first need to unsubscribe so we will not receive any assignments and
  # only then we should be good to go.
  # @see https://github.com/confluentinc/librdkafka/issues/4527
  if @subscription_group.kafka[:'partition.assignment.strategy'] == 'cooperative-sticky'
    unsubscribe

    until assignment.empty?
      sleep(0.1)

      ping
    end
  end

  close
end
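The cooperative-sticky workaround above unsubscribes, then waits until the assignment drains before closing. A minimal sketch of that wait loop with a stubbed assignment source (hypothetical names and a hypothetical safety cap; the real code sleeps 100ms and pings Kafka on each iteration instead):

```ruby
# Sketch of the shutdown drain loop: keep checking until the (stubbed)
# assignment reports empty. `assignments` simulates the shrinking
# assignment returned on successive checks; returns how many non-empty
# checks happened before the drain completed.
def wait_for_drain(assignments, max_checks: 10)
  checks = 0
  until assignments.shift.empty?
    checks += 1
    raise 'drain timeout' if checks >= max_checks
    # in the real client: sleep(0.1) and ping to service rebalance callbacks
  end
  checks
end

wait_for_drain([[0, 1], [0], []]) # => 2 checks before the assignment drained
```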

#store_offset(message, offset_metadata = nil) ⇒ Object

Stores offset for a given partition of a given topic based on the provided message.

Parameters:

  • message (Karafka::Messages::Message)
  • offset_metadata (String, nil) (defaults to: nil)

    offset storage metadata or nil if none



# File 'lib/karafka/connection/client.rb', line 131

def store_offset(message, offset_metadata = nil)
  internal_store_offset(message, offset_metadata)
end