Class: Rdkafka::NativeKafka
- Inherits:
-
Object
- Object
- Rdkafka::NativeKafka
- Defined in:
- lib/rdkafka/native_kafka.rb
Overview
A wrapper around a native kafka that polls and cleanly exits
Instance Method Summary collapse
-
#close(object_id = nil) { ... } ⇒ nil
Closes the native Kafka handle and cleans up resources.
-
#closed? ⇒ Boolean
Returns whether this native Kafka handle is closed or closing.
-
#enable_background_queue_io_events(fd, payload = "\x01") ⇒ nil
Enable IO event notifications on the background queue Librdkafka will write to your FD when the background queue transitions from empty to non-empty.
-
#enable_main_queue_io_events(fd, payload = "\x01") ⇒ nil
Enable IO event notifications on the main queue Librdkafka will write to your FD when the queue transitions from empty to non-empty.
-
#finalizer ⇒ Proc
Returns a finalizer proc for closing this native Kafka handle.
-
#initialize(inner, run_polling_thread:, opaque:, auto_start: true, timeout_ms: Defaults::NATIVE_KAFKA_POLL_TIMEOUT_MS) ⇒ NativeKafka
constructor
Creates a new NativeKafka wrapper.
-
#start ⇒ nil
Starts the polling thread if configured.
-
#synchronize(&block) {|FFI::Pointer| ... } ⇒ Object
Executes a block while holding exclusive access to the native Kafka handle.
-
#with_inner {|FFI::Pointer| ... } ⇒ Object
Executes a block with the inner native Kafka handle.
Constructor Details
#initialize(inner, run_polling_thread:, opaque:, auto_start: true, timeout_ms: Defaults::NATIVE_KAFKA_POLL_TIMEOUT_MS) ⇒ NativeKafka
Creates a new NativeKafka wrapper
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/rdkafka/native_kafka.rb', line 13 def initialize(inner, run_polling_thread:, opaque:, auto_start: true, timeout_ms: Defaults::NATIVE_KAFKA_POLL_TIMEOUT_MS) @inner = inner @opaque = opaque # Lock around external access @access_mutex = Mutex.new # Lock around internal polling @poll_mutex = Mutex.new # Lock around decrementing the operations in progress counter # We have two mutexes - one for increment (`@access_mutex`) and one for decrement mutex # because they serve different purposes: # # - `@access_mutex` allows us to lock the execution and make sure that any operation within # the `#synchronize` is the only one running and that there are no other running # operations. # - `@decrement_mutex` ensures, that our decrement operation is thread-safe for any Ruby # implementation. # # We do not use the same mutex, because it could create a deadlock when an already # incremented operation cannot decrement because `@access_lock` is now owned by a different # thread in a synchronized mode and the synchronized mode is waiting on the decrement. @decrement_mutex = Mutex.new # counter for operations in progress using inner @operations_in_progress = 0 @run_polling_thread = run_polling_thread @timeout_ms = timeout_ms start if auto_start @closing = false end |
Instance Method Details
#close(object_id = nil) { ... } ⇒ nil
Closes the native Kafka handle and cleans up resources
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 |
# File 'lib/rdkafka/native_kafka.rb', line 189 def close(object_id = nil) return if closed? synchronize do # Indicate to the outside world that we are closing @closing = true if @polling_thread # Indicate to polling thread that we're closing @polling_thread[:closing] = true # Wait for the polling thread to finish up, # this can be aborted in practice if this # code runs from a finalizer. @polling_thread.join end # Destroy the client after locking both mutexes @poll_mutex.lock # This check prevents a race condition, where we would enter the close in two threads # and after unlocking the primary one that hold the lock but finished, ours would be unlocked # and would continue to run, trying to destroy inner twice return unless @inner yield if block_given? Rdkafka::Bindings.rd_kafka_destroy(@inner) @inner = nil @opaque = nil @poll_mutex.unlock @poll_mutex = nil end end |
#closed? ⇒ Boolean
Returns whether this native Kafka handle is closed or closing
120 121 122 |
# File 'lib/rdkafka/native_kafka.rb', line 120 def closed? @closing || @inner.nil? end |
#enable_background_queue_io_events(fd, payload = "\x01") ⇒ nil
This method is incompatible with background polling threads. If background polling is enabled, use manual polling instead (e.g., consumer.poll)
Enable IO event notifications on the background queue Librdkafka will write to your FD when the background queue transitions from empty to non-empty
171 172 173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/rdkafka/native_kafka.rb', line 171 def enable_background_queue_io_events(fd, payload = "\x01") if @run_polling_thread raise "Cannot enable IO events while background polling thread is active. " \ "Either disable background polling by setting run_polling_thread: false, " \ "or use manual polling with consumer.poll() instead of the FD API." end with_inner do |inner| queue_ptr = Bindings.rd_kafka_queue_get_background(inner) Bindings.rd_kafka_queue_io_event_enable(queue_ptr, fd, payload, payload.bytesize) Bindings.rd_kafka_queue_destroy(queue_ptr) end end |
#enable_main_queue_io_events(fd, payload = "\x01") ⇒ nil
This method is incompatible with background polling threads. If background polling is enabled, use manual polling instead (e.g., consumer.poll)
Enable IO event notifications on the main queue Librdkafka will write to your FD when the queue transitions from empty to non-empty
146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/rdkafka/native_kafka.rb', line 146 def enable_main_queue_io_events(fd, payload = "\x01") if @run_polling_thread raise "Cannot enable IO events while background polling thread is active. " \ "Either disable background polling by setting run_polling_thread: false, " \ "or use manual polling with consumer.poll() instead of the FD API." end with_inner do |inner| queue_ptr = Bindings.rd_kafka_queue_get_main(inner) Bindings.rd_kafka_queue_io_event_enable(queue_ptr, fd, payload, payload.bytesize) Bindings.rd_kafka_queue_destroy(queue_ptr) end end |
#finalizer ⇒ Proc
Returns a finalizer proc for closing this native Kafka handle
114 115 116 |
# File 'lib/rdkafka/native_kafka.rb', line 114 def finalizer ->(_) { close } end |
#start ⇒ nil
Starts the polling thread if configured
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/rdkafka/native_kafka.rb', line 48 def start synchronize do return if @started @started = true # Trigger initial poll to make sure oauthbearer cb and other initial cb are handled Rdkafka::Bindings.rd_kafka_poll(@inner, 0) if @run_polling_thread # Start thread to poll client for delivery callbacks, # not used in consumer. @polling_thread = Thread.new do loop do @poll_mutex.synchronize do Rdkafka::Bindings.rd_kafka_poll(@inner, @timeout_ms) end # Exit thread if closing and the poll queue is empty if Thread.current[:closing] && Rdkafka::Bindings.rd_kafka_outq_len(@inner) == 0 break end end end @polling_thread.name = "rdkafka.native_kafka##{Rdkafka::Bindings.rd_kafka_name(@inner).gsub("rdkafka", "")}" @polling_thread.abort_on_exception = true @polling_thread[:closing] = false end end end |
#synchronize(&block) {|FFI::Pointer| ... } ⇒ Object
Executes a block while holding exclusive access to the native Kafka handle
100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/rdkafka/native_kafka.rb', line 100 def synchronize(&block) @access_mutex.synchronize do # Wait for any commands using the inner to finish # This can take a while on blocking operations like polling but is essential not to proceed # with certain types of operations like resources destruction as it can cause the process # to hang or crash sleep(Defaults::NATIVE_KAFKA_SYNCHRONIZE_SLEEP_INTERVAL_MS / 1000.0) until @operations_in_progress.zero? with_inner(&block) end end |
#with_inner {|FFI::Pointer| ... } ⇒ Object
Executes a block with the inner native Kafka handle
84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/rdkafka/native_kafka.rb', line 84 def with_inner if @access_mutex.owned? @operations_in_progress += 1 else @access_mutex.synchronize { @operations_in_progress += 1 } end @inner.nil? ? raise(ClosedInnerError) : yield(@inner) ensure @decrement_mutex.synchronize { @operations_in_progress -= 1 } end |