Class: Rdkafka::Callbacks::BackgroundEventCallback
- Inherits:
-
Object
- Object
- Rdkafka::Callbacks::BackgroundEventCallback
- Defined in:
- lib/rdkafka/callbacks.rb
Class Method Summary collapse
-
.call(_client_ptr, event_ptr, _opaque_ptr) ⇒ Object
Handles background events from librdkafka.
-
.process_create_acl(event_ptr) ⇒ Object
Processes create ACL result event.
-
.process_create_partitions(event_ptr) ⇒ Object
Processes create partitions result event.
-
.process_create_topic(event_ptr) ⇒ Object
Processes create topic result event.
-
.process_delete_acl(event_ptr) ⇒ Object
Processes delete ACL result event.
-
.process_delete_groups(event_ptr) ⇒ Object
Processes delete groups result event.
-
.process_delete_topic(event_ptr) ⇒ Object
Processes delete topic result event.
-
.process_describe_acl(event_ptr) ⇒ Object
Processes describe ACL result event.
-
.process_describe_configs(event_ptr) ⇒ Object
Processes describe configs result event.
-
.process_incremental_alter_configs(event_ptr) ⇒ Object
Processes incremental alter configs result event.
-
.process_list_offsets(event_ptr) ⇒ Object
Processes list offsets result event.
Class Method Details
.call(_client_ptr, event_ptr, _opaque_ptr) ⇒ Object
Handles background events from librdkafka
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/rdkafka/callbacks.rb', line 210
#
# Handles background events from librdkafka by looking up the event type and
# forwarding the event to the matching processor. An event type with no
# matching branch is ignored and the method returns nil.
#
# @param _client_ptr [Object] unused
# @param event_ptr [Object] event handed to the type lookup and to the processor
# @param _opaque_ptr [Object] unused
# @return [Object, nil] whatever the selected processor returns, or nil when
#   no branch matches
def self.call(_client_ptr, event_ptr, _opaque_ptr)
  event_type = Rdkafka::Bindings.rd_kafka_event_type(event_ptr)

  case event_type
  when Rdkafka::Bindings::RD_KAFKA_EVENT_CREATETOPICS_RESULT then process_create_topic(event_ptr)
  when Rdkafka::Bindings::RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT then process_describe_configs(event_ptr)
  when Rdkafka::Bindings::RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT then process_incremental_alter_configs(event_ptr)
  when Rdkafka::Bindings::RD_KAFKA_EVENT_DELETETOPICS_RESULT then process_delete_topic(event_ptr)
  when Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_CREATEPARTITIONS_RESULT then process_create_partitions(event_ptr)
  when Rdkafka::Bindings::RD_KAFKA_EVENT_CREATEACLS_RESULT then process_create_acl(event_ptr)
  when Rdkafka::Bindings::RD_KAFKA_EVENT_DELETEACLS_RESULT then process_delete_acl(event_ptr)
  when Rdkafka::Bindings::RD_KAFKA_EVENT_DESCRIBEACLS_RESULT then process_describe_acl(event_ptr)
  when Rdkafka::Bindings::RD_KAFKA_EVENT_DELETEGROUPS_RESULT then process_delete_groups(event_ptr)
  when Rdkafka::Bindings::RD_KAFKA_EVENT_LISTOFFSETS_RESULT then process_list_offsets(event_ptr)
  end
end
.process_create_acl(event_ptr) ⇒ Object
Processes create ACL result event
359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 |
# File 'lib/rdkafka/callbacks.rb', line 359
#
# Processes create ACL result event: extracts the ACL results from the event,
# copies the first result's error code and error string onto the waiting
# CreateAclHandle and unlocks that handle.
#
# @param event_ptr [Object] event handed to the Rdkafka::Bindings calls
# @return [Object, nil] result of the handle's +unlock+, or nil when no handle
#   is registered for the event's opaque address
def self.process_create_acl(event_ptr)
  create_acls_result = Rdkafka::Bindings.rd_kafka_event_CreateAcls_result(event_ptr)

  # librdkafka reports the number of results through a `size_t *` out
  # parameter, so the buffer must be size_t wide. An int32 buffer is too small
  # on 64-bit platforms and would be overrun by the native write.
  pointer_to_size_t = FFI::MemoryPointer.new(:size_t)
  create_acl_result_array = Rdkafka::Bindings.rd_kafka_CreateAcls_result_acls(create_acls_result, pointer_to_size_t)
  create_acl_results = CreateAclResult.create_acl_results_from_array(
    pointer_to_size_t.read(:size_t),
    create_acl_result_array
  )

  create_acl_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)
  create_acl_handle = Rdkafka::Admin::CreateAclHandle.remove(create_acl_handle_ptr.address)
  return unless create_acl_handle

  # Only the first result is reported back to the waiting handle.
  first_result = create_acl_results[0]
  create_acl_handle[:response] = first_result.result_error
  create_acl_handle[:response_string] = first_result.error_string

  create_acl_handle.unlock
