Class: Karafka::Admin::Topics

Inherits:
Karafka::Admin
Defined in:
lib/karafka/admin/topics.rb

Overview

Topic administration operations. Provides methods to manage Kafka topics, including creation, deletion, reading, and introspection.

Constant Summary

Constants inherited from Karafka::Admin

Recovery

Instance Attribute Summary

Attributes inherited from Karafka::Admin

#custom_kafka

Class Method Summary

Instance Method Summary

Methods inherited from Karafka::Admin

#close, cluster_info, #cluster_info, copy_consumer_group, #copy_consumer_group, create_topic, #create_topic, delete_consumer_group, #delete_consumer_group, delete_topic, #delete_topic, #initialize, plan_topic_replication, #plan_topic_replication, read_lags_with_offsets, #read_lags_with_offsets, read_topic, #read_topic, rename_consumer_group, #rename_consumer_group, seek_consumer_group, #seek_consumer_group, topic_info, #topic_info, trigger_rebalance, #trigger_rebalance, with_admin, #with_admin, with_consumer, #with_consumer

Constructor Details

This class inherits a constructor from Karafka::Admin

Class Method Details

.create(name, partitions, replication_factor, topic_config = {}) ⇒ Object

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    number of partitions for this topic

  • replication_factor (Integer)

    number of replicas

  • topic_config (Hash) (defaults to: {})

    topic config details as described in the Kafka base topic configuration documentation (kafka.apache.org/documentation/#topicconfigs)

# File 'lib/karafka/admin/topics.rb', line 26

def create(name, partitions, replication_factor, topic_config = {})
  new.create(name, partitions, replication_factor, topic_config)
end

.create_partitions(name, partitions) ⇒ Object

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    total number of partitions we expect to end up with

# File 'lib/karafka/admin/topics.rb', line 39

def create_partitions(name, partitions)
  new.create_partitions(name, partitions)
end

.delete(name) ⇒ Object

Parameters:

  • name (String)

    topic name

# File 'lib/karafka/admin/topics.rb', line 32

def delete(name)
  new.delete(name)
end

.info(topic_name) ⇒ Object

Parameters:

  • topic_name (String)

    name of the topic we’re interested in

# File 'lib/karafka/admin/topics.rb', line 52

def info(topic_name)
  new.info(topic_name)
end

.read(name, partition, count, start_offset = -1, settings = {}) ⇒ Object

Parameters:

  • name (String, Symbol)

    topic name

  • partition (Integer)

    partition

  • count (Integer)

    how many messages we want to get at most

  • start_offset (Integer, Time) (defaults to: -1)

    offset from which we should start

  • settings (Hash) (defaults to: {})

    kafka extra settings (optional)

# File 'lib/karafka/admin/topics.rb', line 16

def read(name, partition, count, start_offset = -1, settings = {})
  new.read(name, partition, count, start_offset, settings)
end

.read_partition_offsets(topic_partition_offsets, isolation_level: nil) ⇒ Object

Parameters:

  • topic_partition_offsets (Hash{String => Array<Hash>})

    topics with partition specs

  • isolation_level (Integer, nil) (defaults to: nil)

    optional isolation level constant

# File 'lib/karafka/admin/topics.rb', line 59

def read_partition_offsets(topic_partition_offsets, isolation_level: nil)
  new.read_partition_offsets(topic_partition_offsets, isolation_level: isolation_level)
end

.read_watermark_offsets(name_or_hash, partition = nil) ⇒ Object

Parameters:

  • name_or_hash (String, Symbol, Hash)

    topic name or hash with topics and partitions

  • partition (Integer, nil) (defaults to: nil)

    partition (nil when using hash format)

# File 'lib/karafka/admin/topics.rb', line 46

def read_watermark_offsets(name_or_hash, partition = nil)
  new.read_watermark_offsets(name_or_hash, partition)
end

Instance Method Details

#create(name, partitions, replication_factor, topic_config = {}) ⇒ void

This method returns an undefined value.

Creates a Kafka topic with the given settings

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    number of partitions we expect

  • replication_factor (Integer)

    number of replicas

  • topic_config (Hash) (defaults to: {})

    topic config details as described here: kafka.apache.org/documentation/#topicconfigs

# File 'lib/karafka/admin/topics.rb', line 152

def create(name, partitions, replication_factor, topic_config = {})
  with_admin do |admin|
    handler = admin.create_topic(name, partitions, replication_factor, topic_config)

    with_re_wait(
      -> { handler.wait(max_wait_timeout_ms: max_wait_time_ms) },
      -> { names.include?(name) }
    )
  end
end
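Both `#create` and `#delete` hand two lambdas to `with_re_wait`: one waits on the broker's operation handle, the other checks that the change is already visible in cluster metadata. The retry policy of `with_re_wait` itself is not shown on this page, so the following is only a hypothetical sketch of that wait-then-verify pattern. `wait_and_verify`, `ResultNotVisibleError` and the `max_attempts` default are illustrative names, not Karafka APIs.

```ruby
# Hypothetical illustration of a "wait, then verify" retry loop.
# This is NOT Karafka's with_re_wait; names and retry policy are assumptions.
class ResultNotVisibleError < StandardError; end

# wait    - callable that blocks until the broker acknowledges the request
# visible - callable returning true once metadata reflects the change
def wait_and_verify(wait, visible, max_attempts: 3)
  attempts = 0

  begin
    attempts += 1
    wait.call
    # The broker can acknowledge a request before metadata shows its effect
    raise ResultNotVisibleError unless visible.call
  rescue ResultNotVisibleError
    # Repeat the wait and the check a bounded number of times, then give up
    retry if attempts < max_attempts
    raise
  end
end

# Usage: the change becomes visible on the second metadata check
checks = 0
wait_and_verify(-> {}, -> { (checks += 1) >= 2 })
```

Separating "the request finished" from "the result is observable" is what lets a caller immediately use a freshly created topic without racing stale metadata.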

#create_partitions(name, partitions) ⇒ void

This method returns an undefined value.

Creates more partitions for a given topic

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    total number of partitions we expect to end up with

# File 'lib/karafka/admin/topics.rb', line 185

def create_partitions(name, partitions)
  with_admin do |admin|
    handler = admin.create_partitions(name, partitions)

    with_re_wait(
      -> { handler.wait(max_wait_timeout_ms: max_wait_time_ms) },
      -> { info(name).fetch(:partition_count) >= partitions }
    )
  end
end
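Because `partitions` is the total count the topic should end up with, and Kafka can only increase a topic's partition count, it can help to compute the difference before calling `#create_partitions`. A minimal sketch: `partitions_to_add` is a hypothetical helper, and in practice the current count could come from `info(name).fetch(:partition_count)`, the same lookup the method above uses to confirm the change.

```ruby
# Hypothetical helper (not part of Karafka): create_partitions takes the
# desired TOTAL partition count, and Kafka can only increase that count.
def partitions_to_add(current_count, desired_total)
  if desired_total < current_count
    raise ArgumentError,
          "cannot shrink from #{current_count} to #{desired_total} partitions"
  end

  # 0 means the topic already has the requested number of partitions
  desired_total - current_count
end

# A topic with 3 partitions that should end up with 6 gains 3 new ones,
# so the call would be create_partitions(name, 6), not create_partitions(name, 3)
partitions_to_add(3, 6) # => 3
```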

#delete(name) ⇒ void

This method returns an undefined value.

Deletes a given topic

Parameters:

  • name (String)

    topic name

# File 'lib/karafka/admin/topics.rb', line 168

def delete(name)
  with_admin do |admin|
    handler = admin.delete_topic(name)

    with_re_wait(
      -> { handler.wait(max_wait_timeout_ms: max_wait_time_ms) },
      -> { !names.include?(name) }
    )
  end
end

#info(topic_name) ⇒ Hash

Note:

This query is much more efficient than a full `#cluster_info` followed by a topic lookup, because it fetches metadata only for the requested topic instead of for all topics.

Returns basic topic metadata

Parameters:

  • topic_name (String)

    name of the topic we’re interested in

Returns:

  • (Hash)

    topic metadata info hash

Raises:

  • (Rdkafka::RdkafkaError)

    `unknown_topic_or_part` if the requested topic is not found

# File 'lib/karafka/admin/topics.rb', line 338

def info(topic_name)
  with_admin do |admin|
    admin
      .(topic_name)
      .topics
      .find { |topic| topic[:topic_name] == topic_name }
  end
end

#read(name, partition, count, start_offset = -1, settings = {}) ⇒ Array<Karafka::Messages::Message>

Reads messages from a given topic partition

Parameters:

  • name (String, Symbol)

    topic name

  • partition (Integer)

    partition

  • count (Integer)

    how many messages we want to get at most

  • start_offset (Integer, Time) (defaults to: -1)

    offset from which we should start. If -1 (the default) is provided, we read up to `count` of the most recent messages, ending at the latest offset. If a Time is provided, the corresponding offset is resolved first. Values lower than -1 shift that window further back, by one offset for each step below -1.

  • settings (Hash) (defaults to: {})

    kafka extra settings (optional)

Returns:

  • (Array<Karafka::Messages::Message>)

    messages read from the requested topic partition

# File 'lib/karafka/admin/topics.rb', line 75

def read(name, partition, count, start_offset = -1, settings = {})
  messages = []
  tpl = Rdkafka::Consumer::TopicPartitionList.new
  low_offset, high_offset = nil

  with_consumer(settings) do |consumer|
    # Convert the time offset (if needed)
    start_offset = resolve_offset(consumer, name.to_s, partition, start_offset)

    low_offset, high_offset = consumer.query_watermark_offsets(name, partition)

    # Select offset dynamically if -1 or less and move backwards with the negative
    # offset, allowing to start from N messages back from high-watermark
    start_offset = high_offset - count - start_offset.abs + 1 if start_offset.negative?
    start_offset = low_offset if start_offset.negative?

    # Build the requested range - since first element is on the start offset we need to
    # subtract one from requested count to end up with expected number of elements
    requested_range = (start_offset..(start_offset + count - 1))
    # Establish theoretical available range. Note, that this does not handle cases related
    # to log retention or compaction
    available_range = (low_offset..(high_offset - 1))
    # Select only offset that we can select. This will remove all the potential offsets
    # that are below the low watermark offset
    possible_range = requested_range.select { |offset| available_range.include?(offset) }

    start_offset = possible_range.first
    count = possible_range.size

    tpl.add_topic_and_partitions_with_offsets(name, partition => start_offset)
    consumer.assign(tpl)

    # We should poll as long as we don't have all the messages that we need or as long as
    # we do not read all the messages from the topic
    loop do
      # If we've got as many messages as we've wanted stop
      break if messages.size >= count

      message = consumer.poll(200)

      next unless message

      # If the message we've got is beyond the requested range, stop
      break unless possible_range.include?(message.offset)

      messages << message
    rescue Rdkafka::RdkafkaError => e
      # End of partition
      break if e.code == :partition_eof

      raise e
    end
  end

  # Use topic from routes if we can match it or create a dummy one
  # Dummy one is used in case we cannot match the topic with routes. This can happen
  # when admin API is used to read topics that are not part of the routing
  topic = Karafka::Routing::Router.find_or_initialize_by_name(name)

  messages.map! do |message|
    Messages::Builders::Message.call(
      message,
      topic,
      Time.now
    )
  end
end
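The offset arithmetic in `#read` is easier to follow in isolation. The sketch below reproduces it as a pure function with no Kafka connection. `plan_read_range` is a hypothetical name, and the sketch assumes `start_offset` is already an Integer (the real method first resolves a Time through `resolve_offset`).

```ruby
# Hypothetical pure-Ruby reproduction of the offset math in #read.
# Assumes start_offset is already an Integer (Time values are resolved
# to offsets by the real method before this step).
def plan_read_range(low, high, count, start_offset)
  # Negative values count back from the high watermark: -1 selects the
  # last `count` messages, -2 shifts that window one offset back, etc.
  start_offset = high - count - start_offset.abs + 1 if start_offset.negative?
  # If the computed start is still below zero, start at the low watermark
  start_offset = low if start_offset.negative?

  requested = (start_offset..(start_offset + count - 1))
  # Theoretical range only; retention and compaction gaps are ignored, as in #read
  available = (low..(high - 1))

  # Keep only offsets that lie between the watermarks
  requested.select { |offset| available.include?(offset) }
end

plan_read_range(0, 100, 3, -1) # => [97, 98, 99]
plan_read_range(0, 100, 3, -2) # => [96, 97, 98]
plan_read_range(0, 2, 5, -1)   # => [0, 1]
plan_read_range(0, 100, 3, 99) # => [99]
```

The size of the returned array corresponds to the adjusted `count` that `#read` then polls for, which is why asking for more messages than exist simply returns fewer.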

#read_partition_offsets(topic_partition_offsets, isolation_level: nil) ⇒ Array<Hash>

Queries partition offsets by specification using the Kafka ListOffsets admin API.

This is a lower-level, more flexible alternative to `#read_watermark_offsets`. The key differences are:

  • Transactional correctness: `read_watermark_offsets` always returns the high-watermark offset, which includes messages from uncommitted or in-flight transactions. A `READ_COMMITTED` consumer cannot see those messages until their transaction commits (and never sees aborted ones), so lag calculated from the high-watermark is overstated on transactionally-produced topics. Passing `isolation_level: Karafka::Admin::IsolationLevels::READ_COMMITTED` here returns the Last Stable Offset (LSO), the highest offset a `READ_COMMITTED` consumer would actually reach, which gives accurate lag figures.

  • `:max_timestamp` spec: returns the offset of the message with the highest timestamp in the partition. This is not available via watermarks.

  • Leader epoch: the response includes `leader_epoch`, which can be used to detect stale metadata or fencing conditions.

  • Admin API: all specs are resolved through the admin client in a single ListOffsets request (`#read_watermark_offsets` issues two: one for `:earliest` and one for `:latest`).

For non-transactional topics, `:latest` here and the `read_watermark_offsets` high-watermark return the same value.
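To illustrate the transactional-correctness point, the sketch below computes per-partition lag from result hashes shaped like the ones this method returns. The `lag_per_partition` helper, the committed offsets and every offset value are made up for illustration; in a real setup the committed offsets would come from the consumer group.

```ruby
# Hypothetical helper: lag per partition, computed as end offset minus the
# consumer group's committed offset, never below zero.
def lag_per_partition(offset_results, committed)
  offset_results.to_h do |result|
    partition = result[:partition]
    [partition, [result[:offset] - committed.fetch(partition), 0].max]
  end
end

# Illustrative values only: an open transaction keeps the LSO below the
# high watermark on partition 0
high_watermarks = [
  { topic: 'events', partition: 0, offset: 120, timestamp: -1, leader_epoch: nil },
  { topic: 'events', partition: 1, offset: 40, timestamp: -1, leader_epoch: nil }
]
last_stable = [
  { topic: 'events', partition: 0, offset: 100, timestamp: -1, leader_epoch: nil },
  { topic: 'events', partition: 1, offset: 40, timestamp: -1, leader_epoch: nil }
]
committed = { 0 => 95, 1 => 40 }

lag_per_partition(high_watermarks, committed) # => { 0 => 25, 1 => 0 }
lag_per_partition(last_stable, committed)     # => { 0 => 5, 1 => 0 }
```

With the high watermark, partition 0 appears 25 messages behind; measured against the LSO, the lag a `READ_COMMITTED` consumer actually faces is 5.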

Examples:

Query earliest offset for partition 0 and latest for partition 1

Karafka::Admin::Topics.read_partition_offsets(
  'events' => [
    { partition: 0, offset: :earliest },
    { partition: 1, offset: :latest }
  ]
)
# => [
#   { topic: 'events', partition: 0, offset: 0, timestamp: -1, leader_epoch: nil },
#   { topic: 'events', partition: 1, offset: 100, timestamp: -1, leader_epoch: nil }
# ]

Get LSO (Last Stable Offset) for accurate lag on transactional topics

Karafka::Admin::Topics.read_partition_offsets(
  'events' => [{ partition: 0, offset: :latest }],
  isolation_level: Karafka::Admin::IsolationLevels::READ_COMMITTED
)

Find offset at a specific point in time

Karafka::Admin::Topics.read_partition_offsets(
  'events' => [{ partition: 0, offset: 1_700_000_000_000 }]
)

Parameters:

  • topic_partition_offsets (Hash{String => Array<Hash>})

    hash with topic names as keys and arrays of partition specs as values. Each spec must have:

    • `:partition` [Integer] the partition number

    • `:offset` [Symbol, Integer] `:earliest`, `:latest`, `:max_timestamp`, or an Integer timestamp in milliseconds (returns the first offset at or after that timestamp)

    Each partition may appear at most once per call.

  • isolation_level (Integer, nil) (defaults to: nil)

    optional isolation level constant. Pass `Karafka::Admin::IsolationLevels::READ_COMMITTED` to get the LSO instead of the high-watermark for `:latest` queries on transactionally-produced topics.

Returns:

  • (Array<Hash>)

    array of result hashes, each containing:

    • `:topic` [String]

    • `:partition` [Integer]

    • `:offset` [Integer]

    • `:timestamp` [Integer] (-1 when not applicable)

    • `:leader_epoch` [Integer, nil]

Raises:

  • (Rdkafka::RdkafkaError)

    on per-partition errors or connection issues

# File 'lib/karafka/admin/topics.rb', line 322

def read_partition_offsets(topic_partition_offsets, isolation_level: nil)
  with_admin do |admin|
    handle = admin.list_offsets(topic_partition_offsets, isolation_level: isolation_level)
    handle.wait(max_wait_timeout_ms: max_wait_time_ms).offsets
  end
end
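The Integer form of `:offset` expects a timestamp in milliseconds, while Ruby code usually holds a `Time`. A minimal sketch of building the spec hash for several partitions at once; `timestamp_specs` is a hypothetical helper, not a Karafka method.

```ruby
# Hypothetical helper: builds a topic_partition_offsets argument that asks,
# for each partition, for the first offset at or after `time`.
def timestamp_specs(topic, partitions, time)
  # Integer offsets are interpreted as Unix timestamps in milliseconds;
  # Time#to_r avoids floating-point rounding when converting
  timestamp_ms = (time.to_r * 1000).to_i

  { topic => partitions.map { |partition| { partition: partition, offset: timestamp_ms } } }
end

specs = timestamp_specs('events', [0, 1], Time.at(1_700_000_000))
# specs == { 'events' => [{ partition: 0, offset: 1700000000000 },
#                         { partition: 1, offset: 1700000000000 }] }
```

The resulting hash could then be passed as `Karafka::Admin::Topics.read_partition_offsets(specs)`; that call needs a reachable cluster and is outside this sketch.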

#read_watermark_offsets(name_or_hash, partition = nil) ⇒ Array<Integer, Integer>, Hash

Fetches the watermark offsets for a given topic partition or multiple topics and partitions

Examples:

Query single partition

Karafka::Admin::Topics.read_watermark_offsets('events', 0)
# => [0, 100]

Query specific partitions across multiple topics

Karafka::Admin::Topics.read_watermark_offsets(
  { 'events' => [0, 1], 'logs' => [0] }
)
# => {
#   'events' => {
#     0 => [0, 100],
#     1 => [0, 150]
#   },
#   'logs' => {
#     0 => [0, 50]
#   }
# }

Parameters:

  • name_or_hash (String, Symbol, Hash)

    topic name or hash with topics and partitions

  • partition (Integer, nil) (defaults to: nil)

    partition number (required when first param is topic name)

Returns:

  • (Array<Integer, Integer>, Hash)

    for a single partition, an array with the low and high watermark offsets; for multiple topics and partitions, a nested hash keyed by topic and partition

# File 'lib/karafka/admin/topics.rb', line 222

def read_watermark_offsets(name_or_hash, partition = nil)
  # Normalize input to hash format
  topics_with_partitions = partition ? { name_or_hash => [partition] } : name_or_hash

  low_specs = {}
  high_specs = {}

  topics_with_partitions.each do |topic, partitions|
    low_specs[topic] = partitions.map { |p| { partition: p, offset: :earliest } }
    high_specs[topic] = partitions.map { |p| { partition: p, offset: :latest } }
  end

  lows = {}
  highs = {}

  with_admin do |admin|
    admin.list_offsets(low_specs).wait(max_wait_timeout_ms: max_wait_time_ms).offsets.each do |r|
      (lows[r[:topic]] ||= {})[r[:partition]] = r[:offset]
    end
    admin.list_offsets(high_specs).wait(max_wait_timeout_ms: max_wait_time_ms).offsets.each do |r|
      (highs[r[:topic]] ||= {})[r[:partition]] = r[:offset]
    end
  end

  result = Hash.new { |h, k| h[k] = {} }
  topics_with_partitions.each do |topic, partitions|
    partitions.each do |partition_id|
      result[topic][partition_id] = [lows.dig(topic.to_s, partition_id), highs.dig(topic.to_s, partition_id)]
    end
  end

  # Return single array for single partition query, hash for multiple
  partition ? result.dig(name_or_hash, partition) : result
end
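The final step of `#read_watermark_offsets` pairs the `:earliest` and `:latest` ListOffsets results into `[low, high]` per partition. The sketch below reproduces that reshaping on plain hashes; `pair_watermarks` is a hypothetical name and the offsets are illustrative. As in the method, results are looked up with `topic.to_s`, so a Symbol topic key still matches string-keyed results while the returned hash keeps the caller's original key.

```ruby
# Hypothetical reproduction of how #read_watermark_offsets pairs the
# :earliest and :latest ListOffsets results into [low, high] per partition.
def pair_watermarks(topics_with_partitions, low_results, high_results)
  # Index result arrays as { topic => { partition => offset } }
  index = lambda do |results|
    results.each_with_object({}) do |r, acc|
      (acc[r[:topic]] ||= {})[r[:partition]] = r[:offset]
    end
  end

  lows = index.call(low_results)
  highs = index.call(high_results)

  topics_with_partitions.each_with_object({}) do |(topic, partitions), result|
    result[topic] = partitions.to_h do |partition|
      [partition, [lows.dig(topic.to_s, partition), highs.dig(topic.to_s, partition)]]
    end
  end
end

low_results = [
  { topic: 'events', partition: 0, offset: 0 },
  { topic: 'events', partition: 1, offset: 0 },
  { topic: 'logs', partition: 0, offset: 10 }
]
high_results = [
  { topic: 'events', partition: 0, offset: 100 },
  { topic: 'events', partition: 1, offset: 150 },
  { topic: 'logs', partition: 0, offset: 50 }
]

pair_watermarks({ 'events' => [0, 1], 'logs' => [0] }, low_results, high_results)
# => { 'events' => { 0 => [0, 100], 1 => [0, 150] }, 'logs' => { 0 => [10, 50] } }
```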