Module: Rdkafka::Bindings
- Extended by:
- FFI::Library
- Defined in:
- lib/rdkafka/bindings.rb
Overview
There are two types of responses related to errors:
- rd_kafka_error_t - a C object that we need to remap into an error or null when no error
- rd_kafka_resp_err_t - response error code (numeric) that we can use directly
It is critical to ensure, that we handle them correctly. The result type should be:
- rd_kafka_error_t - :pointer
- rd_kafka_resp_err_t - :int
Defined Under Namespace
Classes: ConfigResource, Message, NativeError, NativeErrorDesc, SizePtr, TopicPartition, TopicPartitionList
Constant Summary collapse
- RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS =
-175
- RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS =
-174
- RD_KAFKA_RESP_ERR__STATE =
-172
- RD_KAFKA_RESP_ERR__NOENT =
-156
- RD_KAFKA_RESP_ERR_NO_ERROR =
0- RD_KAFKA_OFFSET_END =
-1
- RD_KAFKA_OFFSET_BEGINNING =
-2
- RD_KAFKA_OFFSET_STORED =
-1000
- RD_KAFKA_OFFSET_INVALID =
-1001
- RD_KAFKA_PARTITION_UA =
-1
- RD_KAFKA_PARTITION_UA_STR =
RD_KAFKA_PARTITION_UA.to_s.freeze
- EMPTY_HASH =
{}.freeze
- RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS =
5- RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT =
104- RD_KAFKA_ADMIN_OP_INCREMENTALALTERCONFIGS =
16- RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT =
131072- RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET =
0- RD_KAFKA_ALTER_CONFIG_OP_TYPE_DELETE =
1- RD_KAFKA_ALTER_CONFIG_OP_TYPE_APPEND =
2- RD_KAFKA_ALTER_CONFIG_OP_TYPE_SUBTRACT =
3- RD_KAFKA_ADMIN_OP_LISTOFFSETS =
List Offsets
20- RD_KAFKA_EVENT_LISTOFFSETS_RESULT =
0x400000- RD_KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED =
rd_kafka_IsolationLevel_t
0- RD_KAFKA_ISOLATION_LEVEL_READ_COMMITTED =
1- RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP =
rd_kafka_OffsetSpec_t
-3
- RD_KAFKA_OFFSET_SPEC_EARLIEST =
-2
- RD_KAFKA_OFFSET_SPEC_LATEST =
-1
- LogCallback =
FFI::Function.new( :void, [:pointer, :int, :string, :string] ) do |_client_ptr, level, _level_string, line| severity = case level when 0, 1, 2 Logger::FATAL when 3 Logger::ERROR when 4 Logger::WARN when 5, 6 Logger::INFO when 7 Logger::DEBUG else Logger::UNKNOWN end Rdkafka::Config.ensure_log_thread Rdkafka::Config.log_queue << [severity, "rdkafka: #{line}"] end
- StatsCallback =
FFI::Function.new( :int, [:pointer, :string, :int, :pointer] ) do |_client_ptr, json, _json_len, _opaque| if Rdkafka::Config.statistics_callback stats = JSON.parse(json) # If user requested statistics callbacks, we can use the statistics data to get the # partitions count for each topic when this data is published. That way we do not have # to query this information when user is using `partition_key`. This takes around 0.02ms # every statistics interval period (most likely every 5 seconds) and saves us from making # any queries to the cluster for the partition count. # # One edge case is if user would set the `statistics.interval.ms` much higher than the # default current partition count refresh (30 seconds). This is taken care of as the lack # of reporting to the partitions cache will cause cache expire and blocking refresh. # # If user sets `topic.metadata.refresh.interval.ms` too high this is on the user. # # Since this cache is shared, having few consumers and/or producers in one process will # automatically improve the querying times even with low refresh times. (stats["topics"] || EMPTY_HASH).each do |topic_name, details| partitions_count = details["partitions"].keys.count { |k| !(k == RD_KAFKA_PARTITION_UA_STR) } next unless partitions_count.positive? Rdkafka::Producer.partitions_count_cache.set(topic_name, partitions_count) end Rdkafka::Config.statistics_callback.call(stats) end # Return 0 so librdkafka frees the json string RD_KAFKA_RESP_ERR_NO_ERROR end
- ErrorCallback =
FFI::Function.new( :void, [:pointer, :int, :string, :pointer] ) do |client_ptr, err_code, reason, _opaque| if Rdkafka::Config.error_callback instance_name = client_ptr.null? ? nil : Rdkafka::Bindings.rd_kafka_name(client_ptr) error = Rdkafka::RdkafkaError.new(err_code, broker_message: reason, instance_name: instance_name) error.set_backtrace(caller) Rdkafka::Config.error_callback.call(error) end end
- OAuthbearerTokenRefreshCallback =
The OAuth callback is currently global and contextless. This means that the callback will be called for all instances, and the callback must be able to determine to which instance it is associated. The instance name will be provided in the callback, allowing the callback to reference the correct instance.
An example of how to use the instance name in the callback is given below. The ‘refresh_token` is configured as the `oauthbearer_token_refresh_callback`. `instances` is a map of client names to client instances, maintained by the user.
“‘
def refresh_token(config, client_name) client = instances[client_name] client.oauthbearer_set_token( token: 'new-token-value', lifetime_ms: token-lifetime-ms, principal_name: 'principal-name' ) end“‘
FFI::Function.new( :void, [:pointer, :string, :pointer] ) do |client_ptr, config, _opaque| Rdkafka::Config.oauthbearer_token_refresh_callback&.call(config, Rdkafka::Bindings.rd_kafka_name(client_ptr)) end
- RebalanceCallback =
FFI::Function.new( :void, [:pointer, :int, :pointer, :pointer] ) do |client_ptr, code, partitions_ptr, opaque_ptr| case code when RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS if Rdkafka::Bindings.rd_kafka_rebalance_protocol(client_ptr) == "COOPERATIVE" Rdkafka::Bindings.rd_kafka_incremental_assign(client_ptr, partitions_ptr) else Rdkafka::Bindings.rd_kafka_assign(client_ptr, partitions_ptr) end else # RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS or errors if Rdkafka::Bindings.rd_kafka_rebalance_protocol(client_ptr) == "COOPERATIVE" Rdkafka::Bindings.rd_kafka_incremental_unassign(client_ptr, partitions_ptr) else Rdkafka::Bindings.rd_kafka_assign(client_ptr, FFI::Pointer::NULL) end end opaque = Rdkafka::Config.opaques[opaque_ptr.to_i] return unless opaque tpl = Rdkafka::Consumer::TopicPartitionList.from_native_tpl(partitions_ptr).freeze begin case code when RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS opaque.call_on_partitions_assigned(tpl) when RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS opaque.call_on_partitions_revoked(tpl) end rescue Exception => err Rdkafka::Config.logger.error("Unhandled exception: #{err.class} - #{err.}") end end
- RD_KAFKA_VTYPE_END =
Producer
0- RD_KAFKA_VTYPE_TOPIC =
1- RD_KAFKA_VTYPE_RKT =
2- RD_KAFKA_VTYPE_PARTITION =
3- RD_KAFKA_VTYPE_VALUE =
4- RD_KAFKA_VTYPE_KEY =
5- RD_KAFKA_VTYPE_OPAQUE =
6- RD_KAFKA_VTYPE_MSGFLAGS =
7- RD_KAFKA_VTYPE_TIMESTAMP =
8- RD_KAFKA_VTYPE_HEADER =
9- RD_KAFKA_VTYPE_HEADERS =
10- RD_KAFKA_PURGE_F_QUEUE =
1- RD_KAFKA_PURGE_F_INFLIGHT =
2- RD_KAFKA_MSG_F_COPY =
0x2- PARTITIONERS =
Hash mapping partitioner names to their FFI function symbols
%w[random consistent consistent_random murmur2 murmur2_random fnv1a fnv1a_random].each_with_object({}) do |name, hsh| method_name = :"rd_kafka_msg_partitioner_#{name}" attach_function method_name, [:pointer, :pointer, :size_t, :int32, :pointer, :pointer], :int32 hsh[name] = method_name end
- RD_KAFKA_ADMIN_OP_CREATETOPICS =
Create Topics
1- RD_KAFKA_EVENT_CREATETOPICS_RESULT =
rd_kafka_admin_op_t
100- RD_KAFKA_ADMIN_OP_DELETETOPICS =
Delete Topics
2- RD_KAFKA_EVENT_DELETETOPICS_RESULT =
rd_kafka_admin_op_t
101- RD_KAFKA_ADMIN_OP_CREATEPARTITIONS =
Create partitions
3- RD_KAFKA_ADMIN_OP_CREATEPARTITIONS_RESULT =
102- RD_KAFKA_ADMIN_OP_DELETEGROUPS =
Delete Group
7- RD_KAFKA_EVENT_DELETEGROUPS_RESULT =
rd_kafka_admin_op_t
106- RD_KAFKA_ADMIN_OP_CREATEACLS =
Create Acls
9- RD_KAFKA_EVENT_CREATEACLS_RESULT =
1024- RD_KAFKA_ADMIN_OP_DELETEACLS =
Delete Acls
11- RD_KAFKA_EVENT_DELETEACLS_RESULT =
4096- RD_KAFKA_ADMIN_OP_DESCRIBEACLS =
Describe Acls
10- RD_KAFKA_EVENT_DESCRIBEACLS_RESULT =
2048- RD_KAFKA_RESOURCE_ANY =
rd_kafka_ResourceType_t - github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L7307
1- RD_KAFKA_RESOURCE_TOPIC =
2- RD_KAFKA_RESOURCE_GROUP =
3- RD_KAFKA_RESOURCE_BROKER =
4- RD_KAFKA_RESOURCE_TRANSACTIONAL_ID =
5- RD_KAFKA_RESOURCE_PATTERN_UNKNOWN =
rd_kafka_ResourcePatternType_t - github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L7320
0- RD_KAFKA_RESOURCE_PATTERN_ANY =
1- RD_KAFKA_RESOURCE_PATTERN_MATCH =
2- RD_KAFKA_RESOURCE_PATTERN_LITERAL =
3- RD_KAFKA_RESOURCE_PATTERN_PREFIXED =
4- RD_KAFKA_ACL_OPERATION_ANY =
rd_kafka_AclOperation_t - github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L8403
1- RD_KAFKA_ACL_OPERATION_ALL =
2- RD_KAFKA_ACL_OPERATION_READ =
3- RD_KAFKA_ACL_OPERATION_WRITE =
4- RD_KAFKA_ACL_OPERATION_CREATE =
5- RD_KAFKA_ACL_OPERATION_DELETE =
6- RD_KAFKA_ACL_OPERATION_ALTER =
7- RD_KAFKA_ACL_OPERATION_DESCRIBE =
8- RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION =
9- RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS =
10- RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS =
11- RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE =
12- RD_KAFKA_ACL_PERMISSION_TYPE_ANY =
rd_kafka_AclPermissionType_t - github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L8435
1- RD_KAFKA_ACL_PERMISSION_TYPE_DENY =
2- RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW =
3
Class Method Summary collapse
-
.lib_extension ⇒ String
Returns the library extension based on the host OS.
-
.partitioner(topic_ptr, str, partition_count, partitioner = "consistent_random") ⇒ Integer
Calculates the partition for a message based on the partitioner.
Class Method Details
.lib_extension ⇒ String
Returns the library extension based on the host OS
19 20 21 22 23 24 25 |
# File 'lib/rdkafka/bindings.rb', line 19 def self.lib_extension if /darwin/.match?(RbConfig::CONFIG["host_os"]) "dylib" else "so" end end |
.partitioner(topic_ptr, str, partition_count, partitioner = "consistent_random") ⇒ Integer
Calculates the partition for a message based on the partitioner
487 488 489 490 491 492 493 494 495 496 497 |
# File 'lib/rdkafka/bindings.rb', line 487 def self.partitioner(topic_ptr, str, partition_count, partitioner = "consistent_random") # Return RD_KAFKA_PARTITION_UA(unassigned partition) when partition count is nil/zero. return RD_KAFKA_PARTITION_UA unless partition_count&.nonzero? str_ptr = str.empty? ? FFI::MemoryPointer::NULL : FFI::MemoryPointer.from_string(str) method_name = PARTITIONERS.fetch(partitioner) do raise Rdkafka::Config::ConfigError.new("Unknown partitioner: #{partitioner}") end public_send(method_name, topic_ptr, str_ptr, str.size, partition_count, nil, nil) end |