Class: Rdkafka::Config

Inherits:
Object
  • Object
Defined in:
lib/rdkafka/config.rb

Overview

Configuration for a Kafka consumer or producer. Create an instance, then call its consumer or producer method to create a client. The available configuration options are documented at https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md.

Defined Under Namespace

Classes: ClientCreationError, ConfigError, NoLoggerError

Constant Summary

DEFAULT_CONFIG =

Default config that can be overwritten.

{
  # Request api version so advanced features work
  :"api.version.request" => true
}.freeze

REQUIRED_CONFIG =

Required config that cannot be overwritten.

{
  # Enable log queues so we get callbacks in our own Ruby threads
  :"log.queue" => true
}.freeze
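
The two constants imply three layers of precedence: defaults, then user options, then required options. This section does not show where REQUIRED_CONFIG is applied, so the following is only a minimal sketch of one way to get that precedence with Hash#merge. The helper effective_config is hypothetical and not part of the library.

```ruby
# Copies of the constants listed above, so the sketch runs without the gem.
DEFAULT_CONFIG = {
  :"api.version.request" => true
}.freeze

REQUIRED_CONFIG = {
  :"log.queue" => true
}.freeze

# Hypothetical helper: later merges win, so user options override the
# defaults and the required options override everything else.
def effective_config(user_options = {})
  DEFAULT_CONFIG.merge(user_options).merge(REQUIRED_CONFIG)
end

config = effective_config(:"api.version.request" => false, :"log.queue" => false)
puts config[:"api.version.request"] # the user's value replaced the default
puts config[:"log.queue"]           # the required value was restored
```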

Constructor Details

#initialize(config_hash = {}) ⇒ Config

Returns a new config with the provided options merged over DEFAULT_CONFIG, so a provided option replaces a default with the same key.

Parameters:

  • config_hash (Hash{String,Symbol => String}) (defaults to: {})

    The config options for rdkafka



# File 'lib/rdkafka/config.rb', line 112

def initialize(config_hash = {})
  @config_hash = DEFAULT_CONFIG.merge(config_hash)
  @consumer_rebalance_listener = nil
end
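
Because the constructor uses Hash#merge, keys are compared exactly: the symbol and string forms of the same option name are different keys. A minimal sketch of that behavior, using a copy of DEFAULT_CONFIG and a hypothetical merged_options helper rather than the library itself:

```ruby
# Copy of the constant from this page, so the sketch runs without the gem.
DEFAULT_CONFIG = {
  :"api.version.request" => true
}.freeze

# Hypothetical stand-in for the merge performed in #initialize.
def merged_options(config_hash = {})
  DEFAULT_CONFIG.merge(config_hash)
end

# A symbol key matches the default's key and replaces its value.
symbol_keyed = merged_options(:"api.version.request" => false)
puts symbol_keyed.size # one entry: the default was replaced

# A string key is a different hash key, so the default is kept alongside it.
string_keyed = merged_options("api.version.request" => false)
puts string_keyed.size # two entries: both forms are present
```

How the library resolves a hash that contains both forms of a key is not shown in this section, so using one key style consistently avoids the question.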

Class Method Details

.error_callback ⇒ Proc?

Returns the current error callback; by default this is nil.

Returns:

  • (Proc, nil)


# File 'lib/rdkafka/config.rb', line 86

def self.error_callback
  @@error_callback
end

.error_callback=(callback) ⇒ nil

Set a callback that will be called every time the underlying client emits an error. If this callback is not set, global errors such as brokers becoming unavailable are only sent to the logger, as defined by librdkafka. The callback is called with an instance of Rdkafka::RdkafkaError.

Parameters:

  • callback (Proc, #call)

    The callback

Returns:

  • (nil)

Raises:

  • (TypeError)


# File 'lib/rdkafka/config.rb', line 78

def self.error_callback=(callback)
  raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call)
  @@error_callback = callback
end
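
The setter accepts anything that responds to #call and rejects everything else, including nil. The sketch below reproduces that check in a hypothetical CallbackRegistry module so it runs without the gem. It stores the callback in a module-level instance variable, which is a simplification of the class variable used above.

```ruby
# Hypothetical stand-in that mirrors the validation in error_callback=.
module CallbackRegistry
  @error_callback = nil

  class << self
    attr_reader :error_callback

    def error_callback=(callback)
      raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call)
      @error_callback = callback
    end
  end
end

# A lambda responds to #call, so it is accepted.
CallbackRegistry.error_callback = ->(error) { warn "Kafka error: #{error.message}" }

# A plain string does not respond to #call, so the setter raises.
begin
  CallbackRegistry.error_callback = "not callable"
rescue TypeError => e
  puts "rejected: #{e.message}"
end
```

With the library loaded, the equivalent assignment would be made on Rdkafka::Config.error_callback=.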

.log_queue ⇒ Queue

Returns a queue whose contents will be passed to the configured logger. Each entry should follow the format [Logger::Severity, String]. The benefit over calling the logger directly is that this is safe to use from trap contexts.

Returns:

  • (Queue)


# File 'lib/rdkafka/config.rb', line 38

def self.log_queue
  @@log_queue
end
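
The queue decouples producing a log entry from writing it: the caller only pushes a [severity, message] pair, and a separate thread hands it to the logger. The library's own drain loop is not shown in this section, so the following is a minimal sketch of the pattern using only the standard library. The names start_log_drain and demo_log_queue are hypothetical.

```ruby
require "logger"
require "stringio"

# Hypothetical drain loop: pops [severity, message] pairs and forwards them
# to the logger until the queue is closed and empty (pop then returns nil).
def start_log_drain(queue, logger)
  Thread.new do
    while (entry = queue.pop)
      severity, message = entry
      logger.add(severity, message)
    end
  end
end

# Pushes one entry in the documented format and returns what was logged.
def demo_log_queue
  output = StringIO.new
  queue = Queue.new
  worker = start_log_drain(queue, Logger.new(output))

  queue << [Logger::WARN, "received SIGTERM, shutting down"]

  queue.close # lets the worker exit once the queue has been drained
  worker.join
  output.string
end

puts demo_log_queue
```

With the library loaded, the push would target Rdkafka::Config.log_queue, for example from inside a Signal.trap block.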

.logger ⇒ Logger

Returns the current logger; by default this is a logger that writes to stdout.

Returns:

  • (Logger)


# File 'lib/rdkafka/config.rb', line 29

def self.logger
  @@logger
end

.logger=(logger) ⇒ nil

Set the logger that will be used for all logging output by this library.

Parameters:

  • logger (Logger)

    The logger to be used

Returns:

  • (nil)

Raises:

  • (NoLoggerError)

# File 'lib/rdkafka/config.rb', line 47

def self.logger=(logger)
  raise NoLoggerError if logger.nil?
  @@logger = logger
end

.statistics_callback ⇒ Proc?

Returns the current statistics callback; by default this is nil.

Returns:

  • (Proc, nil)


# File 'lib/rdkafka/config.rb', line 67

def self.statistics_callback
  @@statistics_callback
end

.statistics_callback=(callback) ⇒ nil

Set a callback that will be called every time the underlying client emits statistics. Whether and how often this happens is controlled by the statistics.interval.ms option. The callback is called with a hash whose contents are documented at https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md. Passing nil clears the callback.

Parameters:

  • callback (Proc, #call)

    The callback

Returns:

  • (nil)

Raises:

  • (TypeError)


# File 'lib/rdkafka/config.rb', line 59

def self.statistics_callback=(callback)
  raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call) || callback == nil
  @@statistics_callback = callback
end
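
Since the check is duck-typed, the callback does not have to be a Proc: any object with a #call method qualifies. The sketch below shows a hypothetical StatsRecorder object and a helper that mirrors the setter's check. The string keys "name" and "msg_cnt" are assumed from the librdkafka statistics document, and the sample values are made up.

```ruby
# Hypothetical callable object that keeps a few fields from each statistics hash.
class StatsRecorder
  attr_reader :samples

  def initialize
    @samples = []
  end

  # Assumes string keys as listed in STATISTICS.md; missing keys yield nil.
  def call(stats)
    @samples << { client: stats["name"], queued_messages: stats["msg_cnt"] }
  end
end

# Mirrors the check in statistics_callback=: callable objects and nil pass.
def valid_statistics_callback?(callback)
  callback.respond_to?(:call) || callback.nil?
end

recorder = StatsRecorder.new
recorder.call({ "name" => "example-producer", "msg_cnt" => 3 }) # made-up sample
puts recorder.samples.first[:queued_messages]
puts valid_statistics_callback?(recorder)
puts valid_statistics_callback?(nil)
```

With the library loaded, the recorder would be registered through Rdkafka::Config.statistics_callback=.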

Instance Method Details

#[](key) ⇒ String?

Get the config option with the specified key.

Parameters:

  • key (String)

    The config option's key

Returns:

  • (String, nil)

    The config option or nil if it is not present



# File 'lib/rdkafka/config.rb', line 132

def [](key)
  @config_hash[key]
end

#[]=(key, value) ⇒ nil

Set a config option.

Parameters:

  • key (String)

    The config option's key

  • value (String)

    The config option's value

Returns:

  • (nil)


# File 'lib/rdkafka/config.rb', line 123

def []=(key, value)
  @config_hash[key] = value
end

#admin ⇒ Admin

Creates an admin instance with this configuration.

Returns:

  • (Admin)

    The created admin instance

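Raises:

  • (ConfigError)

    When the configuration contains invalid options

  • (ClientCreationError)

    When the native client cannot be created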

# File 'lib/rdkafka/config.rb', line 203

def admin
  opaque = Opaque.new
  config = native_config(opaque)
  Rdkafka::Bindings.rd_kafka_conf_set_background_event_cb(config, Rdkafka::Callbacks::BackgroundEventCallbackFunction)
  Rdkafka::Admin.new(
    Rdkafka::NativeKafka.new(
      native_kafka(config, :rd_kafka_producer),
      run_polling_thread: true,
      opaque: opaque
    )
  )
end

#consumer ⇒ Consumer

Creates a consumer with this configuration.

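Returns:

  • (Consumer)

    The created consumer

Raises:

  • (ConfigError)

    When the configuration contains invalid options

  • (ClientCreationError)

    When the native client cannot be created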

# File 'lib/rdkafka/config.rb', line 149

def consumer
  opaque = Opaque.new
  config = native_config(opaque)

  if @consumer_rebalance_listener
    opaque.consumer_rebalance_listener = @consumer_rebalance_listener
    Rdkafka::Bindings.rd_kafka_conf_set_rebalance_cb(config, Rdkafka::Bindings::RebalanceCallback)
  end

  # Create native client
  kafka = native_kafka(config, :rd_kafka_consumer)

  # Redirect the main queue to the consumer
  Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka)

  # Return consumer with Kafka client
  Rdkafka::Consumer.new(
    Rdkafka::NativeKafka.new(
      kafka,
      run_polling_thread: false,
      opaque: opaque
    )
  )
end

#consumer_rebalance_listener=(listener) ⇒ Object

Set a listener that is notified when partitions are assigned or revoked for the subscribed topics.

Parameters:

  • listener (Object, #on_partitions_assigned, #on_partitions_revoked)

    listener instance



# File 'lib/rdkafka/config.rb', line 139

def consumer_rebalance_listener=(listener)
  @consumer_rebalance_listener = listener
end
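
The listener is duck-typed: it only needs the two methods named in the parameter list. The arguments the library passes to them are not shown in this section, so the hypothetical listener below captures them generically with a splat. rebalance_listener? is also a hypothetical helper, not a check the library is known to perform.

```ruby
# Hypothetical listener that records each rebalance event it receives.
class LoggingRebalanceListener
  attr_reader :events

  def initialize
    @events = []
  end

  # Arguments are captured with a splat because their shape is an assumption.
  def on_partitions_assigned(*args)
    @events << [:assigned, args]
  end

  def on_partitions_revoked(*args)
    @events << [:revoked, args]
  end
end

# Hypothetical helper: true when the object has both listener methods.
def rebalance_listener?(listener)
  listener.respond_to?(:on_partitions_assigned) &&
    listener.respond_to?(:on_partitions_revoked)
end

listener = LoggingRebalanceListener.new
puts rebalance_listener?(listener)
```

Because #consumer reads the listener when it builds the client, the listener has to be assigned to the config before #consumer is called.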

#producer ⇒ Producer

Creates a producer with this configuration.

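Returns:

  • (Producer)

    The created producer

Raises:

  • (ConfigError)

    When the configuration contains invalid options

  • (ClientCreationError)

    When the native client cannot be created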

# File 'lib/rdkafka/config.rb', line 180

def producer
  # Create opaque
  opaque = Opaque.new
  # Create Kafka config
  config = native_config(opaque)
  # Set callback to receive delivery reports on config
  Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Callbacks::DeliveryCallbackFunction)
  # Return producer with Kafka client
  partitioner_name = self[:partitioner] || self["partitioner"]
  Rdkafka::Producer.new(
    Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer), run_polling_thread: true, opaque: opaque),
    partitioner_name
  ).tap do |producer|
    opaque.producer = producer
  end
end
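
The partitioner lookup in #producer checks the symbol key first and falls back to the string key, since #[] matches keys exactly. A minimal sketch of that lookup on a plain hash follows. The helper partitioner_name_from is hypothetical, and "murmur2" and "consistent" are only example values.

```ruby
# Hypothetical helper mirroring `self[:partitioner] || self["partitioner"]`.
def partitioner_name_from(config_hash)
  config_hash[:partitioner] || config_hash["partitioner"]
end

puts partitioner_name_from({ partitioner: "murmur2" })     # found under the symbol key
puts partitioner_name_from({ "partitioner" => "murmur2" }) # found under the string key
puts partitioner_name_from({}).inspect                     # neither key is present

# When both forms are set, the symbol key takes precedence.
puts partitioner_name_from({ partitioner: "murmur2", "partitioner" => "consistent" })
```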