Class: Rdkafka::Config
- Inherits: Object
- Defined in: lib/rdkafka/config.rb
Overview
Configuration for a Kafka consumer or producer. Create an instance, then call the consumer or producer method to create a client. The available configuration options are documented at github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md.
Defined Under Namespace
Classes: ClientCreationError, ConfigError, NoLoggerError
Constant Summary
- DEFAULT_CONFIG =
Default config that can be overwritten.
{}.freeze
- REQUIRED_CONFIG =
Required config that cannot be overwritten.
{
  # Enable log queues so we get callbacks in our own Ruby threads
  "log.queue": true
}.freeze
- @@logger =
Logger.new($stdout)
- @@statistics_callback =
nil
- @@error_callback =
nil
- @@opaques =
ObjectSpace::WeakMap.new
- @@log_queue =
Queue.new
- @@log_thread =
We memoize the thread on the first log flush. This also allows us to restart the logger thread on forks.
nil
- @@log_mutex =
Mutex.new
- @@oauthbearer_token_refresh_callback =
nil
Instance Attribute Summary
-
#consumer_poll_set ⇒ Boolean
writeonly
Should we use a single queue for the underlying consumer and events.
-
#consumer_rebalance_listener ⇒ Object, ...
writeonly
Get notifications on partition assignment/revocation for the subscribed topics.
Class Method Summary
-
.ensure_log_thread ⇒ Object
Makes sure that there is a thread for consuming logs. We do not spawn the thread immediately, and we need to check that it is alive in order to support forking.
-
.error_callback ⇒ Proc?
Returns the current error callback, by default this is nil.
-
.error_callback=(callback) ⇒ nil
Set a callback that will be called every time the underlying client emits an error.
-
.log_queue ⇒ Queue
Returns a queue whose contents will be passed to the configured logger.
-
.logger ⇒ Logger
Returns the current logger, by default this is a logger to stdout.
-
.logger=(logger) ⇒ nil
Set the logger that will be used for all logging output by this library.
-
.oauthbearer_token_refresh_callback ⇒ Proc?
Returns the current oauthbearer_token_refresh_callback, by default this is nil.
-
.oauthbearer_token_refresh_callback=(callback) ⇒ nil
Sets the SASL/OAUTHBEARER token refresh callback.
- .opaques ⇒ Object
-
.statistics_callback ⇒ Proc?
Returns the current statistics callback, by default this is nil.
-
.statistics_callback=(callback) ⇒ nil
Set a callback that will be called every time the underlying client emits statistics.
Instance Method Summary
-
#[](key) ⇒ String?
Get a config option with the specified key.
-
#[]=(key, value) ⇒ nil
Set a config option.
-
#admin(native_kafka_auto_start: true, native_kafka_poll_timeout_ms: Defaults::NATIVE_KAFKA_POLL_TIMEOUT_MS, run_polling_thread: true) ⇒ Admin
Creates an admin instance with this configuration.
-
#consumer(native_kafka_auto_start: true) ⇒ Consumer
Creates a consumer with this configuration.
-
#describe_properties ⇒ Hash{Symbol => String}
Returns all configuration properties and their current values for this config.
-
#initialize(config_hash = {}) ⇒ Config
constructor
Returns a new config with the provided options which are merged with DEFAULT_CONFIG.
-
#producer(native_kafka_auto_start: true, native_kafka_poll_timeout_ms: Defaults::NATIVE_KAFKA_POLL_TIMEOUT_MS, run_polling_thread: true) ⇒ Producer
Create a producer with this configuration.
Constructor Details
#initialize(config_hash = {}) ⇒ Config
Returns a new config with the provided options which are merged with DEFAULT_CONFIG.
# File 'lib/rdkafka/config.rb', line 142
def initialize(config_hash = {})
  Callbacks.ensure_ffi_running

  @config_hash = DEFAULT_CONFIG.merge(config_hash)
  @consumer_rebalance_listener = nil
  @consumer_poll_set = true
end
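The merge step can be tried without the gem. The snippet below is a minimal sketch using plain Ruby hashes; `SketchConfig` is a hypothetical stand-in, not part of rdkafka, and its non-empty default is an assumption made only so the override is visible. It illustrates that merging into a frozen default hash yields a new, unfrozen hash, so caller-supplied options win and later `[]=` calls still work.

```ruby
# Hypothetical stand-in for illustration only; not the gem's Config class.
class SketchConfig
  # Assumed non-empty default so the override is visible (the real
  # DEFAULT_CONFIG shown above is empty).
  DEFAULT_CONFIG = { "client.id": "sketch" }.freeze

  def initialize(config_hash = {})
    # Hash#merge returns a new hash, so the frozen constant is never mutated.
    @config_hash = DEFAULT_CONFIG.merge(config_hash)
  end

  def [](key)
    @config_hash[key]
  end

  def []=(key, value)
    @config_hash[key] = value
  end
end

config = SketchConfig.new({ "client.id": "override", "bootstrap.servers": "localhost:9092" })
config[:"group.id"] = "example-group"
```

Because the merged hash is a fresh object, options set after construction never leak into the shared defaults.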
Instance Attribute Details
#consumer_poll_set=(value) ⇒ Boolean (writeonly)
Should we use a single queue for the underlying consumer and events.
This is an advanced API that allows for more granular control of the polling process. When this value is set to `false` (`true` by default), there will be two queues that need to be polled:
- main librdkafka queue for events
- consumer queue with messages and rebalances
It is recommended to use the defaults and only set it to `false` in advanced, complex multi-threaded cases where granular control over event handling is needed.
# File 'lib/rdkafka/config.rb', line 186
def consumer_poll_set=(value)
  @consumer_poll_set = value
end
#consumer_rebalance_listener=(value) ⇒ Object, ... (writeonly)
Get notifications on partition assignment/revocation for the subscribed topics.
# File 'lib/rdkafka/config.rb', line 172
def consumer_rebalance_listener=(value)
  @consumer_rebalance_listener = value
end
Class Method Details
.ensure_log_thread ⇒ Object
Makes sure that there is a thread for consuming logs. We do not spawn the thread immediately, and we need to check that it is alive in order to support forking.
# File 'lib/rdkafka/config.rb', line 35
def self.ensure_log_thread
  return if @@log_thread&.alive?

  @@log_mutex.synchronize do
    # Restart if dead (fork, crash)
    @@log_thread = nil if @@log_thread && !@@log_thread.alive?

    @@log_thread ||= Thread.start do
      loop do
        severity, msg = @@log_queue.pop
        @@logger.add(severity, msg)
      end
    end
  end
end
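The lazy start and restart behaviour can be observed in isolation. The following is a standalone sketch that mirrors the pattern with the standard library only; `LogPump` and its `wait_until` helper are hypothetical names for this example, and killing the thread merely simulates the thread being lost after a fork.

```ruby
require "logger"
require "stringio"

# Hypothetical stand-in that mirrors the lazy log-thread pattern; not the gem's code.
module LogPump
  @queue = Queue.new
  @mutex = Mutex.new
  @thread = nil
  @output = StringIO.new
  @logger = Logger.new(@output)

  class << self
    attr_reader :queue, :output, :thread

    def ensure_thread
      # Fast path: nothing to do while the consumer thread is running.
      return if @thread&.alive?

      @mutex.synchronize do
        # Forget a dead thread (for example after a fork) so it is recreated.
        @thread = nil if @thread && !@thread.alive?

        @thread ||= Thread.start do
          loop do
            severity, msg = @queue.pop
            @logger.add(severity, msg)
          end
        end
      end
    end

    # Helper for this sketch only: poll until the block is true or time runs out.
    def wait_until(timeout = 2)
      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
      until yield
        break if Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline
        sleep 0.01
      end
    end
  end
end

LogPump.ensure_thread
first_thread = LogPump.thread
LogPump.queue << [Logger::INFO, "first message"]
LogPump.wait_until { LogPump.output.string.include?("first message") }

# Simulate the thread dying, then ask for a thread again.
first_thread.kill
first_thread.join
LogPump.ensure_thread
LogPump.queue << [Logger::WARN, "after restart"]
LogPump.wait_until { LogPump.output.string.include?("after restart") }
```

The check outside the mutex keeps the common case cheap, while the check inside it prevents two callers from starting two threads at once.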
.error_callback ⇒ Proc?
Returns the current error callback, by default this is nil.
# File 'lib/rdkafka/config.rb', line 102
def self.error_callback
  @@error_callback
end
.error_callback=(callback) ⇒ nil
Set a callback that will be called every time the underlying client emits an error. If this callback is not set, global errors such as brokers becoming unavailable will only be sent to the logger, as defined by librdkafka. The callback is called with an instance of Rdkafka::RdkafkaError.
# File 'lib/rdkafka/config.rb', line 94
def self.error_callback=(callback)
  raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call) || callback.nil?
  @@error_callback = callback
end
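The validation rule in this setter (accept anything that responds to `call`, or `nil`) can be illustrated on its own. This is a sketch under stated assumptions: `CallbackRegistry` and `ErrorCollector` are hypothetical names, and a plain `StandardError` stands in for the error object the client would pass.

```ruby
# Hypothetical stand-in that mirrors the setter's validation; not the gem itself.
module CallbackRegistry
  @error_callback = nil

  class << self
    attr_reader :error_callback

    def error_callback=(callback)
      raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call) || callback.nil?
      @error_callback = callback
    end
  end
end

# Any object that responds to #call is accepted, not only Procs.
class ErrorCollector
  attr_reader :errors

  def initialize
    @errors = []
  end

  def call(error)
    @errors << error
  end
end

collector = ErrorCollector.new
CallbackRegistry.error_callback = collector
CallbackRegistry.error_callback.call(StandardError.new("broker unavailable"))

# Passing nil clears the callback; a non-callable value is rejected.
CallbackRegistry.error_callback = nil
rejected = begin
  CallbackRegistry.error_callback = "not callable"
  false
rescue TypeError
  true
end
```

The same check guards the statistics and OAUTHBEARER setters shown in this section, so the pattern applies to all three callbacks.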
.log_queue ⇒ Queue
Returns a queue whose contents will be passed to the configured logger. Each entry should follow the format [Logger::Severity, String]. The benefit over calling the logger directly is that this is safe to use from trap contexts.
# File 'lib/rdkafka/config.rb', line 56
def self.log_queue
  @@log_queue
end
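To make the trap-context point concrete, here is a small sketch that pushes a `[severity, message]` entry from a signal handler. It assumes a Unix-like platform where `SIGUSR1` is available, and it uses a local `Queue` as a stand-in for the one returned by this method so that it runs without the gem.

```ruby
require "logger"

# Stand-in for the queue returned by Rdkafka::Config.log_queue (assumption:
# this sketch runs without the gem, on a platform that supports SIGUSR1).
log_queue = Queue.new

# Inside a trap handler, writing through Logger may fail because it takes a
# lock internally; pushing onto a Queue is allowed.
previous_handler = Signal.trap("USR1") do
  log_queue << [Logger::WARN, "received USR1"]
end

Process.kill("USR1", Process.pid)

# Wait briefly for the handler to run.
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 2
while log_queue.empty? && Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline
  sleep 0.01
end

# Drain the entry the way a log thread would, then restore the old handler.
severity, message = log_queue.pop(true)
Signal.trap("USR1", previous_handler || "DEFAULT")
```

With the real queue, the entry would be picked up by the log thread and passed to the configured logger outside the trap context.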
.logger ⇒ Logger
Returns the current logger, by default this is a logger to stdout.
# File 'lib/rdkafka/config.rb', line 29
def self.logger
  @@logger
end
.logger=(logger) ⇒ nil
Set the logger that will be used for all logging output by this library.
# File 'lib/rdkafka/config.rb', line 65
def self.logger=(logger)
  raise NoLoggerError if logger.nil?
  @@logger = logger
end
.oauthbearer_token_refresh_callback ⇒ Proc?
Returns the current oauthbearer_token_refresh_callback, by default this is nil.
# File 'lib/rdkafka/config.rb', line 119
def self.oauthbearer_token_refresh_callback
  @@oauthbearer_token_refresh_callback
end
.oauthbearer_token_refresh_callback=(callback) ⇒ nil
Sets the SASL/OAUTHBEARER token refresh callback. This callback will be triggered when it is time to refresh the client's OAUTHBEARER token.
# File 'lib/rdkafka/config.rb', line 111
def self.oauthbearer_token_refresh_callback=(callback)
  raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call) || callback.nil?
  @@oauthbearer_token_refresh_callback = callback
end
.opaques ⇒ Object
# File 'lib/rdkafka/config.rb', line 124
def self.opaques
  @@opaques
end
.statistics_callback ⇒ Proc?
Returns the current statistics callback, by default this is nil.
# File 'lib/rdkafka/config.rb', line 84
def self.statistics_callback
  @@statistics_callback
end
.statistics_callback=(callback) ⇒ nil
Set a callback that will be called every time the underlying client emits statistics. You can configure if and how often this happens using `statistics.interval.ms`. The callback is called with a hash that is documented here: github.com/confluentinc/librdkafka/blob/master/STATISTICS.md
# File 'lib/rdkafka/config.rb', line 76
def self.statistics_callback=(callback)
  raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call) || callback.nil?
  @@statistics_callback = callback
end
Instance Method Details
#[](key) ⇒ String?
Get a config option with the specified key.
# File 'lib/rdkafka/config.rb', line 165
def [](key)
  @config_hash[key]
end
#[]=(key, value) ⇒ nil
Set a config option.
# File 'lib/rdkafka/config.rb', line 156
def []=(key, value)
  @config_hash[key] = value
end
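Since both accessors delegate straight to a Hash, Symbol and String keys are treated as different options. The sketch below uses a plain Hash as a stand-in for the internal storage (an assumption for illustration) and shows why a lookup may need to check both key forms, as the producer method in this section does for the partitioner.

```ruby
# Plain hash standing in for the config's internal storage (assumption for this sketch).
config_hash = {}
config_hash["partitioner"] = "murmur2_random"

# Symbol and String keys are distinct hash keys, so a lookup must use the
# same key type that was used when the option was set.
by_string = config_hash["partitioner"]
by_symbol = config_hash[:partitioner]

# Checking both forms, as the producer method does for :partitioner.
partitioner_name = config_hash[:partitioner] || config_hash["partitioner"]
```

Picking one key style and using it consistently avoids setting the same option twice under two different keys.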
#admin(native_kafka_auto_start: true, native_kafka_poll_timeout_ms: Defaults::NATIVE_KAFKA_POLL_TIMEOUT_MS, run_polling_thread: true) ⇒ Admin
Creates an admin instance with this configuration.
# File 'lib/rdkafka/config.rb', line 270
def admin(native_kafka_auto_start: true, native_kafka_poll_timeout_ms: Defaults::NATIVE_KAFKA_POLL_TIMEOUT_MS, run_polling_thread: true)
  opaque = Opaque.new
  config = native_config(opaque)
  Rdkafka::Bindings.rd_kafka_conf_set_background_event_cb(config, Rdkafka::Callbacks::BackgroundEventCallbackFunction)

  kafka = native_kafka(config, :rd_kafka_producer)

  Rdkafka::Admin.new(
    Rdkafka::NativeKafka.new(
      kafka,
      run_polling_thread: run_polling_thread,
      opaque: opaque,
      auto_start: native_kafka_auto_start,
      timeout_ms: native_kafka_poll_timeout_ms
    )
  )
end
#consumer(native_kafka_auto_start: true) ⇒ Consumer
Creates a consumer with this configuration.
# File 'lib/rdkafka/config.rb', line 196
def consumer(native_kafka_auto_start: true)
  opaque = Opaque.new
  config = native_config(opaque)

  if @consumer_rebalance_listener
    opaque.consumer_rebalance_listener = @consumer_rebalance_listener
    Rdkafka::Bindings.rd_kafka_conf_set_rebalance_cb(config, Rdkafka::Bindings::RebalanceCallback)
  end

  # Create native client
  kafka = native_kafka(config, :rd_kafka_consumer)

  # Redirect the main queue to the consumer queue
  Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka) if @consumer_poll_set

  # Return consumer with Kafka client
  Rdkafka::Consumer.new(
    Rdkafka::NativeKafka.new(
      kafka,
      run_polling_thread: false,
      opaque: opaque,
      auto_start: native_kafka_auto_start
    )
  )
end
#describe_properties ⇒ Hash{Symbol => String}
Returns all configuration properties and their current values for this config.
Uses `rd_kafka_conf_dump` to retrieve every property (including defaults and internal properties like `client.software.name`) as a flat Hash.
Note: The librdkafka C API does not distinguish between producer-only, consumer-only, and global properties at the configuration level. All properties are returned regardless of the intended client type.
Note: The returned Hash may include sensitive values such as authentication credentials and key passwords. Do not log or serialize the returned data unless you have explicitly redacted secret entries.
# File 'lib/rdkafka/config.rb', line 304
def describe_properties
  config = nil
  dump_ptr = nil
  count = 0

  config = native_config
  count_ptr = Rdkafka::Bindings::SizePtr.new
  dump_ptr = Rdkafka::Bindings.rd_kafka_conf_dump(config, count_ptr)
  count = count_ptr[:value]

  result = {}
  (0...count).step(2) do |i|
    key = dump_ptr.get_pointer(i * FFI::Pointer.size).read_string
    value = dump_ptr.get_pointer((i + 1) * FFI::Pointer.size).read_string
    result[key.to_sym] = value
  end

  result
ensure
  Rdkafka::Bindings.rd_kafka_conf_dump_free(dump_ptr, count) if dump_ptr
  Rdkafka::Bindings.rd_kafka_conf_destroy(config) if config
end
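The pairwise walk over the dump, and one possible way to redact secrets before logging the result, can be sketched with a plain array in place of the native pointer array. The sample values are made up, and the `SENSITIVE` pattern is a hypothetical redaction rule rather than anything provided by the gem.

```ruby
# Stand-in for the flat dump: keys and values alternate in one array, the
# same layout that the loop above reads through pointers. The values are
# made up for illustration.
dump = [
  "bootstrap.servers", "localhost:9092",
  "sasl.username", "app",
  "sasl.password", "s3cret",
  "ssl.key.password", "hunter2"
]

# Walk the array two entries at a time to rebuild a Hash with Symbol keys.
properties = {}
(0...dump.size).step(2) do |i|
  properties[dump[i].to_sym] = dump[i + 1]
end

# Hypothetical redaction rule: mask any property whose name mentions a
# password or secret. Adjust the pattern to your own security policy.
SENSITIVE = /password|secret/i

redacted = properties.to_h do |key, value|
  [key, key.to_s.match?(SENSITIVE) ? "[REDACTED]" : value]
end
```

A name-based pattern like this is only a starting point; an explicit allow-list of properties that are safe to print is the more conservative choice.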
#producer(native_kafka_auto_start: true, native_kafka_poll_timeout_ms: Defaults::NATIVE_KAFKA_POLL_TIMEOUT_MS, run_polling_thread: true) ⇒ Producer
Create a producer with this configuration.
# File 'lib/rdkafka/config.rb', line 233
def producer(native_kafka_auto_start: true, native_kafka_poll_timeout_ms: Defaults::NATIVE_KAFKA_POLL_TIMEOUT_MS, run_polling_thread: true)
  # Create opaque
  opaque = Opaque.new
  # Create Kafka config
  config = native_config(opaque)
  # Set callback to receive delivery reports on config
  Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Callbacks::DeliveryCallbackFunction)
  # Return producer with Kafka client
  partitioner_name = self[:partitioner] || self["partitioner"]

  kafka = native_kafka(config, :rd_kafka_producer)

  Rdkafka::Producer.new(
    Rdkafka::NativeKafka.new(
      kafka,
      run_polling_thread: run_polling_thread,
      opaque: opaque,
      auto_start: native_kafka_auto_start,
      timeout_ms: native_kafka_poll_timeout_ms
    ),
    partitioner_name
  ).tap do |producer|
    opaque.producer = producer
  end
end