Module: Rdkafka::Bindings

Extended by:
FFI::Library
Defined in:
lib/rdkafka/bindings.rb

Overview

Note:

There are two types of responses related to errors:

- rd_kafka_error_t - a C object that we need to remap into an error, or nil when there is no error
- rd_kafka_resp_err_t - response error code (numeric) that we can use directly

It is critical to ensure that we handle them correctly. The FFI result type should be:

- rd_kafka_error_t - :pointer
- rd_kafka_resp_err_t - :int
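The handling pattern for the two result types can be sketched in plain Ruby. This is an illustrative sketch, not the gem's code: it uses the stdlib Fiddle pointer type as a stand-in for an FFI pointer, and the helper names are invented.

```ruby
require "fiddle"

RESP_ERR_NO_ERROR = 0 # rd_kafka_resp_err_t success value

# rd_kafka_resp_err_t: a plain numeric code we can use directly.
def resp_err_to_error(code)
  code == RESP_ERR_NO_ERROR ? nil : RuntimeError.new("rdkafka error #{code}")
end

# rd_kafka_error_t: a C object pointer that must be remapped.
# NULL means "no error"; anything else wraps the error object
# (real code would also read its message and destroy the C object).
def error_ptr_to_error(ptr)
  ptr.null? ? nil : RuntimeError.new("rdkafka error object")
end

resp_err_to_error(0)             # nil
error_ptr_to_error(Fiddle::NULL) # nil
```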

Defined Under Namespace

Classes: ConfigResource, Message, NativeError, NativeErrorDesc, SizePtr, TopicPartition, TopicPartitionList

Constant Summary

RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS =
-175
RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS =
-174
RD_KAFKA_RESP_ERR__STATE =
-172
RD_KAFKA_RESP_ERR__NOENT =
-156
RD_KAFKA_RESP_ERR__FATAL =
-150
RD_KAFKA_RESP_ERR_NO_ERROR =
0
FATAL_ERROR_BUFFER_SIZE =

Buffer size for fatal error strings, matches librdkafka expectations

256
RD_KAFKA_PARTITION_UA =

Unassigned partition

-1
RD_KAFKA_PARTITION_UA_STR =

String representation of unassigned partition (used in stats hash keys)

RD_KAFKA_PARTITION_UA.to_s.freeze
RD_KAFKA_OFFSET_END =
-1
RD_KAFKA_OFFSET_BEGINNING =
-2
RD_KAFKA_OFFSET_STORED =
-1000
RD_KAFKA_OFFSET_INVALID =
-1001
EMPTY_HASH =
{}.freeze
RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS =
5
RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT =
104
RD_KAFKA_ADMIN_OP_INCREMENTALALTERCONFIGS =
16
RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT =
131072
RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET =
0
RD_KAFKA_ALTER_CONFIG_OP_TYPE_DELETE =
1
RD_KAFKA_ALTER_CONFIG_OP_TYPE_APPEND =
2
RD_KAFKA_ALTER_CONFIG_OP_TYPE_SUBTRACT =
3
LogCallback =
FFI::Function.new(
  :void, [:pointer, :int, :string, :string]
) do |_client_ptr, level, _level_string, line|
  severity = case level
  when 0, 1, 2
    Logger::FATAL
  when 3
    Logger::ERROR
  when 4
    Logger::WARN
  when 5, 6
    Logger::INFO
  when 7
    Logger::DEBUG
  else
    Logger::UNKNOWN
  end

  Rdkafka::Config.ensure_log_thread
  Rdkafka::Config.log_queue << [severity, "rdkafka: #{line}"]
end
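The level mapping inside LogCallback can be pulled out into a standalone method for illustration: librdkafka emits syslog-style levels (0 = emergency through 7 = debug), which are translated into Ruby `Logger` severities. The method name here is invented for the sketch.

```ruby
require "logger"

# Standalone version of the mapping used in LogCallback above:
# syslog-style level (0..7) -> Ruby Logger severity.
def rdkafka_level_to_severity(level)
  case level
  when 0, 1, 2 then Logger::FATAL
  when 3       then Logger::ERROR
  when 4       then Logger::WARN
  when 5, 6    then Logger::INFO
  when 7       then Logger::DEBUG
  else Logger::UNKNOWN
  end
end

rdkafka_level_to_severity(3) # Logger::ERROR
rdkafka_level_to_severity(7) # Logger::DEBUG
```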
StatsCallback =
FFI::Function.new(
  :int, [:pointer, :string, :int, :pointer]
) do |_client_ptr, json, _json_len, _opaque|
  if Rdkafka::Config.statistics_callback
    stats = JSON.parse(json)

    # If the user requested statistics callbacks, we can use the statistics data to get the
    # partition count for each topic when this data is published. That way we do not have
    # to query this information when the user is using `partition_key`. This takes around
    # 0.02ms every statistics interval period (most likely every 5 seconds) and saves us
    # from making any queries to the cluster for the partition count.
    #
    # One edge case is if the user sets `statistics.interval.ms` much higher than the
    # default partition count refresh (30 seconds). This is taken care of: the lack of
    # reporting to the partitions cache will cause the cache to expire and trigger a
    # blocking refresh.
    #
    # If the user sets `topic.metadata.refresh.interval.ms` too high, that is on the user.
    #
    # Since this cache is shared, having a few consumers and/or producers in one process
    # will automatically improve the querying times even with low refresh times.
    (stats["topics"] || EMPTY_HASH).each do |topic_name, details|
      partitions_count = details["partitions"].keys.count { |k| k != RD_KAFKA_PARTITION_UA_STR }

      next unless partitions_count.positive?

      Rdkafka::Producer.partitions_count_cache.set(topic_name, partitions_count)
    end

    Rdkafka::Config.statistics_callback.call(stats)
  end

  # Return 0 so librdkafka frees the json string
  RD_KAFKA_RESP_ERR_NO_ERROR
end
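The partition-count extraction in StatsCallback can be sketched on its own against a sample statistics payload. The sample JSON below is invented for illustration; only the `"topics" -> "partitions"` shape and the `"-1"` unassigned pseudo-partition key mirror the real payload.

```ruby
require "json"

UA_STR = "-1" # string key of the unassigned partition in the stats hash

# Count real partitions per topic in a statistics payload, skipping the
# unassigned ("-1") pseudo-partition, as StatsCallback does above.
def partition_counts(stats)
  (stats["topics"] || {}).each_with_object({}) do |(topic, details), acc|
    count = details["partitions"].keys.count { |k| k != UA_STR }
    acc[topic] = count if count.positive?
  end
end

sample = JSON.parse(<<~JSON)
  {"topics": {"events": {"partitions": {"-1": {}, "0": {}, "1": {}, "2": {}}}}}
JSON

partition_counts(sample) # {"events" => 3}
```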
ErrorCallback =
FFI::Function.new(
  :void, [:pointer, :int, :string, :pointer]
) do |client_ptr, err_code, reason, _opaque|
  if Rdkafka::Config.error_callback
    # Handle fatal errors according to librdkafka documentation:
    # When ERR__FATAL is received, we must call rd_kafka_fatal_error()
    # to get the actual underlying fatal error code and description.
    error = if err_code == RD_KAFKA_RESP_ERR__FATAL
      Rdkafka::RdkafkaError.build_fatal(
        client_ptr,
        fallback_error_code: err_code,
        fallback_message: reason
      )
    else
      Rdkafka::RdkafkaError.build(err_code, broker_message: reason)
    end

    error.set_backtrace(caller)
    Rdkafka::Config.error_callback.call(error)
  end
end
OAuthbearerTokenRefreshCallback =

The OAuth callback is currently global and contextless: it is invoked for all instances, so it must be able to determine which instance it belongs to. The instance name is provided in the callback, allowing it to reference the correct instance.

An example of how to use the instance name in the callback is given below. `refresh_token` is configured as the `oauthbearer_token_refresh_callback`. `instances` is a map of client names to client instances, maintained by the user.

```ruby
def refresh_token(config, client_name)
  client = instances[client_name]
  client.oauthbearer_set_token(
    token: 'new-token-value',
    lifetime_ms: token_lifetime_ms,
    principal_name: 'principal-name'
  )
end
```

FFI::Function.new(
  :void, [:pointer, :string, :pointer]
) do |client_ptr, config, _opaque|
  if Rdkafka::Config.oauthbearer_token_refresh_callback && !client_ptr.null?
    Rdkafka::Config.oauthbearer_token_refresh_callback.call(config, Rdkafka::Bindings.rd_kafka_name(client_ptr))
  end
end
RebalanceCallback =
FFI::Function.new(
  :void, [:pointer, :int, :pointer, :pointer]
) do |client_ptr, code, partitions_ptr, opaque_ptr|
  case code
  when RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
    if Rdkafka::Bindings.rd_kafka_rebalance_protocol(client_ptr) == "COOPERATIVE"
      Rdkafka::Bindings.rd_kafka_incremental_assign(client_ptr, partitions_ptr)
    else
      Rdkafka::Bindings.rd_kafka_assign(client_ptr, partitions_ptr)
    end
  else # RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS or errors
    if Rdkafka::Bindings.rd_kafka_rebalance_protocol(client_ptr) == "COOPERATIVE"
      Rdkafka::Bindings.rd_kafka_incremental_unassign(client_ptr, partitions_ptr)
    else
      Rdkafka::Bindings.rd_kafka_assign(client_ptr, FFI::Pointer::NULL)
    end
  end

  opaque = Rdkafka::Config.opaques[opaque_ptr.to_i]
  return unless opaque

  tpl = Rdkafka::Consumer::TopicPartitionList.from_native_tpl(partitions_ptr).freeze
  begin
    case code
    when RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
      opaque.call_on_partitions_assigned(tpl)
    when RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
      opaque.call_on_partitions_revoked(tpl)
    end
  rescue Exception => err
    Rdkafka::Config.logger.error("Unhandled exception: #{err.class} - #{err.message}")
  end
end
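The branching in RebalanceCallback can be summarized as a pure decision function: cooperative rebalancing uses the incremental calls, while the eager (classic) protocol assigns the full list on assign and passes NULL on revoke. The method and the `:rd_kafka_assign_null` marker symbol below are invented for this sketch; the other symbols match the binding names used above.

```ruby
ASSIGN_PARTITIONS = -175 # RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
REVOKE_PARTITIONS = -174 # RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS

# Which librdkafka call the rebalance callback would issue, given the
# event code and the active rebalance protocol.
def rebalance_action(code, protocol)
  cooperative = protocol == "COOPERATIVE"
  if code == ASSIGN_PARTITIONS
    cooperative ? :rd_kafka_incremental_assign : :rd_kafka_assign
  else # revoke or error: give the partitions back
    cooperative ? :rd_kafka_incremental_unassign : :rd_kafka_assign_null
  end
end

rebalance_action(ASSIGN_PARTITIONS, "COOPERATIVE") # :rd_kafka_incremental_assign
rebalance_action(REVOKE_PARTITIONS, "EAGER")       # :rd_kafka_assign_null
```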
RD_KAFKA_VTYPE_END =

Producer

0
RD_KAFKA_VTYPE_TOPIC =
1
RD_KAFKA_VTYPE_RKT =
2
RD_KAFKA_VTYPE_PARTITION =
3
RD_KAFKA_VTYPE_VALUE =
4
RD_KAFKA_VTYPE_KEY =
5
RD_KAFKA_VTYPE_OPAQUE =
6
RD_KAFKA_VTYPE_MSGFLAGS =
7
RD_KAFKA_VTYPE_TIMESTAMP =
8
RD_KAFKA_VTYPE_HEADER =
9
RD_KAFKA_VTYPE_HEADERS =
10
RD_KAFKA_PURGE_F_QUEUE =
1
RD_KAFKA_PURGE_F_INFLIGHT =
2
RD_KAFKA_MSG_F_COPY =
0x2
PARTITIONERS =

Hash mapping partitioner names to their FFI function symbols

Returns:

  • (Hash{String => Symbol})
%w[random consistent consistent_random murmur2 murmur2_random fnv1a fnv1a_random].each_with_object({}) do |name, hsh|
  method_name = :"rd_kafka_msg_partitioner_#{name}"
  attach_function method_name, [:pointer, :pointer, :size_t, :int32, :pointer, :pointer], :int32
  hsh[name] = method_name
end
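The PARTITIONERS hash is built with `each_with_object`, attaching one FFI function per partitioner name and mapping the name to its binding symbol. The same construction can be shown without the `attach_function` call (which needs librdkafka loaded):

```ruby
# Same construction pattern as PARTITIONERS above, minus the FFI
# attach_function step: map each partitioner name to its symbol.
names = %w[random consistent consistent_random murmur2 murmur2_random fnv1a fnv1a_random]

partitioners = names.each_with_object({}) do |name, hsh|
  hsh[name] = :"rd_kafka_msg_partitioner_#{name}"
end

partitioners.fetch("murmur2") # :rd_kafka_msg_partitioner_murmur2
```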
RD_KAFKA_ADMIN_OP_CREATETOPICS =

Create Topics

1
RD_KAFKA_EVENT_CREATETOPICS_RESULT =

rd_kafka_admin_op_t

100
RD_KAFKA_ADMIN_OP_DELETETOPICS =

Delete Topics

2
RD_KAFKA_EVENT_DELETETOPICS_RESULT =

rd_kafka_admin_op_t

101
RD_KAFKA_ADMIN_OP_CREATEPARTITIONS =

Create partitions

3
RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT =
102
RD_KAFKA_ADMIN_OP_DELETEGROUPS =

Delete Group

7
RD_KAFKA_EVENT_DELETEGROUPS_RESULT =

rd_kafka_admin_op_t

106
RD_KAFKA_ADMIN_OP_CREATEACLS =

Create Acls

9
RD_KAFKA_EVENT_CREATEACLS_RESULT =
1024
RD_KAFKA_ADMIN_OP_DELETEACLS =

Delete Acls

11
RD_KAFKA_EVENT_DELETEACLS_RESULT =
4096
RD_KAFKA_ADMIN_OP_DESCRIBEACLS =

Describe Acls

10
RD_KAFKA_EVENT_DESCRIBEACLS_RESULT =
2048
RD_KAFKA_RESOURCE_ANY =
1
RD_KAFKA_RESOURCE_TOPIC =
2
RD_KAFKA_RESOURCE_GROUP =
3
RD_KAFKA_RESOURCE_BROKER =
4
RD_KAFKA_RESOURCE_TRANSACTIONAL_ID =
5
RD_KAFKA_RESOURCE_PATTERN_UNKNOWN =
0
RD_KAFKA_RESOURCE_PATTERN_ANY =
1
RD_KAFKA_RESOURCE_PATTERN_MATCH =
2
RD_KAFKA_RESOURCE_PATTERN_LITERAL =
3
RD_KAFKA_RESOURCE_PATTERN_PREFIXED =
4
RD_KAFKA_ACL_OPERATION_ANY =
1
RD_KAFKA_ACL_OPERATION_ALL =
2
RD_KAFKA_ACL_OPERATION_READ =
3
RD_KAFKA_ACL_OPERATION_WRITE =
4
RD_KAFKA_ACL_OPERATION_CREATE =
5
RD_KAFKA_ACL_OPERATION_DELETE =
6
RD_KAFKA_ACL_OPERATION_ALTER =
7
RD_KAFKA_ACL_OPERATION_DESCRIBE =
8
RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION =
9
RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS =
10
RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS =
11
RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE =
12
RD_KAFKA_ACL_PERMISSION_TYPE_ANY =
1
RD_KAFKA_ACL_PERMISSION_TYPE_DENY =
2
RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW =
3

Class Method Summary

Class Method Details

.extract_fatal_error(client_ptr) ⇒ Hash?

Retrieves fatal error details from a Kafka client handle. This helper extracts fatal error information consistently across different parts of the codebase (callbacks, testing utilities, etc.).

Examples:

details = Rdkafka::Bindings.extract_fatal_error(client_ptr)
if details
  puts "Fatal error #{details[:error_code]}: #{details[:error_string]}"
end

Parameters:

  • client_ptr (FFI::Pointer)

    Native kafka client pointer

Returns:

  • (Hash, nil)

    Hash with :error_code and :error_string if fatal error occurred, nil otherwise



# File 'lib/rdkafka/bindings.rb', line 315

def self.extract_fatal_error(client_ptr)
  error_buffer = FFI::MemoryPointer.new(:char, FATAL_ERROR_BUFFER_SIZE)

  error_code = rd_kafka_fatal_error(client_ptr, error_buffer, FATAL_ERROR_BUFFER_SIZE)

  return nil if error_code == RD_KAFKA_RESP_ERR_NO_ERROR

  {
    error_code: error_code,
    error_string: error_buffer.read_string
  }
end

.lib_extensionString

Returns the library extension based on the host OS

Returns:

  • (String)

"dylib" on macOS, "so" on other systems



# File 'lib/rdkafka/bindings.rb', line 19

def self.lib_extension
  if /darwin/.match?(RbConfig::CONFIG["host_os"])
    "dylib"
  else
    "so"
  end
end
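The host-OS check can be parameterized for testing; the method name below is invented for this sketch, but the matching logic mirrors `lib_extension` above.

```ruby
require "rbconfig"

# Same host-OS check as lib_extension, taking the host_os string as an
# argument instead of reading RbConfig::CONFIG directly.
def lib_extension_for(host_os)
  /darwin/.match?(host_os) ? "dylib" : "so"
end

lib_extension_for("darwin23")  # "dylib"
lib_extension_for("linux-gnu") # "so"
```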

.partitioner(topic_ptr, str, partition_count, partitioner = "consistent_random") ⇒ Integer

Calculates the partition for a message based on the partitioner

Parameters:

  • topic_ptr (FFI::Pointer)

    pointer to the topic handle

  • str (String)

    the partition key string

  • partition_count (Integer, nil)

    number of partitions

  • partitioner (String) (defaults to: "consistent_random")

    name of the partitioner to use

Returns:

  • (Integer)

    partition number or RD_KAFKA_PARTITION_UA if unassigned

Raises:

  • (Rdkafka::Config::ConfigError)

    if the partitioner name is unknown

# File 'lib/rdkafka/bindings.rb', line 502

def self.partitioner(topic_ptr, str, partition_count, partitioner = "consistent_random")
  # Return RD_KAFKA_PARTITION_UA (unassigned partition) when the partition count is nil or zero.
  return RD_KAFKA_PARTITION_UA unless partition_count&.nonzero?

  str_ptr = str.empty? ? FFI::MemoryPointer::NULL : FFI::MemoryPointer.from_string(str)
  method_name = PARTITIONERS.fetch(partitioner) do
    raise Rdkafka::Config::ConfigError.new("Unknown partitioner: #{partitioner}")
  end

  public_send(method_name, topic_ptr, str_ptr, str.size, partition_count, nil, nil)
end