end
.process_create_partitions(event_ptr) ⇒ Object
Processes create partitions result event
339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 |
# File 'lib/rdkafka/callbacks.rb', line 339
#
# Processes create partitions result event: extracts the per-topic results
# from the event, copies the first result's error code, error string and
# result name onto the waiting CreatePartitionsHandle and unlocks that handle.
#
# @param event_ptr [Object] event handed to the Rdkafka::Bindings calls
# @return [Object, nil] result of the handle's +unlock+, or nil when no handle
#   is registered for the event's opaque address
def self.process_create_partitions(event_ptr)
  create_partitions_result = Rdkafka::Bindings.rd_kafka_event_CreatePartitions_result(event_ptr)

  # Get the number of create partitions results. librdkafka writes the count
  # through a `size_t *` out parameter, so the buffer must be size_t wide. An
  # int32 buffer is too small on 64-bit platforms and would be overrun.
  pointer_to_size_t = FFI::MemoryPointer.new(:size_t)
  create_partitions_result_array = Rdkafka::Bindings.rd_kafka_CreatePartitions_result_topics(
    create_partitions_result,
    pointer_to_size_t
  )
  create_partitions_results = TopicResult.create_topic_results_from_array(
    pointer_to_size_t.read(:size_t),
    create_partitions_result_array
  )

  create_partitions_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)
  create_partitions_handle = Rdkafka::Admin::CreatePartitionsHandle.remove(create_partitions_handle_ptr.address)
  return unless create_partitions_handle

  # Only the first result is reported back to the waiting handle.
  first_result = create_partitions_results[0]
  create_partitions_handle[:response] = first_result.result_error
  create_partitions_handle[:error_string] = first_result.error_string
  create_partitions_handle[:result_name] = first_result.result_name

  create_partitions_handle.unlock
end
.process_create_topic(event_ptr) ⇒ Object
Processes create topic result event
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 |
# File 'lib/rdkafka/callbacks.rb', line 239
#
# Processes create topic result event: extracts the per-topic results from
# the event, copies the first result's error code, error string and result
# name onto the waiting CreateTopicHandle and unlocks that handle.
#
# @param event_ptr [Object] event handed to the Rdkafka::Bindings calls
# @return [Object, nil] result of the handle's +unlock+, or nil when no handle
#   is registered for the event's opaque address
def self.process_create_topic(event_ptr)
  create_topics_result = Rdkafka::Bindings.rd_kafka_event_CreateTopics_result(event_ptr)

  # Get the number of create topic results. librdkafka writes the count
  # through a `size_t *` out parameter, so the buffer must be size_t wide. An
  # int32 buffer is too small on 64-bit platforms and would be overrun.
  pointer_to_size_t = FFI::MemoryPointer.new(:size_t)
  create_topic_result_array = Rdkafka::Bindings.rd_kafka_CreateTopics_result_topics(
    create_topics_result,
    pointer_to_size_t
  )
  create_topic_results = TopicResult.create_topic_results_from_array(
    pointer_to_size_t.read(:size_t),
    create_topic_result_array
  )

  create_topic_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)
  create_topic_handle = Rdkafka::Admin::CreateTopicHandle.remove(create_topic_handle_ptr.address)
  return unless create_topic_handle

  # Only the first result is reported back to the waiting handle.
  first_result = create_topic_results[0]
  create_topic_handle[:response] = first_result.result_error
  create_topic_handle[:error_string] = first_result.error_string
  create_topic_handle[:result_name] = first_result.result_name

  create_topic_handle.unlock
