Class: Rdkafka::Callbacks::DeliveryCallback

Inherits:
Object
  • Object
show all
Defined in:
lib/rdkafka/callbacks.rb

Class Method Summary collapse

Class Method Details

.call(_client_ptr, message_ptr, opaque_ptr) ⇒ Object

Handles message delivery callbacks

Parameters:

  • _client_ptr (FFI::Pointer)

    unused client pointer

  • message_ptr (FFI::Pointer)

    pointer to the delivered message

  • opaque_ptr (FFI::Pointer)

    pointer to the opaque object for callback context



446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
# File 'lib/rdkafka/callbacks.rb', line 446

def self.call(_client_ptr, message_ptr, opaque_ptr)
  message = Rdkafka::Bindings::Message.new(message_ptr)
  delivery_handle_ptr_address = message[:_private].address
  if delivery_handle = Rdkafka::Producer::DeliveryHandle.remove(delivery_handle_ptr_address)
    topic_name = Rdkafka::Bindings.rd_kafka_topic_name(message[:rkt])

    # Update delivery handle
    delivery_handle[:response] = message[:err]
    delivery_handle[:partition] = message[:partition]
    delivery_handle[:offset] = message[:offset]
    delivery_handle[:topic_name] = FFI::MemoryPointer.from_string(topic_name)

    # Call delivery callback on opaque
    if opaque = Rdkafka::Config.opaques[opaque_ptr.to_i]
      opaque.call_delivery_callback(
        Rdkafka::Producer::DeliveryReport.new(
          message[:partition],
          message[:offset],
          topic_name,
          message[:err],
          delivery_handle.label
        ),
        delivery_handle
      )
    end

    delivery_handle.unlock
  end
end