Module: Rdkafka::Bindings
- Extended by:
- FFI::Library
- Defined in:
- lib/rdkafka/bindings.rb
Overview
There are two types of responses related to errors:
- rd_kafka_error_t - a C object that we need to remap into an error or null when no error
- rd_kafka_resp_err_t - response error code (numeric) that we can use directly
It is critical to ensure, that we handle them correctly. The result type should be:
- rd_kafka_error_t - :pointer
- rd_kafka_resp_err_t - :int
Defined Under Namespace
Classes: ConfigResource, Message, NativeError, NativeErrorDesc, SizePtr, TopicPartition, TopicPartitionList
Constant Summary collapse
- RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS =
-175
- RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS =
-174
- RD_KAFKA_RESP_ERR__STATE =
-172
- RD_KAFKA_RESP_ERR__NOENT =
-156
- RD_KAFKA_RESP_ERR__FATAL =
-150
- RD_KAFKA_RESP_ERR_NO_ERROR =
0- FATAL_ERROR_BUFFER_SIZE =
Buffer size for fatal error strings, matches librdkafka expectations
256- RD_KAFKA_PARTITION_UA =
Unassigned partition
-1 # String representation of unassigned partition (used in stats hash keys)
- RD_KAFKA_PARTITION_UA_STR =
String representation of unassigned partition (used in stats hash keys)
RD_KAFKA_PARTITION_UA.to_s.freeze
- RD_KAFKA_OFFSET_END =
-1
- RD_KAFKA_OFFSET_BEGINNING =
-2
- RD_KAFKA_OFFSET_STORED =
-1000
- RD_KAFKA_OFFSET_INVALID =
-1001
- EMPTY_HASH =
{}.freeze
- RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS =
5- RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT =
104- RD_KAFKA_ADMIN_OP_INCREMENTALALTERCONFIGS =
16- RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT =
131072- RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET =
0- RD_KAFKA_ALTER_CONFIG_OP_TYPE_DELETE =
1- RD_KAFKA_ALTER_CONFIG_OP_TYPE_APPEND =
2- RD_KAFKA_ALTER_CONFIG_OP_TYPE_SUBTRACT =
3- RD_KAFKA_ADMIN_OP_LISTOFFSETS =
List Offsets
20- RD_KAFKA_EVENT_LISTOFFSETS_RESULT =
0x400000- RD_KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED =
rd_kafka_IsolationLevel_t
0- RD_KAFKA_ISOLATION_LEVEL_READ_COMMITTED =
1- RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP =
rd_kafka_OffsetSpec_t
-3
- RD_KAFKA_OFFSET_SPEC_EARLIEST =
-2
- RD_KAFKA_OFFSET_SPEC_LATEST =
-1
- LogCallback =
FFI::Function.new( :void, [:pointer, :int, :string, :string] ) do |_client_ptr, level, _level_string, line| severity = case level when 0, 1, 2 Logger::FATAL when 3 Logger::ERROR when 4 Logger::WARN when 5, 6 Logger::INFO when 7 Logger::DEBUG else Logger::UNKNOWN end Rdkafka::Config.ensure_log_thread Rdkafka::Config.log_queue << [severity, "rdkafka: #{line}"] end
- StatsCallback =
FFI::Function.new( :int, [:pointer, :string, :int, :pointer] ) do |_client_ptr, json, _json_len, _opaque| if Rdkafka::Config.statistics_callback stats = JSON.parse(json) # If user requested statistics callbacks, we can use the statistics data to get the # partitions count for each topic when this data is published. That way we do not have # to query this information when user is using `partition_key`. This takes around 0.02ms # every statistics interval period (most likely every 5 seconds) and saves us from making # any queries to the cluster for the partition count. # # One edge case is if user would set the `statistics.interval.ms` much higher than the # default current partition count refresh (30 seconds). This is taken care of as the lack # of reporting to the partitions cache will cause cache expire and blocking refresh. # # If user sets `topic.metadata.refresh.interval.ms` too high this is on the user. # # Since this cache is shared, having few consumers and/or producers in one process will # automatically improve the querying times even with low refresh times. (stats["topics"] || EMPTY_HASH).each do |topic_name, details| partitions_count = details["partitions"].keys.count { |k| !(k == RD_KAFKA_PARTITION_UA_STR) } next unless partitions_count.positive? Rdkafka::Producer.partitions_count_cache.set(topic_name, partitions_count) end Rdkafka::Config.statistics_callback.call(stats) end # Return 0 so librdkafka frees the json string RD_KAFKA_RESP_ERR_NO_ERROR end
- ErrorCallback =
FFI::Function.new( :void, [:pointer, :int, :string, :pointer] ) do |client_ptr, err_code, reason, _opaque| if Rdkafka::Config.error_callback instance_name = client_ptr.null? ? nil : Rdkafka::Bindings.rd_kafka_name(client_ptr) # Handle fatal errors according to librdkafka documentation: # When ERR__FATAL is received, we must call rd_kafka_fatal_error() # to get the actual underlying fatal error code and description. error = if err_code == RD_KAFKA_RESP_ERR__FATAL Rdkafka::RdkafkaError.build_fatal( client_ptr, fallback_error_code: err_code, fallback_message: reason, instance_name: instance_name ) else Rdkafka::RdkafkaError.build(err_code, broker_message: reason, instance_name: instance_name) end error.set_backtrace(caller) Rdkafka::Config.error_callback.call(error) end end
- OAuthbearerTokenRefreshCallback =
The OAuth callback is currently global and contextless. This means that the callback will be called for all instances, and the callback must be able to determine to which instance it is associated. The instance name will be provided in the callback, allowing the callback to reference the correct instance.
An example of how to use the instance name in the callback is given below. The ‘refresh_token` is configured as the `oauthbearer_token_refresh_callback`. `instances` is a map of client names to client instances, maintained by the user.
“‘
def refresh_token(config, client_name) client = instances[client_name] client.oauthbearer_set_token( token: 'new-token-value', lifetime_ms: token-lifetime-ms, principal_name: 'principal-name' ) end“‘
FFI::Function.new( :void, [:pointer, :string, :pointer] ) do |client_ptr, config, _opaque| if Rdkafka::Config.oauthbearer_token_refresh_callback && !client_ptr.null? Rdkafka::Config.oauthbearer_token_refresh_callback.call(config, Rdkafka::Bindings.rd_kafka_name(client_ptr)) end end
- RebalanceCallback =
FFI::Function.new( :void, [:pointer, :int, :pointer, :pointer] ) do |client_ptr, code, partitions_ptr, opaque_ptr| case code when RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS if Rdkafka::Bindings.rd_kafka_rebalance_protocol(client_ptr) == "COOPERATIVE" Rdkafka::Bindings.rd_kafka_incremental_assign(client_ptr, partitions_ptr) else Rdkafka::Bindings.rd_kafka_assign(client_ptr, partitions_ptr) end else # RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS or errors if Rdkafka::Bindings.rd_kafka_rebalance_protocol(client_ptr) == "COOPERATIVE" Rdkafka::Bindings.rd_kafka_incremental_unassign(client_ptr, partitions_ptr) else Rdkafka::Bindings.rd_kafka_assign(client_ptr, FFI::Pointer::NULL) end end opaque = Rdkafka::Config.opaques[opaque_ptr.to_i] return unless opaque tpl = Rdkafka::Consumer::TopicPartitionList.from_native_tpl(partitions_ptr).freeze begin case code when RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS opaque.call_on_partitions_assigned(tpl) when RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS opaque.call_on_partitions_revoked(tpl) end rescue Exception => err Rdkafka::Config.logger.error("Unhandled exception: #{err.class} - #{err.}") end end
- RD_KAFKA_VTYPE_END =
Producer
0- RD_KAFKA_VTYPE_TOPIC =
1- RD_KAFKA_VTYPE_RKT =
2- RD_KAFKA_VTYPE_PARTITION =
3- RD_KAFKA_VTYPE_VALUE =
4- RD_KAFKA_VTYPE_KEY =
5- RD_KAFKA_VTYPE_OPAQUE =
6- RD_KAFKA_VTYPE_MSGFLAGS =
7- RD_KAFKA_VTYPE_TIMESTAMP =
8- RD_KAFKA_VTYPE_HEADER =
9- RD_KAFKA_VTYPE_HEADERS =
10- RD_KAFKA_PURGE_F_QUEUE =
1- RD_KAFKA_PURGE_F_INFLIGHT =
2- RD_KAFKA_MSG_F_COPY =
0x2- PARTITIONERS =
Hash mapping partitioner names to their FFI function symbols
%w[random consistent consistent_random murmur2 murmur2_random fnv1a fnv1a_random].each_with_object({}) do |name, hsh| method_name = :"rd_kafka_msg_partitioner_#{name}" attach_function method_name, [:pointer, :pointer, :size_t, :int32, :pointer, :pointer], :int32 hsh[name] = method_name end
- RD_KAFKA_ADMIN_OP_CREATETOPICS =
Create Topics
1- RD_KAFKA_EVENT_CREATETOPICS_RESULT =
rd_kafka_admin_op_t
100- RD_KAFKA_ADMIN_OP_DELETETOPICS =
Delete Topics
2- RD_KAFKA_EVENT_DELETETOPICS_RESULT =
rd_kafka_admin_op_t
101- RD_KAFKA_ADMIN_OP_CREATEPARTITIONS =
Create partitions
3- RD_KAFKA_ADMIN_OP_CREATEPARTITIONS_RESULT =
102- RD_KAFKA_ADMIN_OP_DELETEGROUPS =
Delete Group
7- RD_KAFKA_EVENT_DELETEGROUPS_RESULT =
rd_kafka_admin_op_t
106- RD_KAFKA_ADMIN_OP_CREATEACLS =
Create Acls
9- RD_KAFKA_EVENT_CREATEACLS_RESULT =
1024- RD_KAFKA_ADMIN_OP_DELETEACLS =
Delete Acls
11- RD_KAFKA_EVENT_DELETEACLS_RESULT =
4096- RD_KAFKA_ADMIN_OP_DESCRIBEACLS =
Describe Acls
10- RD_KAFKA_EVENT_DESCRIBEACLS_RESULT =
2048- RD_KAFKA_RESOURCE_ANY =
rd_kafka_ResourceType_t - github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L7307
1- RD_KAFKA_RESOURCE_TOPIC =
2- RD_KAFKA_RESOURCE_GROUP =
3- RD_KAFKA_RESOURCE_BROKER =
4- RD_KAFKA_RESOURCE_TRANSACTIONAL_ID =
5- RD_KAFKA_RESOURCE_PATTERN_UNKNOWN =
rd_kafka_ResourcePatternType_t - github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L7320
0- RD_KAFKA_RESOURCE_PATTERN_ANY =
1- RD_KAFKA_RESOURCE_PATTERN_MATCH =
2- RD_KAFKA_RESOURCE_PATTERN_LITERAL =
3- RD_KAFKA_RESOURCE_PATTERN_PREFIXED =
4- RD_KAFKA_ACL_OPERATION_ANY =
rd_kafka_AclOperation_t - github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L8403
1- RD_KAFKA_ACL_OPERATION_ALL =
2- RD_KAFKA_ACL_OPERATION_READ =
3- RD_KAFKA_ACL_OPERATION_WRITE =
4- RD_KAFKA_ACL_OPERATION_CREATE =
5- RD_KAFKA_ACL_OPERATION_DELETE =
6- RD_KAFKA_ACL_OPERATION_ALTER =
7- RD_KAFKA_ACL_OPERATION_DESCRIBE =
8- RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION =
9- RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS =
10- RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS =
11- RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE =
12- RD_KAFKA_ACL_PERMISSION_TYPE_ANY =
rd_kafka_AclPermissionType_t - github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L8435
1- RD_KAFKA_ACL_PERMISSION_TYPE_DENY =
2- RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW =
3
Class Method Summary collapse
-
.extract_fatal_error(client_ptr) ⇒ Hash?
Retrieves fatal error details from a kafka client handle.
-
.lib_extension ⇒ String
Returns the library extension based on the host OS.
-
.partitioner(topic_ptr, str, partition_count, partitioner = "consistent_random") ⇒ Integer
Calculates the partition for a message based on the partitioner.
Class Method Details
.extract_fatal_error(client_ptr) ⇒ Hash?
Retrieves fatal error details from a kafka client handle. This is a helper method to extract fatal error information consistently across different parts of the codebase (callbacks, testing utilities, etc.).
345 346 347 348 349 350 351 352 353 354 355 356 |
# File 'lib/rdkafka/bindings.rb', line 345 def self.extract_fatal_error(client_ptr) error_buffer = FFI::MemoryPointer.new(:char, FATAL_ERROR_BUFFER_SIZE) error_code = rd_kafka_fatal_error(client_ptr, error_buffer, FATAL_ERROR_BUFFER_SIZE) return nil if error_code == RD_KAFKA_RESP_ERR_NO_ERROR { error_code: error_code, error_string: error_buffer.read_string } end |
.lib_extension ⇒ String
Returns the library extension based on the host OS
19 20 21 22 23 24 25 |
# File 'lib/rdkafka/bindings.rb', line 19 def self.lib_extension if /darwin/.match?(RbConfig::CONFIG["host_os"]) "dylib" else "so" end end |
.partitioner(topic_ptr, str, partition_count, partitioner = "consistent_random") ⇒ Integer
Calculates the partition for a message based on the partitioner
541 542 543 544 545 546 547 548 549 550 551 |
# File 'lib/rdkafka/bindings.rb', line 541 def self.partitioner(topic_ptr, str, partition_count, partitioner = "consistent_random") # Return RD_KAFKA_PARTITION_UA(unassigned partition) when partition count is nil/zero. return RD_KAFKA_PARTITION_UA unless partition_count&.nonzero? str_ptr = str.empty? ? FFI::MemoryPointer::NULL : FFI::MemoryPointer.from_string(str) method_name = PARTITIONERS.fetch(partitioner) do raise Rdkafka::Config::ConfigError.new("Unknown partitioner: #{partitioner}") end public_send(method_name, topic_ptr, str_ptr, str.size, partition_count, nil, nil) end |