end
.process_delete_acl(event_ptr) ⇒ Object
Processes delete ACL result event
378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 |
# File 'lib/rdkafka/callbacks.rb', line 378
#
# Processes delete ACL result event: extracts the delete responses from the
# event, copies the first response's error code and error string onto the
# waiting DeleteAclHandle and, when that response carries no error, also the
# matching ACLs and their count. The handle is unlocked afterwards.
#
# @param event_ptr [Object] event handed to the Rdkafka::Bindings calls
# @return [Object, nil] result of the handle's +unlock+, or nil when no handle
#   is registered for the event's opaque address
def self.process_delete_acl(event_ptr)
  delete_acls_result = Rdkafka::Bindings.rd_kafka_event_DeleteAcls_result(event_ptr)

  # Get the number of acl results. librdkafka writes the count through a
  # `size_t *` out parameter, so the buffer must be size_t wide. An int32
  # buffer is too small on 64-bit platforms and would be overrun.
  pointer_to_size_t = FFI::MemoryPointer.new(:size_t)
  delete_acl_result_responses = Rdkafka::Bindings.rd_kafka_DeleteAcls_result_responses(
    delete_acls_result,
    pointer_to_size_t
  )
  delete_acl_results = DeleteAclResult.delete_acl_results_from_array(
    pointer_to_size_t.read(:size_t),
    delete_acl_result_responses
  )

  delete_acl_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)
  delete_acl_handle = Rdkafka::Admin::DeleteAclHandle.remove(delete_acl_handle_ptr.address)
  return unless delete_acl_handle

  # Only the first response is reported back to the waiting handle.
  first_result = delete_acl_results[0]
  delete_acl_handle[:response] = first_result.result_error
  delete_acl_handle[:response_string] = first_result.error_string

  # The matching ACL data is only copied over for a successful response.
  if first_result.result_error == Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
    delete_acl_handle[:matching_acls] = first_result.matching_acls
    delete_acl_handle[:matching_acls_count] = first_result.matching_acls_count
  end

  delete_acl_handle.unlock
end
.process_delete_groups(event_ptr) ⇒ Object
Processes delete groups result event
299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 |
# File 'lib/rdkafka/callbacks.rb', line 299
#
# Processes delete groups result event: extracts the per-group results from
# the event, copies the first result's error code, error string and result
# name onto the waiting DeleteGroupsHandle and unlocks that handle.
#
# @param event_ptr [Object] event handed to the Rdkafka::Bindings calls
# @return [Object, nil] result of the handle's +unlock+, or nil when no handle
#   is registered for the event's opaque address
def self.process_delete_groups(event_ptr)
  delete_groups_result = Rdkafka::Bindings.rd_kafka_event_DeleteGroups_result(event_ptr)

  # Get the number of delete group results
  pointer_to_size_t = FFI::MemoryPointer.new(:size_t)
  delete_group_result_array = Rdkafka::Bindings.rd_kafka_DeleteGroups_result_groups(
    delete_groups_result,
    pointer_to_size_t
  )
  # The buffer holds a size_t, so read it back as a size_t rather than as a
  # 4-byte int, which returns the wrong half of the value on big-endian hosts.
  # NOTE(review): the original carried "TODO fix this" on this read;
  # presumably it referred to this width mismatch - confirm.
  delete_group_results = GroupResult.create_group_results_from_array(
    pointer_to_size_t.read(:size_t),
    delete_group_result_array
  )

  delete_group_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)
  delete_group_handle = Rdkafka::Admin::DeleteGroupsHandle.remove(delete_group_handle_ptr.address)
  return unless delete_group_handle

  # Only the first result is reported back to the waiting handle.
  first_result = delete_group_results[0]
  delete_group_handle[:response] = first_result.result_error
  delete_group_handle[:error_string] = first_result.error_string
  delete_group_handle[:result_name] = first_result.result_name

  delete_group_handle.unlock
end
.process_delete_topic(event_ptr) ⇒ Object
Processes delete topic result event
319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 |
# File 'lib/rdkafka/callbacks.rb', line 319
#
# Processes delete topic result event: extracts the per-topic results from
# the event, copies the first result's error code, error string and result
# name onto the waiting DeleteTopicHandle and unlocks that handle.
#
# @param event_ptr [Object] event handed to the Rdkafka::Bindings calls
# @return [Object, nil] result of the handle's +unlock+, or nil when no handle
#   is registered for the event's opaque address
def self.process_delete_topic(event_ptr)
  delete_topics_result = Rdkafka::Bindings.rd_kafka_event_DeleteTopics_result(event_ptr)

  # Get the number of topic results. librdkafka writes the count through a
  # `size_t *` out parameter, so the buffer must be size_t wide. An int32
  # buffer is too small on 64-bit platforms and would be overrun.
  pointer_to_size_t = FFI::MemoryPointer.new(:size_t)
  delete_topic_result_array = Rdkafka::Bindings.rd_kafka_DeleteTopics_result_topics(
    delete_topics_result,
    pointer_to_size_t
  )
  delete_topic_results = TopicResult.create_topic_results_from_array(
    pointer_to_size_t.read(:size_t),
    delete_topic_result_array
  )

  delete_topic_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)
  delete_topic_handle = Rdkafka::Admin::DeleteTopicHandle.remove(delete_topic_handle_ptr.address)
  return unless delete_topic_handle

  # Only the first result is reported back to the waiting handle.
  first_result = delete_topic_results[0]
  delete_topic_handle[:response] = first_result.result_error
  delete_topic_handle[:error_string] = first_result.error_string
  delete_topic_handle[:result_name] = first_result.result_name

  delete_topic_handle.unlock
