Module: Rdkafka::Bindings
- Extended by:
- FFI::Library
- Defined in:
- lib/rdkafka/bindings.rb
Overview
There are two types of responses related to errors:
- rd_kafka_error_t - a C object that we need to remap into an error or null when no error
- rd_kafka_resp_err_t - response error code (numeric) that we can use directly
It is critical to ensure, that we handle them correctly. The result type should be:
- rd_kafka_error_t - :pointer
- rd_kafka_resp_err_t - :int
Defined Under Namespace
Classes: ConfigResource, Message, NativeError, NativeErrorDesc, SizePtr, TopicPartition, TopicPartitionList
Constant Summary collapse
- RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS =
-175
- RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS =
-174
- RD_KAFKA_RESP_ERR__STATE =
-172
- RD_KAFKA_RESP_ERR__NOENT =
-156
- RD_KAFKA_RESP_ERR__FATAL =
-150
- RD_KAFKA_RESP_ERR_NO_ERROR =
0- FATAL_ERROR_BUFFER_SIZE =
Buffer size for fatal error strings, matches librdkafka expectations
256- RD_KAFKA_PARTITION_UA =
Unassigned partition
-1 # String representation of unassigned partition (used in stats hash keys)
- RD_KAFKA_PARTITION_UA_STR =
String representation of unassigned partition (used in stats hash keys)
RD_KAFKA_PARTITION_UA.to_s.freeze
- RD_KAFKA_OFFSET_END =
-1
- RD_KAFKA_OFFSET_BEGINNING =
-2
- RD_KAFKA_OFFSET_STORED =
-1000
- RD_KAFKA_OFFSET_INVALID =
-1001
- EMPTY_HASH =
{}.freeze
- RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS =
5- RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT =
104- RD_KAFKA_ADMIN_OP_INCREMENTALALTERCONFIGS =
16- RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT =
131072- RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET =
0- RD_KAFKA_ALTER_CONFIG_OP_TYPE_DELETE =
1- RD_KAFKA_ALTER_CONFIG_OP_TYPE_APPEND =
2- RD_KAFKA_ALTER_CONFIG_OP_TYPE_SUBTRACT =
3- LogCallback =
FFI::Function.new( :void, [:pointer, :int, :string, :string] ) do |_client_ptr, level, _level_string, line| severity = case level when 0, 1, 2 Logger::FATAL when 3 Logger::ERROR when 4 Logger::WARN when 5, 6 Logger::INFO when 7 Logger::DEBUG else Logger::UNKNOWN end Rdkafka::Config.ensure_log_thread Rdkafka::Config.log_queue << [severity, "rdkafka: #{line}"] end
- StatsCallback =
FFI::Function.new( :int, [:pointer, :string, :int, :pointer] ) do |_client_ptr, json, _json_len, _opaque| if Rdkafka::Config.statistics_callback stats = JSON.parse(json) # If user requested statistics callbacks, we can use the statistics data to get the # partitions count for each topic when this data is published. That way we do not have # to query this information when user is using `partition_key`. This takes around 0.02ms # every statistics interval period (most likely every 5 seconds) and saves us from making # any queries to the cluster for the partition count. # # One edge case is if user would set the `statistics.interval.ms` much higher than the # default current partition count refresh (30 seconds). This is taken care of as the lack # of reporting to the partitions cache will cause cache expire and blocking refresh. # # If user sets `topic.metadata.refresh.interval.ms` too high this is on the user. # # Since this cache is shared, having few consumers and/or producers in one process will # automatically improve the querying times even with low refresh times. (stats['topics'] || EMPTY_HASH).each do |topic_name, details| partitions_count = details['partitions'].keys.reject { |k| k == RD_KAFKA_PARTITION_UA_STR }.size next unless partitions_count.positive? Rdkafka::Producer.partitions_count_cache.set(topic_name, partitions_count) end Rdkafka::Config.statistics_callback.call(stats) end # Return 0 so librdkafka frees the json string RD_KAFKA_RESP_ERR_NO_ERROR end
- ErrorCallback =
FFI::Function.new( :void, [:pointer, :int, :string, :pointer] ) do |client_ptr, err_code, reason, _opaque| if Rdkafka::Config.error_callback # Handle fatal errors according to librdkafka documentation: # When ERR__FATAL is received, we must call rd_kafka_fatal_error() # to get the actual underlying fatal error code and description. if err_code == RD_KAFKA_RESP_ERR__FATAL error = Rdkafka::RdkafkaError.build_fatal( client_ptr, fallback_error_code: err_code, fallback_message: reason ) else error = Rdkafka::RdkafkaError.build(err_code, broker_message: reason) end error.set_backtrace(caller) Rdkafka::Config.error_callback.call(error) end end
- OAuthbearerTokenRefreshCallback =
The OAuth callback is currently global and contextless. This means that the callback will be called for all instances, and the callback must be able to determine to which instance it is associated. The instance name will be provided in the callback, allowing the callback to reference the correct instance.
An example of how to use the instance name in the callback is given below. The ‘refresh_token` is configured as the `oauthbearer_token_refresh_callback`. `instances` is a map of client names to client instances, maintained by the user.
“‘
def refresh_token(config, client_name) client = instances[client_name] client.oauthbearer_set_token( token: 'new-token-value', lifetime_ms: token-lifetime-ms, principal_name: 'principal-name' ) end“‘
FFI::Function.new( :void, [:pointer, :string, :pointer] ) do |client_ptr, config, _opaque| if Rdkafka::Config.oauthbearer_token_refresh_callback && !client_ptr.null? Rdkafka::Config.oauthbearer_token_refresh_callback.call(config, Rdkafka::Bindings.rd_kafka_name(client_ptr)) end end
- RebalanceCallback =
FFI::Function.new( :void, [:pointer, :int, :pointer, :pointer] ) do |client_ptr, code, partitions_ptr, opaque_ptr| case code when RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS if Rdkafka::Bindings.rd_kafka_rebalance_protocol(client_ptr) == "COOPERATIVE" Rdkafka::Bindings.rd_kafka_incremental_assign(client_ptr, partitions_ptr) else Rdkafka::Bindings.rd_kafka_assign(client_ptr, partitions_ptr) end else # RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS or errors if Rdkafka::Bindings.rd_kafka_rebalance_protocol(client_ptr) == "COOPERATIVE" Rdkafka::Bindings.rd_kafka_incremental_unassign(client_ptr, partitions_ptr) else Rdkafka::Bindings.rd_kafka_assign(client_ptr, FFI::Pointer::NULL) end end opaque = Rdkafka::Config.opaques[opaque_ptr.to_i] return unless opaque tpl = Rdkafka::Consumer::TopicPartitionList.from_native_tpl(partitions_ptr).freeze begin case code when RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS opaque.call_on_partitions_assigned(tpl) when RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS opaque.call_on_partitions_revoked(tpl) end rescue Exception => err Rdkafka::Config.logger.error("Unhandled exception: #{err.class} - #{err.}") end end
- RD_KAFKA_VTYPE_END =
Producer
0- RD_KAFKA_VTYPE_TOPIC =
1- RD_KAFKA_VTYPE_RKT =
2- RD_KAFKA_VTYPE_PARTITION =
3- RD_KAFKA_VTYPE_VALUE =
4- RD_KAFKA_VTYPE_KEY =
5- RD_KAFKA_VTYPE_OPAQUE =
6- RD_KAFKA_VTYPE_MSGFLAGS =
7- RD_KAFKA_VTYPE_TIMESTAMP =
8- RD_KAFKA_VTYPE_HEADER =
9- RD_KAFKA_VTYPE_HEADERS =
10- RD_KAFKA_PURGE_F_QUEUE =
1- RD_KAFKA_PURGE_F_INFLIGHT =
2- RD_KAFKA_MSG_F_COPY =
0x2- PARTITIONERS =
Hash mapping partitioner names to their FFI function symbols
%w(random consistent consistent_random murmur2 murmur2_random fnv1a fnv1a_random).each_with_object({}) do |name, hsh| method_name = "rd_kafka_msg_partitioner_#{name}".to_sym attach_function method_name, [:pointer, :pointer, :size_t, :int32, :pointer, :pointer], :int32 hsh[name] = method_name end
- RD_KAFKA_ADMIN_OP_CREATETOPICS =
Create Topics
1- RD_KAFKA_EVENT_CREATETOPICS_RESULT =
rd_kafka_admin_op_t
100- RD_KAFKA_ADMIN_OP_DELETETOPICS =
Delete Topics
2- RD_KAFKA_EVENT_DELETETOPICS_RESULT =
rd_kafka_admin_op_t
101- RD_KAFKA_ADMIN_OP_CREATEPARTITIONS =
Create partitions
3- RD_KAFKA_ADMIN_OP_CREATEPARTITIONS_RESULT =
102- RD_KAFKA_ADMIN_OP_DELETEGROUPS =
Delete Group
7- RD_KAFKA_EVENT_DELETEGROUPS_RESULT =
rd_kafka_admin_op_t
106- RD_KAFKA_ADMIN_OP_CREATEACLS =
Create Acls
9- RD_KAFKA_EVENT_CREATEACLS_RESULT =
1024- RD_KAFKA_ADMIN_OP_DELETEACLS =
Delete Acls
11- RD_KAFKA_EVENT_DELETEACLS_RESULT =
4096- RD_KAFKA_ADMIN_OP_DESCRIBEACLS =
Describe Acls
10- RD_KAFKA_EVENT_DESCRIBEACLS_RESULT =
2048- RD_KAFKA_RESOURCE_ANY =
rd_kafka_ResourceType_t - github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L7307
1- RD_KAFKA_RESOURCE_TOPIC =
2- RD_KAFKA_RESOURCE_GROUP =
3- RD_KAFKA_RESOURCE_BROKER =
4- RD_KAFKA_RESOURCE_TRANSACTIONAL_ID =
5- RD_KAFKA_RESOURCE_PATTERN_ANY =
rd_kafka_ResourcePatternType_t - github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L7320
1- RD_KAFKA_RESOURCE_PATTERN_MATCH =
2- RD_KAFKA_RESOURCE_PATTERN_LITERAL =
3- RD_KAFKA_RESOURCE_PATTERN_PREFIXED =
4- RD_KAFKA_ACL_OPERATION_ANY =
rd_kafka_AclOperation_t - github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L8403
1- RD_KAFKA_ACL_OPERATION_ALL =
2- RD_KAFKA_ACL_OPERATION_READ =
3- RD_KAFKA_ACL_OPERATION_WRITE =
4- RD_KAFKA_ACL_OPERATION_CREATE =
5- RD_KAFKA_ACL_OPERATION_DELETE =
6- RD_KAFKA_ACL_OPERATION_ALTER =
7- RD_KAFKA_ACL_OPERATION_DESCRIBE =
8- RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION =
9- RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS =
10- RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS =
11- RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE =
12- RD_KAFKA_ACL_PERMISSION_TYPE_ANY =
rd_kafka_AclPermissionType_t - github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L8435
1- RD_KAFKA_ACL_PERMISSION_TYPE_DENY =
2- RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW =
3
Class Method Summary collapse
-
.extract_fatal_error(client_ptr) ⇒ Hash?
Retrieves fatal error details from a kafka client handle.
-
.lib_extension ⇒ String
Returns the library extension based on the host OS.
-
.partitioner(topic_ptr, str, partition_count, partitioner = "consistent_random") ⇒ Integer
Calculates the partition for a message based on the partitioner.
Class Method Details
.extract_fatal_error(client_ptr) ⇒ Hash?
Retrieves fatal error details from a kafka client handle. This is a helper method to extract fatal error information consistently across different parts of the codebase (callbacks, testing utilities, etc.).
306 307 308 309 310 311 312 313 314 315 316 317 |
# File 'lib/rdkafka/bindings.rb', line 306 def self.extract_fatal_error(client_ptr) error_buffer = FFI::MemoryPointer.new(:char, FATAL_ERROR_BUFFER_SIZE) error_code = rd_kafka_fatal_error(client_ptr, error_buffer, FATAL_ERROR_BUFFER_SIZE) return nil if error_code == RD_KAFKA_RESP_ERR_NO_ERROR { error_code: error_code, error_string: error_buffer.read_string } end |
.lib_extension ⇒ String
Returns the library extension based on the host OS
19 20 21 22 23 24 25 |
# File 'lib/rdkafka/bindings.rb', line 19 def self.lib_extension if RbConfig::CONFIG['host_os'] =~ /darwin/ 'dylib' else 'so' end end |
.partitioner(topic_ptr, str, partition_count, partitioner = "consistent_random") ⇒ Integer
Calculates the partition for a message based on the partitioner
493 494 495 496 497 498 499 500 501 502 503 |
# File 'lib/rdkafka/bindings.rb', line 493 def self.partitioner(topic_ptr, str, partition_count, partitioner = "consistent_random") # Return RD_KAFKA_PARTITION_UA(unassigned partition) when partition count is nil/zero. return RD_KAFKA_PARTITION_UA unless partition_count&.nonzero? str_ptr = str.empty? ? FFI::MemoryPointer::NULL : FFI::MemoryPointer.from_string(str) method_name = PARTITIONERS.fetch(partitioner) do raise Rdkafka::Config::ConfigError.new("Unknown partitioner: #{partitioner}") end public_send(method_name, topic_ptr, str_ptr, str.size, partition_count, nil, nil) end |