Class: Rdkafka::Callbacks::DeliveryCallback
- Inherits:
-
Object
- Object
- Rdkafka::Callbacks::DeliveryCallback
- Defined in:
- lib/rdkafka/callbacks.rb
Class Method Summary collapse
-
.call(_client_ptr, message_ptr, opaque_ptr) ⇒ Object
Handles message delivery callbacks.
Class Method Details
.call(_client_ptr, message_ptr, opaque_ptr) ⇒ Object
Handles message delivery callbacks
403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 |
# File 'lib/rdkafka/callbacks.rb', line 403 def self.call(_client_ptr, , opaque_ptr) = Rdkafka::Bindings::Message.new() delivery_handle_ptr_address = [:_private].address if delivery_handle = Rdkafka::Producer::DeliveryHandle.remove(delivery_handle_ptr_address) topic_name = Rdkafka::Bindings.rd_kafka_topic_name([:rkt]) # Update delivery handle delivery_handle[:response] = [:err] delivery_handle[:partition] = [:partition] delivery_handle[:offset] = [:offset] delivery_handle[:topic_name] = FFI::MemoryPointer.from_string(topic_name) # Call delivery callback on opaque if opaque = Rdkafka::Config.opaques[opaque_ptr.to_i] opaque.call_delivery_callback( Rdkafka::Producer::DeliveryReport.new( [:partition], [:offset], topic_name, [:err], delivery_handle.label ), delivery_handle ) end delivery_handle.unlock end end |