end
.process_describe_acl(event_ptr) ⇒ Object
Processes describe ACL result event
402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 |
# File 'lib/rdkafka/callbacks.rb', line 402
#
# Processes describe ACL result event: wraps the event in a DescribeAclResult,
# copies its error code and error string onto the waiting DescribeAclHandle
# and, when there is no error, also the matching ACLs and their count. The
# handle is unlocked afterwards.
#
# @param event_ptr [Object] event handed to DescribeAclResult and the bindings
# @return [Object, nil] result of the handle's +unlock+, or nil when no handle
#   is registered for the event's opaque address
def self.process_describe_acl(event_ptr)
  result = DescribeAclResult.new(event_ptr)
  opaque_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)

  handle = Rdkafka::Admin::DescribeAclHandle.remove(opaque_ptr.address)
  return unless handle

  handle[:response] = result.result_error
  handle[:response_string] = result.error_string

  # ACL data is only copied over for a successful response.
  if result.result_error == Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
    handle[:acls] = result.matching_acls
    handle[:acls_count] = result.matching_acls_count
  end

  handle.unlock
end
.process_describe_configs(event_ptr) ⇒ Object
Processes describe configs result event
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 |
# File 'lib/rdkafka/callbacks.rb', line 259
#
# Processes describe configs result event: wraps the event in a
# DescribeConfigsResult, copies its error code and error string onto the
# waiting DescribeConfigsHandle, clears the handle's pending flag and, when
# there is no error, also stores the config entries and their count. The
# handle is unlocked afterwards.
#
# @param event_ptr [Object] event handed to DescribeConfigsResult and the bindings
# @return [Object, nil] result of the handle's +unlock+, or nil when no handle
#   is registered for the event's opaque address
def self.process_describe_configs(event_ptr)
  result = DescribeConfigsResult.new(event_ptr)
  opaque_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)

  handle = Rdkafka::Admin::DescribeConfigsHandle.remove(opaque_ptr.address)
  return unless handle

  handle[:response] = result.result_error
  handle[:response_string] = result.error_string
  handle[:pending] = false

  # Config entries are only copied over for a successful response.
  if result.result_error == Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
    handle[:config_entries] = result.results
    handle[:entry_count] = result.results_count
  end

  handle.unlock
end
.process_incremental_alter_configs(event_ptr) ⇒ Object
Processes incremental alter configs result event
279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 |
# File 'lib/rdkafka/callbacks.rb', line 279
#
# Processes incremental alter configs result event: wraps the event in an
# IncrementalAlterConfigsResult, copies its error code and error string onto
# the waiting IncrementalAlterConfigsHandle, clears the handle's pending flag
# and, when there is no error, also stores the config entries and their count.
# The handle is unlocked afterwards.
#
# @param event_ptr [Object] event handed to IncrementalAlterConfigsResult and
#   the bindings
# @return [Object, nil] result of the handle's +unlock+, or nil when no handle
#   is registered for the event's opaque address
def self.process_incremental_alter_configs(event_ptr)
  result = IncrementalAlterConfigsResult.new(event_ptr)
  opaque_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)

  handle = Rdkafka::Admin::IncrementalAlterConfigsHandle.remove(opaque_ptr.address)
  return unless handle

  handle[:response] = result.result_error
  handle[:response_string] = result.error_string
  handle[:pending] = false

  # Config entries are only copied over for a successful response.
  if result.result_error == Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
    handle[:config_entries] = result.results
    handle[:entry_count] = result.results_count
  end

  handle.unlock
end
.process_list_offsets(event_ptr) ⇒ Object
Processes list offsets result event
421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 |
# File 'lib/rdkafka/callbacks.rb', line 421
#
# Processes list offsets result event: wraps the event in a ListOffsetsResult,
# copies its error code and error string onto the waiting ListOffsetsHandle,
# clears the handle's pending flag and, when there is no error, also stores
# the result infos and their count. The handle is unlocked afterwards.
#
# @param event_ptr [Object] event handed to ListOffsetsResult and the bindings
# @return [Object, nil] result of the handle's +unlock+, or nil when no handle
#   is registered for the event's opaque address
def self.process_list_offsets(event_ptr)
  result = ListOffsetsResult.new(event_ptr)
  opaque_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)

  handle = Rdkafka::Admin::ListOffsetsHandle.remove(opaque_ptr.address)
  return unless handle

  handle[:response] = result.result_error
  handle[:response_string] = result.error_string
  handle[:pending] = false

  # Offset data is only copied over for a successful response.
  if result.result_error == Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
    handle[:result_infos] = result.result_infos
    handle[:result_count] = result.result_count
  end

  handle.unlock
end