Class: Rdkafka::NativeKafka

Inherits:
Object
  • Object
show all
Defined in:
lib/rdkafka/native_kafka.rb

Overview

A wrapper around a native kafka that polls and cleanly exits

Instance Method Summary collapse

Constructor Details

#initialize(inner, run_polling_thread:, opaque:, auto_start: true, timeout_ms: Defaults::NATIVE_KAFKA_POLL_TIMEOUT_MS) ⇒ NativeKafka

Creates a new NativeKafka wrapper

Parameters:

  • inner (FFI::Pointer)

    pointer to the native Kafka handle

  • run_polling_thread (Boolean)

    whether to run a background polling thread

  • opaque (Rdkafka::Opaque)

    opaque object for callback context

  • auto_start (Boolean) (defaults to: true)

    whether to start the polling thread automatically

  • timeout_ms (Integer) (defaults to: Defaults::NATIVE_KAFKA_POLL_TIMEOUT_MS)

    poll timeout in milliseconds



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/rdkafka/native_kafka.rb', line 13

def initialize(inner, run_polling_thread:, opaque:, auto_start: true, timeout_ms: Defaults::NATIVE_KAFKA_POLL_TIMEOUT_MS)
  @inner = inner
  @opaque = opaque
  # Lock around external access
  @access_mutex = Mutex.new
  # Lock around internal polling
  @poll_mutex = Mutex.new
  # Lock around decrementing the operations in progress counter
  # We have two mutexes - one for increment (`@access_mutex`) and one for decrement mutex
  # because they serve different purposes:
  #
  #   - `@access_mutex` allows us to lock the execution and make sure that any operation within
  #      the `#synchronize` is the only one running and that there are no other running
  #      operations.
  #   - `@decrement_mutex` ensures, that our decrement operation is thread-safe for any Ruby
  #     implementation.
  #
  # We do not use the same mutex, because it could create a deadlock when an already
  # incremented operation cannot decrement because `@access_lock` is now owned by a different
  # thread in a synchronized mode and the synchronized mode is waiting on the decrement.
  @decrement_mutex = Mutex.new
  # counter for operations in progress using inner
  @operations_in_progress = 0

  @run_polling_thread = run_polling_thread

  @timeout_ms = timeout_ms

  start if auto_start

  @closing = false
end

Instance Method Details

#close(object_id = nil) { ... } ⇒ nil

Closes the native Kafka handle and cleans up resources

Parameters:

  • object_id (Integer, nil) (defaults to: nil)

    optional object ID (unused, for finalizer compatibility)

Yields:

  • optional block to execute before destroying the handle

Returns:

  • (nil)


189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/rdkafka/native_kafka.rb', line 189

def close(object_id = nil)
  return if closed?

  synchronize do
    # Indicate to the outside world that we are closing
    @closing = true

    if @polling_thread
      # Indicate to polling thread that we're closing
      @polling_thread[:closing] = true

      # Wait for the polling thread to finish up,
      # this can be aborted in practice if this
      # code runs from a finalizer.
      @polling_thread.join
    end

    # Destroy the client after locking both mutexes
    @poll_mutex.lock

    # This check prevents a race condition, where we would enter the close in two threads
    # and after unlocking the primary one that hold the lock but finished, ours would be unlocked
    # and would continue to run, trying to destroy inner twice
    return unless @inner

    yield if block_given?

    Rdkafka::Bindings.rd_kafka_destroy(@inner)
    @inner = nil
    @opaque = nil
    @poll_mutex.unlock
    @poll_mutex = nil
  end
end

#closed?Boolean

Returns whether this native Kafka handle is closed or closing

Returns:

  • (Boolean)

    true if closed or closing



120
121
122
# File 'lib/rdkafka/native_kafka.rb', line 120

def closed?
  @closing || @inner.nil?
end

#enable_background_queue_io_events(fd, payload = "\x01") ⇒ nil

Note:

This method is incompatible with background polling threads. If background polling is enabled, use manual polling instead (e.g., consumer.poll)

Enable IO event notifications on the background queue Librdkafka will write to your FD when the background queue transitions from empty to non-empty

Parameters:

  • fd (Integer)

    your file descriptor (from IO.pipe or eventfd)

  • payload (String) (defaults to: "\x01")

    data to write to fd when queue has data (default: “x01”)

Returns:

  • (nil)

Raises:

  • (ClosedInnerError)

    when the handle is closed

  • (RuntimeError)

    when background polling thread is active



171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/rdkafka/native_kafka.rb', line 171

def enable_background_queue_io_events(fd, payload = "\x01")
  if @run_polling_thread
    raise "Cannot enable IO events while background polling thread is active. " \
      "Either disable background polling by setting run_polling_thread: false, " \
      "or use manual polling with consumer.poll() instead of the FD API."
  end

  with_inner do |inner|
    queue_ptr = Bindings.rd_kafka_queue_get_background(inner)
    Bindings.rd_kafka_queue_io_event_enable(queue_ptr, fd, payload, payload.bytesize)
    Bindings.rd_kafka_queue_destroy(queue_ptr)
  end
