Module: Rdkafka::Bindings

Extended by:
FFI::Library
Defined in:
lib/rdkafka/bindings.rb

Overview

Note:

There are two types of responses related to errors:

- rd_kafka_error_t - a C object that we need to remap into an error or null when no error
- rd_kafka_resp_err_t - response error code (numeric) that we can use directly

It is critical to ensure, that we handle them correctly. The result type should be:

- rd_kafka_error_t - :pointer
- rd_kafka_resp_err_t - :int

Defined Under Namespace

Classes: ConfigResource, Message, NativeError, NativeErrorDesc, SizePtr, TopicPartition, TopicPartitionList

Constant Summary collapse

RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS =
-175
RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS =
-174
RD_KAFKA_RESP_ERR__STATE =
-172
RD_KAFKA_RESP_ERR__NOENT =
-156
RD_KAFKA_RESP_ERR_NO_ERROR =
0
RD_KAFKA_OFFSET_END =
-1
RD_KAFKA_OFFSET_BEGINNING =
-2
RD_KAFKA_OFFSET_STORED =
-1000
RD_KAFKA_OFFSET_INVALID =
-1001
RD_KAFKA_PARTITION_UA =
-1
RD_KAFKA_PARTITION_UA_STR =
RD_KAFKA_PARTITION_UA.to_s.freeze
EMPTY_HASH =
{}.freeze
RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS =
5
RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT =
104
RD_KAFKA_ADMIN_OP_INCREMENTALALTERCONFIGS =
16
RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT =
131072
RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET =
0
RD_KAFKA_ALTER_CONFIG_OP_TYPE_DELETE =
1
RD_KAFKA_ALTER_CONFIG_OP_TYPE_APPEND =
2
RD_KAFKA_ALTER_CONFIG_OP_TYPE_SUBTRACT =
3
RD_KAFKA_ADMIN_OP_LISTOFFSETS =

List Offsets

20
RD_KAFKA_EVENT_LISTOFFSETS_RESULT =
0x400000
RD_KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED =

rd_kafka_IsolationLevel_t

0
RD_KAFKA_ISOLATION_LEVEL_READ_COMMITTED =
1
RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP =

rd_kafka_OffsetSpec_t

-3
RD_KAFKA_OFFSET_SPEC_EARLIEST =
-2
RD_KAFKA_OFFSET_SPEC_LATEST =
-1
LogCallback =
FFI::Function.new(
  :void, [:pointer, :int, :string, :string]
) do |_client_ptr, level, _level_string, line|
  severity = case level
  when 0, 1, 2
    Logger::FATAL
  when 3
    Logger::ERROR
  when 4
    Logger::WARN
  when 5, 6
    Logger::INFO
  when 7
    Logger::DEBUG
  else
    Logger::UNKNOWN
  end

  Rdkafka::Config.ensure_log_thread
  Rdkafka::Config.log_queue << [severity, "rdkafka: #{line}"]
end
StatsCallback =
FFI::Function.new(
  :int, [:pointer, :string, :int, :pointer]
) do |_client_ptr, json, _json_len, _opaque|
  if Rdkafka::Config.statistics_callback
    stats = JSON.parse(json)

    # If user requested statistics callbacks, we can use the statistics data to get the
    # partitions count for each topic when this data is published. That way we do not have
    # to query this information when user is using `partition_key`. This takes around 0.02ms
    # every statistics interval period (most likely every 5 seconds) and saves us from making
    # any queries to the cluster for the partition count.
    #
    # One edge case is if user would set the `statistics.interval.ms` much higher than the
    # default current partition count refresh (30 seconds). This is taken care of as the lack
    # of reporting to the partitions cache will cause cache expire and blocking refresh.
    #
    # If user sets `topic.metadata.refresh.interval.ms` too high this is on the user.
    #
    # Since this cache is shared, having few consumers and/or producers in one process will
    # automatically improve the querying times even with low refresh times.
    (stats["topics"] || EMPTY_HASH).each do |topic_name, details|
      partitions_count = details["partitions"].keys.count { |k| !(k == RD_KAFKA_PARTITION_UA_STR) }

      next unless partitions_count.positive?

      Rdkafka::Producer.partitions_count_cache.set(topic_name, partitions_count)
    end

    Rdkafka::Config.statistics_callback.call(stats)
  end

  # Return 0 so librdkafka frees the json string
  RD_KAFKA_RESP_ERR_NO_ERROR
