Module: Rdkafka::Callbacks

Defined in:
lib/rdkafka/callbacks.rb

Overview

Callback handlers for librdkafka events

Defined Under Namespace

Classes: BackgroundEventCallback, CreateAclResult, DeleteAclResult, DeliveryCallback, DescribeAclResult, DescribeConfigsResult, GroupResult, IncrementalAlterConfigsResult, ListOffsetsResult, TopicResult

Constant Summary collapse

@@mutex =
Mutex.new
@@current_pid =
nil

Class Method Summary collapse

Class Method Details

.ensure_ffi_running ⇒ Object

Defines the callbacks that require an FFI thread, or recreates them after a fork, so that the callback thread is always correctly initialized.



487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
# File 'lib/rdkafka/callbacks.rb', line 487

# Makes sure the FFI callback functions exist for the current process.
#
# The whole operation runs under the module-level mutex. If the recorded PID
# already equals the current PID, nothing is done. Otherwise any previously
# defined callback constants are removed, fresh FFI functions are built and
# published as constants, and the current PID is recorded.
def ensure_ffi_running
  @@mutex.synchronize do
    # Already set up in this process, nothing to rebuild.
    next if @@current_pid == ::Process.pid

    # Constants left over from an earlier setup must be removed before they
    # can be defined again. Both are always defined together, so checking
    # for the first one is enough.
    if const_defined?(:BackgroundEventCallbackFunction, false)
      %i[BackgroundEventCallbackFunction DeliveryCallbackFunction].each do |stale_name|
        send(:remove_const, stale_name)
      end
    end

    # FFI function for background events
    # (used for Create Topic and Delete Topic callbacks).
    background_fn = FFI::Function.new(:void, %i[pointer pointer pointer]) do |client_ptr, event_ptr, opaque_ptr|
      BackgroundEventCallback.call(client_ptr, event_ptr, opaque_ptr)
    end

    # FFI function for message delivery callbacks.
    delivery_fn = FFI::Function.new(:void, %i[pointer pointer pointer]) do |client_ptr, message_ptr, opaque_ptr|
      DeliveryCallback.call(client_ptr, message_ptr, opaque_ptr)
    end

    # Publish both functions only after both were created successfully.
    const_set(:BackgroundEventCallbackFunction, background_fn)
    const_set(:DeliveryCallbackFunction, delivery_fn)

    # Remember which process these functions belong to.
    @@current_pid = ::Process.pid
  end
end