Class: Rdkafka::Callbacks::DeliveryCallback
- Inherits: Object
- Inheritance chain: Object → Rdkafka::Callbacks::DeliveryCallback
- Defined in: lib/rdkafka/callbacks.rb
Class Method Summary
- .call(_client_ptr, message_ptr, opaque_ptr) ⇒ Object
  Handles message delivery callbacks.
Class Method Details
.call(_client_ptr, message_ptr, opaque_ptr) ⇒ Object
Handles message delivery callbacks.
446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 |
# File 'lib/rdkafka/callbacks.rb', line 446

# Handles message delivery callbacks.
#
# Wraps +message_ptr+ in a Rdkafka::Bindings::Message, removes the delivery
# handle registered under the address of the message's +:_private+ pointer,
# copies the delivery result (error, partition, offset, topic name) onto that
# handle, forwards a Rdkafka::Producer::DeliveryReport to the opaque found
# under +opaque_ptr.to_i+ (when one exists) and unlocks the handle.
#
# @param _client_ptr [Object] unused
# @param message_ptr [Object] passed to Rdkafka::Bindings::Message.new
# @param opaque_ptr [#to_i] its integer value is the key into Rdkafka::Config.opaques
# @return [Object, nil] nil when no delivery handle is registered for the
#   message; otherwise not meaningful
#   NOTE(review): presumably invoked through FFI with the result ignored — confirm
def self.call(_client_ptr, message_ptr, opaque_ptr)
  message = Rdkafka::Bindings::Message.new(message_ptr)
  delivery_handle_ptr_address = message[:_private].address

  delivery_handle = Rdkafka::Producer::DeliveryHandle.remove(delivery_handle_ptr_address)
  return unless delivery_handle

  begin
    topic_name = Rdkafka::Bindings.rd_kafka_topic_name(message[:rkt])

    # Update delivery handle
    delivery_handle[:response] = message[:err]
    delivery_handle[:partition] = message[:partition]
    delivery_handle[:offset] = message[:offset]
    delivery_handle[:topic_name] = FFI::MemoryPointer.from_string(topic_name)

    # Call delivery callback on opaque
    opaque = Rdkafka::Config.opaques[opaque_ptr.to_i]
    if opaque
      opaque.call_delivery_callback(
        Rdkafka::Producer::DeliveryReport.new(
          message[:partition],
          message[:offset],
          topic_name,
          message[:err],
          delivery_handle.label
        ),
        delivery_handle
      )
    end
  ensure
    # The handle was already removed from the registry above, so this is the
    # only place it can be unlocked; do it even when the callback raises.
    delivery_handle.unlock
  end
end