end

#enable_main_queue_io_events(fd, payload = "\x01") ⇒ nil

Note:

This method is incompatible with background polling threads. If background polling is enabled, use manual polling instead (e.g., consumer.poll)

Enable IO event notifications on the main queue Librdkafka will write to your FD when the queue transitions from empty to non-empty

Examples:

# Create your own signaling FD
signal_r, signal_w = IO.pipe
native_kafka.enable_main_queue_io_events(signal_w.fileno)

# Monitor it with select
readable, = IO.select([signal_r], nil, nil, timeout)
if readable
  consumer.poll(0)  # Get messages
end

Parameters:

  • fd (Integer)

    your file descriptor (from IO.pipe or eventfd)

  • payload (String) (defaults to: "\x01")

    data to write to fd when queue has data (default: “x01”)

Returns:

  • (nil)

Raises:

  • (ClosedInnerError)

    when the handle is closed

  • (RuntimeError)

    when background polling thread is active



146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/rdkafka/native_kafka.rb', line 146

def enable_main_queue_io_events(fd, payload = "\x01")
  if @run_polling_thread
    raise "Cannot enable IO events while background polling thread is active. " \
      "Either disable background polling by setting run_polling_thread: false, " \
      "or use manual polling with consumer.poll() instead of the FD API."
  end

  with_inner do |inner|
    queue_ptr = Bindings.rd_kafka_queue_get_main(inner)
    Bindings.rd_kafka_queue_io_event_enable(queue_ptr, fd, payload, payload.bytesize)
    Bindings.rd_kafka_queue_destroy(queue_ptr)
  end
end

#finalizerProc

Returns a finalizer proc for closing this native Kafka handle

Returns:

  • (Proc)

    finalizer proc



114
115
116
# File 'lib/rdkafka/native_kafka.rb', line 114

def finalizer
  ->(_) { close }
end

#startnil

Starts the polling thread if configured

Returns:

  • (nil)


48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/rdkafka/native_kafka.rb', line 48

def start
  synchronize do
    return if @started

    @started = true

    # Trigger initial poll to make sure oauthbearer cb and other initial cb are handled
    Rdkafka::Bindings.rd_kafka_poll(@inner, 0)

    if @run_polling_thread
      # Start thread to poll client for delivery callbacks,
      # not used in consumer.
      @polling_thread = Thread.new do
        loop do
          @poll_mutex.synchronize do
            Rdkafka::Bindings.rd_kafka_poll(@inner, @timeout_ms)
          end

          # Exit thread if closing and the poll queue is empty
          if Thread.current[:closing] && Rdkafka::Bindings.rd_kafka_outq_len(@inner) == 0
            break
          end
        end
      end

      @polling_thread.name = "rdkafka.native_kafka##{Rdkafka::Bindings.rd_kafka_name(@inner).gsub("rdkafka", "")}"
      @polling_thread.abort_on_exception = true
      @polling_thread[:closing] = false
    end
  end
end

#synchronize(&block) {|FFI::Pointer| ... } ⇒ Object

Executes a block while holding exclusive access to the native Kafka handle

Parameters:

  • block (Proc)

    block to execute with the native handle

Yields:

  • (FFI::Pointer)

    the inner native Kafka handle

Returns:

  • (Object)

    the result of the block



100
101
102
103
104
105
106
107
108
109
110
# File 'lib/rdkafka/native_kafka.rb', line 100

def synchronize(&block)
  @access_mutex.synchronize do
    # Wait for any commands using the inner to finish
    # This can take a while on blocking operations like polling but is essential not to proceed
    # with certain types of operations like resources destruction as it can cause the process
    # to hang or crash
    sleep(Defaults::NATIVE_KAFKA_SYNCHRONIZE_SLEEP_INTERVAL_MS / 1000.0) until @operations_in_progress.zero?

    with_inner(&block)
  end
end

#with_inner {|FFI::Pointer| ... } ⇒ Object

Executes a block with the inner native Kafka handle

Yields:

  • (FFI::Pointer)

    the inner native Kafka handle

Returns:

  • (Object)

    the result of the block

Raises:



84
85
86
87
88
89
90
91
92
93
94
# File 'lib/rdkafka/native_kafka.rb', line 84

def with_inner
  if @access_mutex.owned?
    @operations_in_progress += 1
  else
    @access_mutex.synchronize { @operations_in_progress += 1 }
  end

  @inner.nil? ? raise(ClosedInnerError) : yield(@inner)
ensure
  @decrement_mutex.synchronize { @operations_in_progress -= 1 }
end