Class: Rdkafka::Config

Inherits: Object

Defined in: lib/rdkafka/config.rb

Overview

Configuration for a Kafka consumer or producer. Create an instance and use the consumer and producer methods to create a client. Documentation of the available configuration options can be found at github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md.

Defined Under Namespace

Classes: ClientCreationError, ConfigError, NoLoggerError

Constant Summary

DEFAULT_CONFIG =
  # Default config that can be overwritten.
  {}.freeze

REQUIRED_CONFIG =
  # Required config that cannot be overwritten.
  {
    # Enable log queues so we get callbacks in our own Ruby threads
    "log.queue": true
  }.freeze

@@logger = Logger.new($stdout)

@@statistics_callback = nil

@@error_callback = nil

@@opaques = ObjectSpace::WeakMap.new

@@log_queue = Queue.new

@@log_thread =
  # We memoize the thread on the first log flush. This also allows us to
  # restart the logger thread after forks.
  nil

@@log_mutex = Mutex.new

@@oauthbearer_token_refresh_callback = nil

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(config_hash = {}) ⇒ Config

Returns a new config with the provided options which are merged with DEFAULT_CONFIG.

Parameters:

  • config_hash (Hash{String,Symbol => String}) (defaults to: {})

    The config options for rdkafka



# File 'lib/rdkafka/config.rb', line 142

def initialize(config_hash = {})
  Callbacks.ensure_ffi_running

  @config_hash = DEFAULT_CONFIG.merge(config_hash)
  @consumer_rebalance_listener = nil
  @consumer_poll_set = true
end

Instance Attribute Details

#consumer_poll_set=(value) ⇒ Boolean (writeonly)

Whether to use a single queue for the underlying consumer and events.

This is an advanced API that allows for more granular control of the polling process. When this value is set to `false` (`true` by default), there will be two queues that need to be polled:

- main librdkafka queue for events
- consumer queue with messages and rebalances

It is recommended to use the default and only set this to `false` in advanced, multi-threaded cases where granular control over event handling is needed.

Returns:

  • (Boolean)


# File 'lib/rdkafka/config.rb', line 186

def consumer_poll_set=(value)
  @consumer_poll_set = value
end

#consumer_rebalance_listener=(value) ⇒ Object, ... (writeonly)

Get notifications on partition assignment/revocation for the subscribed topics.

Returns:

  • (Object, #on_partitions_assigned, #on_partitions_revoked)

    listener instance



# File 'lib/rdkafka/config.rb', line 172

def consumer_rebalance_listener=(value)
  @consumer_rebalance_listener = value
end

Class Method Details

.ensure_log_thread ⇒ Object

Makes sure that there is a thread consuming logs. The thread is not spawned immediately; its liveness is checked on use so that it can be restarted after forking.



# File 'lib/rdkafka/config.rb', line 35

def self.ensure_log_thread
  return if @@log_thread&.alive?

  @@log_mutex.synchronize do
    # Restart if dead (fork, crash)
    @@log_thread = nil if @@log_thread && !@@log_thread.alive?

    @@log_thread ||= Thread.start do
      loop do
        severity, msg = @@log_queue.pop
        @@logger.add(severity, msg)
      end
    end
  end
end
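The memoized-thread pattern above can be illustrated in plain Ruby (this is a standalone sketch, not rdkafka's actual internals): messages go onto a queue, and a lazily started thread forwards them to the logger; if the thread died (for example after a fork), it is restarted under a mutex.

```ruby
require "logger"
require "stringio"

out = StringIO.new
logger = Logger.new(out)
queue = Queue.new
mutex = Mutex.new
log_thread = nil

ensure_log_thread = lambda do
  next if log_thread&.alive?

  mutex.synchronize do
    # Restart if dead (fork, crash)
    log_thread = nil if log_thread && !log_thread.alive?

    log_thread ||= Thread.start do
      loop do
        severity, msg = queue.pop
        logger.add(severity, msg)
      end
    end
  end
end

ensure_log_thread.call
queue << [Logger::INFO, "hello from the log queue"]
# Bounded wait for the background thread to flush the entry.
50.times { break if out.string.include?("hello"); sleep 0.01 }
```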

.error_callback ⇒ Proc?

Returns the current error callback, by default this is nil.

Returns:

  • (Proc, nil)


# File 'lib/rdkafka/config.rb', line 102

def self.error_callback
  @@error_callback
end

.error_callback=(callback) ⇒ nil

Set a callback that will be called every time the underlying client emits an error. If this callback is not set, global errors such as brokers becoming unavailable will only be sent to the logger, as defined by librdkafka. The callback is called with an instance of RdKafka::Error.

Parameters:

  • callback (Proc, #call, nil)

    callable object to handle errors or nil to clear

Returns:

  • (nil)

Raises:

  • (TypeError)


# File 'lib/rdkafka/config.rb', line 94

def self.error_callback=(callback)
  raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call) || callback.nil?
  @@error_callback = callback
end

.log_queue ⇒ Queue

Returns a queue whose contents will be passed to the configured logger. Each entry should follow the format [Logger::Severity, String]. The benefit over calling the logger directly is that this is safe to use from trap contexts.

Returns:

  • (Queue)


# File 'lib/rdkafka/config.rb', line 56

def self.log_queue
  @@log_queue
end
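A plain-Ruby illustration of why a queue is useful here (assuming a POSIX platform with `USR1` available): calling a `Logger` directly from a signal handler is unsafe because `Logger` takes a mutex internally, while pushing onto a `Queue` is safe from trap context. Entries follow the documented `[Logger::Severity, String]` format.

```ruby
require "logger"

queue = Queue.new

Signal.trap("USR1") do
  # Safe from trap context: enqueue instead of logging directly.
  queue << [Logger::WARN, "signal received"]
end

Process.kill("USR1", Process.pid)
# Blocks until the handler has run and pushed the entry.
severity, msg = queue.pop
```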

.logger ⇒ Logger

Returns the current logger, by default this is a logger to stdout.

Returns:

  • (Logger)


# File 'lib/rdkafka/config.rb', line 29

def self.logger
  @@logger
end

.logger=(logger) ⇒ nil

Set the logger that will be used for all logging output by this library.

Parameters:

  • logger (Logger)

    The logger to be used

Returns:

  • (nil)

Raises:



# File 'lib/rdkafka/config.rb', line 65

def self.logger=(logger)
  raise NoLoggerError if logger.nil?
  @@logger = logger
end

.oauthbearer_token_refresh_callback ⇒ Proc?

Returns the current oauthbearer_token_refresh_callback callback, by default this is nil.

Returns:

  • (Proc, nil)


# File 'lib/rdkafka/config.rb', line 119

def self.oauthbearer_token_refresh_callback
  @@oauthbearer_token_refresh_callback
end

.oauthbearer_token_refresh_callback=(callback) ⇒ nil

Sets the SASL/OAUTHBEARER token refresh callback. This callback will be triggered when it is time to refresh the client's OAUTHBEARER token.

Parameters:

  • callback (Proc, #call, nil)

    callable object to handle token refresh or nil to clear

Returns:

  • (nil)

Raises:

  • (TypeError)


# File 'lib/rdkafka/config.rb', line 111

def self.oauthbearer_token_refresh_callback=(callback)
  raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call) || callback.nil?
  @@oauthbearer_token_refresh_callback = callback
end

.opaques ⇒ Object



# File 'lib/rdkafka/config.rb', line 124

def self.opaques
  @@opaques
end

.statistics_callback ⇒ Proc?

Returns the current statistics callback, by default this is nil.

Returns:

  • (Proc, nil)


# File 'lib/rdkafka/config.rb', line 84

def self.statistics_callback
  @@statistics_callback
end

.statistics_callback=(callback) ⇒ nil

Set a callback that will be called every time the underlying client emits statistics. You can configure if and how often this happens using `statistics.interval.ms`. The callback is called with a hash that's documented here: github.com/confluentinc/librdkafka/blob/master/STATISTICS.md

Parameters:

  • callback (Proc, #call, nil)

    callable object or nil to clear

Returns:

  • (nil)

Raises:

  • (TypeError)


# File 'lib/rdkafka/config.rb', line 76

def self.statistics_callback=(callback)
  raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call) || callback.nil?
  @@statistics_callback = callback
end

Instance Method Details

#[](key) ⇒ String?

Get a config option with the specified key

Parameters:

  • key (String)

    The config option’s key

Returns:

  • (String, nil)

The config option or `nil` if it is not present



# File 'lib/rdkafka/config.rb', line 165

def [](key)
  @config_hash[key]
end

#[]=(key, value) ⇒ nil

Set a config option.

Parameters:

  • key (String)

    The config option’s key

  • value (String)

    The config option’s value

Returns:

  • (nil)


# File 'lib/rdkafka/config.rb', line 156

def []=(key, value)
  @config_hash[key] = value
end

#admin(native_kafka_auto_start: true, native_kafka_poll_timeout_ms: Defaults::NATIVE_KAFKA_POLL_TIMEOUT_MS, run_polling_thread: true) ⇒ Admin

Creates an admin instance with this configuration.

Parameters:

  • native_kafka_auto_start (Boolean) (defaults to: true)

    should the native kafka operations be started automatically. Defaults to true. Set to false only when doing complex initialization.

  • native_kafka_poll_timeout_ms (Integer) (defaults to: Defaults::NATIVE_KAFKA_POLL_TIMEOUT_MS)

    ms poll time of the native Kafka

  • run_polling_thread (Boolean) (defaults to: true)

    should the background polling thread be started. Defaults to true. Set to false when using the FD API for fiber scheduler integration.

Returns:

  • (Admin)

    The created admin instance

Raises:



# File 'lib/rdkafka/config.rb', line 270

def admin(native_kafka_auto_start: true, native_kafka_poll_timeout_ms: Defaults::NATIVE_KAFKA_POLL_TIMEOUT_MS, run_polling_thread: true)
  opaque = Opaque.new
  config = native_config(opaque)
  Rdkafka::Bindings.rd_kafka_conf_set_background_event_cb(config, Rdkafka::Callbacks::BackgroundEventCallbackFunction)

  kafka = native_kafka(config, :rd_kafka_producer)

  Rdkafka::Admin.new(
    Rdkafka::NativeKafka.new(
      kafka,
      run_polling_thread: run_polling_thread,
      opaque: opaque,
      auto_start: native_kafka_auto_start,
      timeout_ms: native_kafka_poll_timeout_ms
    )
  )
end

#consumer(native_kafka_auto_start: true) ⇒ Consumer

Creates a consumer with this configuration.

Parameters:

  • native_kafka_auto_start (Boolean) (defaults to: true)

    should the native kafka operations be started automatically. Defaults to true. Set to false only when doing complex initialization.

Returns:

Raises:



# File 'lib/rdkafka/config.rb', line 196

def consumer(native_kafka_auto_start: true)
  opaque = Opaque.new
  config = native_config(opaque)

  if @consumer_rebalance_listener
    opaque.consumer_rebalance_listener = @consumer_rebalance_listener
    Rdkafka::Bindings.rd_kafka_conf_set_rebalance_cb(config, Rdkafka::Bindings::RebalanceCallback)
  end

  # Create native client
  kafka = native_kafka(config, :rd_kafka_consumer)

  # Redirect the main queue to the consumer queue
  Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka) if @consumer_poll_set

  # Return consumer with Kafka client
  Rdkafka::Consumer.new(
    Rdkafka::NativeKafka.new(
      kafka,
      run_polling_thread: false,
      opaque: opaque,
      auto_start: native_kafka_auto_start
    )
  )
end

#describe_properties ⇒ Hash{Symbol => String}

Note:

The librdkafka C API does not distinguish between producer-only, consumer-only, and global properties at the configuration level. All properties are returned regardless of the intended client type.

Note:

The returned Hash may include sensitive values such as authentication credentials and key passwords. Do not log or serialize the returned data unless you have explicitly redacted secret entries.

Returns all configuration properties and their current values for this config.

Uses `rd_kafka_conf_dump` to retrieve every property (including defaults and internal properties like `client.software.name`) as a flat Hash.

Returns:

  • (Hash{Symbol => String})

    property names mapped to their current values

Raises:

  • (ConfigError)

    When the configuration contains invalid options



# File 'lib/rdkafka/config.rb', line 304

def describe_properties
  config = nil
  dump_ptr = nil
  count = 0

  config = native_config
  count_ptr = Rdkafka::Bindings::SizePtr.new
  dump_ptr = Rdkafka::Bindings.rd_kafka_conf_dump(config, count_ptr)

  count = count_ptr[:value]
  result = {}

  (0...count).step(2) do |i|
    key = dump_ptr.get_pointer(i * FFI::Pointer.size).read_string
    value = dump_ptr.get_pointer((i + 1) * FFI::Pointer.size).read_string
    result[key.to_sym] = value
  end

  result
ensure
  Rdkafka::Bindings.rd_kafka_conf_dump_free(dump_ptr, count) if dump_ptr
  Rdkafka::Bindings.rd_kafka_conf_destroy(config) if config
end
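Since the returned Hash may contain credentials, a hedged sketch of redacting secret-looking entries before logging it. The key patterns below are assumptions, not an exhaustive list of librdkafka's sensitive properties:

```ruby
# Pattern for property names that likely hold secrets (an assumption).
SENSITIVE = /password|secret|sasl\.oauthbearer\.config|ssl\.key/i

def redact_properties(props)
  props.to_h do |key, value|
    [key, key.to_s.match?(SENSITIVE) ? "[REDACTED]" : value]
  end
end

# Sample data standing in for a real describe_properties result.
sample = {
  "client.software.name": "rdkafka",
  "ssl.key.password": "hunter2"
}
redacted = redact_properties(sample)
# => {:"client.software.name"=>"rdkafka", :"ssl.key.password"=>"[REDACTED]"}
```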

#producer(native_kafka_auto_start: true, native_kafka_poll_timeout_ms: Defaults::NATIVE_KAFKA_POLL_TIMEOUT_MS, run_polling_thread: true) ⇒ Producer

Create a producer with this configuration.

Parameters:

  • native_kafka_auto_start (Boolean) (defaults to: true)

    should the native kafka operations be started automatically. Defaults to true. Set to false only when doing complex initialization.

  • native_kafka_poll_timeout_ms (Integer) (defaults to: Defaults::NATIVE_KAFKA_POLL_TIMEOUT_MS)

    ms poll time of the native Kafka

  • run_polling_thread (Boolean) (defaults to: true)

    should the background polling thread be started. Defaults to true. Set to false when using the FD API for fiber scheduler integration.

Returns:

Raises:



# File 'lib/rdkafka/config.rb', line 233

def producer(native_kafka_auto_start: true, native_kafka_poll_timeout_ms: Defaults::NATIVE_KAFKA_POLL_TIMEOUT_MS, run_polling_thread: true)
  # Create opaque
  opaque = Opaque.new
  # Create Kafka config
  config = native_config(opaque)
  # Set callback to receive delivery reports on config
  Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Callbacks::DeliveryCallbackFunction)
  # Return producer with Kafka client
  partitioner_name = self[:partitioner] || self["partitioner"]

  kafka = native_kafka(config, :rd_kafka_producer)

  Rdkafka::Producer.new(
    Rdkafka::NativeKafka.new(
      kafka,
      run_polling_thread: run_polling_thread,
      opaque: opaque,
      auto_start: native_kafka_auto_start,
      timeout_ms: native_kafka_poll_timeout_ms
    ),
    partitioner_name
  ).tap do |producer|
    opaque.producer = producer
  end
end