Module: Rdkafka::Callbacks

Defined in:
lib/rdkafka/callbacks.rb

Overview

Callback handlers for librdkafka events

Defined Under Namespace

Classes: BackgroundEventCallback, CreateAclResult, DeleteAclResult, DeliveryCallback, DescribeAclResult, DescribeConfigsResult, GroupResult, IncrementalAlterConfigsResult, TopicResult

Constant Summary collapse

@@mutex =
Mutex.new
@@current_pid =
nil

Class Method Summary collapse

Class Method Details

.ensure_ffi_running ⇒ Object

Defines the callbacks that require the FFI thread, or recreates them after a fork, so that the callback thread is always correctly initialized.



444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
# File 'lib/rdkafka/callbacks.rb', line 444

# Defines the FFI callback function constants on first use and redefines them
# whenever the current process id differs from the one recorded on the previous
# run (e.g. after a fork), so the constants always hold functions created by
# the running process.
#
# The whole routine runs under the module mutex and does nothing when the
# recorded pid already matches the current process.
def ensure_ffi_running
  @@mutex.synchronize do
    # Callbacks were already set up by this very process
    return if ::Process.pid == @@current_pid

    # Constants defined under a different pid are dropped before being redefined
    if const_defined?(:BackgroundEventCallbackFunction, false)
      %i[BackgroundEventCallbackFunction DeliveryCallbackFunction].each do |stale_name|
        send(:remove_const, stale_name)
      end
    end

    # Native function for Create Topic and Delete Topic callbacks
    background_fn = FFI::Function.new(:void, %i[pointer pointer pointer]) do |client_ptr, event_ptr, opaque_ptr|
      BackgroundEventCallback.call(client_ptr, event_ptr, opaque_ptr)
    end

    # Native function for Message Delivery callbacks
    delivery_fn = FFI::Function.new(:void, %i[pointer pointer pointer]) do |client_ptr, message_ptr, opaque_ptr|
      DeliveryCallback.call(client_ptr, message_ptr, opaque_ptr)
    end

    # Both functions are built before either constant is (re)defined
    {
      BackgroundEventCallbackFunction: background_fn,
      DeliveryCallbackFunction: delivery_fn
    }.each do |const_name, function|
      const_set(const_name, function)
    end

    # Remember which process owns the freshly created functions
    @@current_pid = ::Process.pid
  end
end