end
ErrorCallback =
FFI::Function.new(
  :void, [:pointer, :int, :string, :pointer]
) do |client_ptr, err_code, reason, _opaque|
  if Rdkafka::Config.error_callback
    instance_name = client_ptr.null? ? nil : Rdkafka::Bindings.rd_kafka_name(client_ptr)
    error = Rdkafka::RdkafkaError.new(err_code, broker_message: reason, instance_name: instance_name)
    error.set_backtrace(caller)
    Rdkafka::Config.error_callback.call(error)
  end
end
OAuthbearerTokenRefreshCallback =

The OAuth callback is currently global and contextless. This means that the callback will be called for all instances, and the callback must be able to determine to which instance it is associated. The instance name will be provided in the callback, allowing the callback to reference the correct instance.

An example of how to use the instance name in the callback is given below. The ‘refresh_token` is configured as the `oauthbearer_token_refresh_callback`. `instances` is a map of client names to client instances, maintained by the user.

“‘

def refresh_token(config, client_name)
  client = instances[client_name]
  client.oauthbearer_set_token(
    token: 'new-token-value',
    lifetime_ms: token-lifetime-ms,
    principal_name: 'principal-name'
  )
end

“‘

FFI::Function.new(
  :void, [:pointer, :string, :pointer]
) do |client_ptr, config, _opaque|
  Rdkafka::Config.oauthbearer_token_refresh_callback&.call(config, Rdkafka::Bindings.rd_kafka_name(client_ptr))
end
RebalanceCallback =
FFI::Function.new(
  :void, [:pointer, :int, :pointer, :pointer]
) do |client_ptr, code, partitions_ptr, opaque_ptr|
  case code
  when RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
    if Rdkafka::Bindings.rd_kafka_rebalance_protocol(client_ptr) == "COOPERATIVE"
      Rdkafka::Bindings.rd_kafka_incremental_assign(client_ptr, partitions_ptr)
    else
      Rdkafka::Bindings.rd_kafka_assign(client_ptr, partitions_ptr)
    end
  else # RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS or errors
    if Rdkafka::Bindings.rd_kafka_rebalance_protocol(client_ptr) == "COOPERATIVE"
      Rdkafka::Bindings.rd_kafka_incremental_unassign(client_ptr, partitions_ptr)
    else
      Rdkafka::Bindings.rd_kafka_assign(client_ptr, FFI::Pointer::NULL)
    end
  end

  opaque = Rdkafka::Config.opaques[opaque_ptr.to_i]
  return unless opaque

  tpl = Rdkafka::Consumer::TopicPartitionList.from_native_tpl(partitions_ptr).freeze
  begin
    case code
    when RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
      opaque.call_on_partitions_assigned(tpl)
    when RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
      opaque.call_on_partitions_revoked(tpl)
    end
  rescue Exception => err
    Rdkafka::Config.logger.error("Unhandled exception: #{err.class} - #{err.message}")
  end
end
RD_KAFKA_VTYPE_END =

Producer

0
RD_KAFKA_VTYPE_TOPIC =
1
RD_KAFKA_VTYPE_RKT =
2
RD_KAFKA_VTYPE_PARTITION =
3
RD_KAFKA_VTYPE_VALUE =
4
RD_KAFKA_VTYPE_KEY =
5
RD_KAFKA_VTYPE_OPAQUE =
6
RD_KAFKA_VTYPE_MSGFLAGS =
7
RD_KAFKA_VTYPE_TIMESTAMP =
8
RD_KAFKA_VTYPE_HEADER =
9
RD_KAFKA_VTYPE_HEADERS =
10
RD_KAFKA_PURGE_F_QUEUE =
1
RD_KAFKA_PURGE_F_INFLIGHT =
2
RD_KAFKA_MSG_F_COPY =
0x2
PARTITIONERS =

Hash mapping partitioner names to their FFI function symbols

Returns:

  • (Hash{String => Symbol})
%w[random consistent consistent_random murmur2 murmur2_random fnv1a fnv1a_random].each_with_object({}) do |name, hsh|
  method_name = :"rd_kafka_msg_partitioner_#{name}"
  attach_function method_name, [:pointer, :pointer, :size_t, :int32, :pointer, :pointer], :int32
  hsh[name] = method_name
end
RD_KAFKA_ADMIN_OP_CREATETOPICS =

Create Topics

1
RD_KAFKA_EVENT_CREATETOPICS_RESULT =

rd_kafka_admin_op_t

100
RD_KAFKA_ADMIN_OP_DELETETOPICS =

Delete Topics

2
RD_KAFKA_EVENT_DELETETOPICS_RESULT =

rd_kafka_admin_op_t

101
RD_KAFKA_ADMIN_OP_CREATEPARTITIONS =

Create partitions

3
RD_KAFKA_ADMIN_OP_CREATEPARTITIONS_RESULT =
102
RD_KAFKA_ADMIN_OP_DELETEGROUPS =

Delete Group

7
RD_KAFKA_EVENT_DELETEGROUPS_RESULT =

rd_kafka_admin_op_t

106
RD_KAFKA_ADMIN_OP_CREATEACLS =

Create Acls

9
RD_KAFKA_EVENT_CREATEACLS_RESULT =
1024
RD_KAFKA_ADMIN_OP_DELETEACLS =

Delete Acls

11
RD_KAFKA_EVENT_DELETEACLS_RESULT =
4096
RD_KAFKA_ADMIN_OP_DESCRIBEACLS =

Describe Acls

10
RD_KAFKA_EVENT_DESCRIBEACLS_RESULT =
2048
RD_KAFKA_RESOURCE_ANY =
1
RD_KAFKA_RESOURCE_TOPIC =
2
RD_KAFKA_RESOURCE_GROUP =
3
RD_KAFKA_RESOURCE_BROKER =
4
RD_KAFKA_RESOURCE_TRANSACTIONAL_ID =
5
RD_KAFKA_RESOURCE_PATTERN_UNKNOWN =
0
RD_KAFKA_RESOURCE_PATTERN_ANY =
1
RD_KAFKA_RESOURCE_PATTERN_MATCH =
2
RD_KAFKA_RESOURCE_PATTERN_LITERAL =
3
RD_KAFKA_RESOURCE_PATTERN_PREFIXED =
4
RD_KAFKA_ACL_OPERATION_ANY =
1
RD_KAFKA_ACL_OPERATION_ALL =
2
RD_KAFKA_ACL_OPERATION_READ =
3
RD_KAFKA_ACL_OPERATION_WRITE =
4
RD_KAFKA_ACL_OPERATION_CREATE =
5
RD_KAFKA_ACL_OPERATION_DELETE =
6
RD_KAFKA_ACL_OPERATION_ALTER =
7
RD_KAFKA_ACL_OPERATION_DESCRIBE =
8
RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION =
9
RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS =
10
RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS =
11
RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE =
12
RD_KAFKA_ACL_PERMISSION_TYPE_ANY =
1
RD_KAFKA_ACL_PERMISSION_TYPE_DENY =
2
RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW =
3

Class Method Summary collapse

Class Method Details

.lib_extensionString

Returns the library extension based on the host OS

Returns:

  • (String)

    ‘dylib’ on macOS, ‘so’ on other systems



19
20
21
22
23
24
25
# File 'lib/rdkafka/bindings.rb', line 19

def self.lib_extension
  if /darwin/.match?(RbConfig::CONFIG["host_os"])
    "dylib"
  else
    "so"
  end
end

.partitioner(topic_ptr, str, partition_count, partitioner = "consistent_random") ⇒ Integer

Calculates the partition for a message based on the partitioner

Parameters:

  • topic_ptr (FFI::Pointer)

    pointer to the topic handle

  • str (String)

    the partition key string

  • partition_count (Integer, nil)

    number of partitions

  • partitioner (String) (defaults to: "consistent_random")

    name of the partitioner to use

Returns:

  • (Integer)

    partition number or RD_KAFKA_PARTITION_UA if unassigned

Raises:



487
488
489
490
491
492
493
494
495
496
497
# File 'lib/rdkafka/bindings.rb', line 487

def self.partitioner(topic_ptr, str, partition_count, partitioner = "consistent_random")
  # Return RD_KAFKA_PARTITION_UA(unassigned partition) when partition count is nil/zero.
  return RD_KAFKA_PARTITION_UA unless partition_count&.nonzero?

  str_ptr = str.empty? ? FFI::MemoryPointer::NULL : FFI::MemoryPointer.from_string(str)
  method_name = PARTITIONERS.fetch(partitioner) do
    raise Rdkafka::Config::ConfigError.new("Unknown partitioner: #{partitioner}")
  end

  public_send(method_name, topic_ptr, str_ptr, str.size, partition_count, nil, nil)
end