Class: WaterDrop::Producer
- Inherits: Object (hierarchy: Object > WaterDrop::Producer)
- Extended by: Forwardable
- Includes: Karafka::Core::Helpers::Time, Karafka::Core::Taggable, Async, Buffer, ClassMonitor, Idempotence, Sync, Tombstone, Transactions
- Defined in: lib/waterdrop/producer.rb,
lib/waterdrop/producer/sync.rb,
lib/waterdrop/producer/async.rb,
lib/waterdrop/producer/buffer.rb,
lib/waterdrop/producer/status.rb,
lib/waterdrop/producer/builder.rb,
lib/waterdrop/producer/testing.rb,
lib/waterdrop/producer/variant.rb,
lib/waterdrop/producer/tombstone.rb,
lib/waterdrop/producer/idempotence.rb,
lib/waterdrop/producer/transactions.rb,
lib/waterdrop/producer/class_monitor.rb
Overview
Main WaterDrop messages producer
Defined Under Namespace
Modules: Async, Buffer, ClassMonitor, Idempotence, Sync, Testing, Tombstone, Transactions
Classes: Builder, Status, Variant
Instance Attribute Summary
- #config ⇒ Object (readonly): Dry-configurable config object.
- #id ⇒ String (readonly): UUID of the current producer.
- #messages ⇒ Array (readonly): Internal messages buffer.
- #monitor ⇒ Object (readonly): Monitor used for instrumentation.
- #status ⇒ Status (readonly): Producer status object.
Instance Method Summary
- #client ⇒ Rdkafka::Producer: Raw rdkafka producer.
- #close(force: false) ⇒ Object: Flushes the buffers synchronously and closes the producer.
- #close! ⇒ Object: Closes the producer with a forced close after timeout, purging any outgoing data.
- #disconnect ⇒ Boolean: Disconnects the producer from Kafka while keeping it configured for potential reconnection.
- #disconnectable? ⇒ Boolean: Whether the producer is in a state from which it can disconnect.
- #fd_polling? ⇒ Boolean: True if FD-based polling mode is enabled.
- #initialize(&block) ⇒ Producer (constructor): Creates a not-yet-configured instance of the producer.
- #inspect ⇒ String: Mutex-safe inspect details.
- #middleware ⇒ WaterDrop::Producer::Middleware: Returns and caches the middleware object that may be used.
- #partition_count(topic) ⇒ Integer: Fetches and caches the partition count of a topic.
- #poller ⇒ WaterDrop::Polling::Poller: Returns the poller instance for this producer.
- #purge ⇒ Object: Purges data from both the buffer queue and the librdkafka queue.
- #queue_size ⇒ Integer (also: #queue_length): Returns the number of messages in the librdkafka producer queue.
- #setup ⇒ Object: Sets up the whole configuration and initializes everything that is needed.
- #with(**args) ⇒ WaterDrop::Producer::Variant (also: #variant): Builds the variant alteration and returns it.
Methods included from Idempotence
#idempotent?, #idempotent_reloadable?, #idempotent_retryable?
Methods included from Transactions
#transaction, #transaction?, #transaction_mark_as_consumed, #transactional?, #transactional_retryable?
Methods included from Tombstone
#tombstone_async, #tombstone_many_async, #tombstone_many_sync, #tombstone_sync
Methods included from Buffer
#buffer, #buffer_many, #flush_async, #flush_sync
Methods included from Async
#produce_async, #produce_many_async
Methods included from Sync
#produce_many_sync, #produce_sync
Constructor Details
#initialize(&block) ⇒ Producer
Creates a not-yet-configured instance of the producer. When a block is given, it is passed to #setup, so the producer is configured right away.
# File 'lib/waterdrop/producer.rb', line 52
def initialize(&block)
  @operations_in_progress = Helpers::Counter.new
  @buffer_mutex = Mutex.new
  @connecting_mutex = Mutex.new
  @operating_mutex = Mutex.new
  @transaction_mutex = Mutex.new

  @id = nil
  @monitor = nil
  @contract = nil
  @default_variant = nil
  @client = nil
  @closing_thread_id = nil
  @idempotent = nil
  @transactional = nil
  @fd_polling = nil
  @poller = nil
  @idempotent_fatal_error_attempts = 0
  @transaction_fatal_error_attempts = 0

  @status = Status.new
  @messages = []

  # Instrument producer creation for global listeners
  class_monitor.instrument(
    "producer.created",
    producer: self,
    producer_id: @id
  )

  return unless block

  setup(&block)
end
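The two-phase construction above (an unconfigured instance that is optionally configured straight away when a block is passed) can be illustrated with a small stand-alone sketch. `TwoPhaseProducerSketch` and its plain-hash config are hypothetical; the real `#setup` builds a dry-configurable `Config`, wires the monitor and instruments `producer.configured`, none of which is modeled here.

```ruby
# Hypothetical, simplified stand-in for the two-phase initialization of WaterDrop::Producer.
# No Kafka, monitor or dry-configurable config is involved.
class TwoPhaseProducerSketch
  class AlreadyConfiguredError < StandardError; end

  attr_reader :config, :status

  def initialize(&block)
    @status = :initial
    @config = nil

    # Configuration is optional at construction time
    return unless block

    setup(&block)
  end

  # Plays the role of #setup: may run exactly once
  def setup
    raise AlreadyConfiguredError, "producer already configured" unless @status == :initial

    config = {}
    yield(config)
    @config = config.freeze
    @status = :configured
  end
end

lazy = TwoPhaseProducerSketch.new
lazy.status # => :initial
lazy.setup { |c| c[:bootstrap_servers] = "localhost:9092" }
lazy.status # => :configured

eager = TwoPhaseProducerSketch.new { |c| c[:bootstrap_servers] = "localhost:9092" }
eager.status # => :configured
```

Passing the block to the constructor is shorthand for calling `setup` yourself; in both cases a second `setup` call is rejected, much like `Errors::ProducerAlreadyConfiguredError`.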
Instance Attribute Details
#config ⇒ Object (readonly)
Returns dry-configurable config object.
# File 'lib/waterdrop/producer.rb', line 47
def config
  @config
end
#id ⇒ String (readonly)
Returns the UUID of the current producer.
# File 'lib/waterdrop/producer.rb', line 39
def id
  @id
end
#messages ⇒ Array (readonly)
Returns internal messages buffer.
# File 'lib/waterdrop/producer.rb', line 43
def messages
  @messages
end
#monitor ⇒ Object (readonly)
Returns the monitor used for instrumentation.
# File 'lib/waterdrop/producer.rb', line 45
def monitor
  @monitor
end
#status ⇒ Status (readonly)
Returns producer status object.
# File 'lib/waterdrop/producer.rb', line 41
def status
  @status
end
Instance Method Details
#client ⇒ Rdkafka::Producer
The client is lazily initialized, taking into account that the process may be forked at any time. Accessing a client that was created in a parent process from a forked child raises Errors::ProducerUsedInParentProcess.
Forking a producer that is already in use is not recommended; when bootstrapping a cluster, it is much better to fork producers that are configured but not yet used.
Returns the raw rdkafka producer.
# File 'lib/waterdrop/producer.rb', line 139
def client
  return @client if @client && @pid == Process.pid

  # Don't allow to obtain a client reference for a producer that was not configured
  raise Errors::ProducerNotConfiguredError, id if @status.initial?
  raise Errors::ProducerClosedError, id if @status.closed?

  @connecting_mutex.synchronize do
    return @client if @client && @pid == Process.pid

    # We undefine all the finalizers, in case it was a fork, so the finalizers from the parent
    # process don't leak
    ObjectSpace.undefine_finalizer(id)

    # We should raise an error when trying to use a producer with client from a fork. Always.
    if @client
      # We need to reset the client, otherwise there might be attempt to close the parent
      # client
      @client = nil
      raise Errors::ProducerUsedInParentProcess, Process.pid
    end

    # Finalizer tracking is needed for handling shutdowns gracefully.
    # I don't expect everyone to remember about closing all the producers all the time, thus
    # this approach is better. Although it is still worth keeping in mind, that this will
    # block GC from removing a no longer used producer unless closed properly but at least
    # won't crash the VM upon closing the process
    ObjectSpace.define_finalizer(id, proc { close })

    @pid = Process.pid
    @client = Builder.new.call(self, @config)

    @status.connected!
    @monitor.instrument("producer.connected", producer_id: id)
  end

  @client
end
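The fork-safety logic in `#client` boils down to double-checked locking keyed on `Process.pid`. The sketch below reproduces only that pattern with the standard library; `ForkAwareHolder` and its error class are hypothetical, and the status checks, finalizer registration and instrumentation of the real method are left out. A fork is simulated by overwriting the stored PID rather than calling `Process.fork`.

```ruby
# Hypothetical sketch of fork-aware lazy initialization with double-checked locking.
class ForkAwareHolder
  class UsedInParentProcessError < StandardError; end

  attr_reader :builds

  def initialize(&factory)
    @factory = factory
    @client = nil
    @pid = nil
    @builds = 0
    @mutex = Mutex.new
  end

  def client
    # Fast path: the client was already built in this very process
    return @client if @client && @pid == Process.pid

    @mutex.synchronize do
      # Re-check under the lock: another thread may have built it meanwhile
      return @client if @client && @pid == Process.pid

      # A client that exists but belongs to another PID was inherited through a fork
      if @client
        @client = nil
        raise UsedInParentProcessError, "client was created in process #{@pid}"
      end

      @pid = Process.pid
      @builds += 1
      @client = @factory.call
    end
  end
end

holder = ForkAwareHolder.new { Object.new }
clients = Array.new(10) { Thread.new { holder.client } }.map(&:value)
clients.uniq.size # => 1
holder.builds     # => 1
```

As in the original, the error is raised once and the stale reference is dropped, so the next call in the child builds a fresh client.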
#close(force: false) ⇒ Object
Flushes the buffers synchronously and closes the producer.
# File 'lib/waterdrop/producer.rb', line 341
def close(force: false)
  # When closing from within the FD poller thread (e.g., from a callback like
  # message.acknowledged or error.occurred), we must delegate to a background thread.
  # Close performs flush which waits for delivery reports, but delivery reports require
  # the poller to poll. Since we're ON the poller thread inside a callback, this would
  # deadlock. Spawning a thread allows the callback to return, letting the poller continue.
  if fd_polling? && poller.in_poller_thread?
    Thread.new { close(force: force) }
    return
  end

  # If we already own the transactional mutex, it means we are inside of a transaction and
  # it should not be allowed to close the producer in such a case.
  if @transaction_mutex.locked? && @transaction_mutex.owned?
    raise Errors::ProducerTransactionalCloseAttemptError, id
  end

  # The transactional mutex here can be used even when no transactions are in use
  # It prevents us from closing a mutex during transactions and is irrelevant in other cases
  @transaction_mutex.synchronize do
    @operating_mutex.synchronize do
      return unless @status.active?

      @monitor.instrument(
        "producer.closed",
        producer_id: id
      ) do
        @status.closing!
        @monitor.instrument("producer.closing", producer_id: id)

        # No need for auto-gc if everything got closed by us
        # This should be used only in case a producer was not closed properly and forgotten
        ObjectSpace.undefine_finalizer(id)

        # We save this thread id because we need to bypass the activity verification on the
        # producer for final flush of buffers.
        @closing_thread_id = Thread.current.object_id

        # Wait until all the outgoing operations are done. Only when no one is using the
        # underlying client running operations we can close
        sleep(0.001) until @operations_in_progress.value.zero?

        # Flush has its own buffer mutex but even if it is blocked, flushing can still happen
        # as we close the client after the flushing (even if blocked by the mutex)
        flush(true)

        # We should not close the client in several threads the same time
        # It is safe to run it several times but not exactly the same moment
        # We also mark it as closed only if it was connected, if not, it would trigger a new
        # connection that anyhow would be immediately closed
        if @client
          # Why do we trigger it early instead of just having `#close` do it?
          # The linger.ms time will be ignored for the duration of the call,
          # queued messages will be sent to the broker as soon as possible.
          begin
            @client.flush(current_variant.max_wait_timeout) unless @client.closed?
          # We can safely ignore timeouts here because any left outstanding requests
          # will anyhow force wait on close if not forced.
          # If forced, we will purge the queue and just close
          rescue ::Rdkafka::RdkafkaError, Rdkafka::AbstractHandle::WaitTimeoutError
            nil
          ensure
            # Purge fully the local queue in case of a forceful shutdown just to be sure, that
            # there are no dangling messages. In case flush was successful, there should be
            # none but we do it just in case it timed out
            purge if force
          end

          # Unregister from poller before closing if fiber polling is enabled
          unregister_from_poller

          @client.close

          @client = nil
        end

        # Remove callbacks runners that were registered
        ::Karafka::Core::Instrumentation.statistics_callbacks.delete(@id)
        ::Karafka::Core::Instrumentation.error_callbacks.delete(@id)
        ::Karafka::Core::Instrumentation.oauthbearer_token_refresh_callbacks.delete(@id)

        @status.closed!
      end
    end
  end
end
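Two guards at the top of `#close` are easy to miss: a close requested from the poller thread is handed off to a new thread, and a close attempted from inside a transaction (detected with `Mutex#owned?`) raises. The following minimal sketch models just that control flow. `CloseGuardSketch`, `run_as_poller` and the error class are hypothetical, there is no real poller or flush, and unlike the original the sketch returns the spawned thread so the caller can join it.

```ruby
# Hypothetical sketch of the two guards at the start of Producer#close.
class CloseGuardSketch
  class CloseInTransactionError < StandardError; end

  attr_reader :closed_by

  def initialize
    @transaction_mutex = Mutex.new
    @poller_thread = nil
    @closed = false
    @closed_by = nil
  end

  # Marks the current thread as "the poller" for the duration of the block
  def run_as_poller
    @poller_thread = Thread.current
    yield
  ensure
    @poller_thread = nil
  end

  def transaction
    @transaction_mutex.synchronize { yield }
  end

  def close
    # Inside a poller callback: hand off so the callback can return and polling continues
    return Thread.new { close } if Thread.current == @poller_thread

    # Closing from within our own transaction would deadlock or corrupt it
    raise CloseInTransactionError, "cannot close inside a transaction" if @transaction_mutex.owned?

    @transaction_mutex.synchronize do
      return if @closed

      @closed = true
      @closed_by = Thread.current
    end
  end

  def closed?
    @closed
  end
end
```

A close issued from the simulated poller thread is therefore completed by a different thread, which is what lets the real poller keep serving delivery reports during the final flush.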
#close! ⇒ Object
Closes the producer in forced mode, equivalent to close(force: true): any data still outgoing after the flush timeout is purged.
# File 'lib/waterdrop/producer.rb', line 429
def close!
  close(force: true)
end
#disconnect ⇒ Boolean
This method refuses to disconnect (and returns false) if:
- There are pending messages in the internal buffer
- There are operations currently in progress
- A transaction is currently active
- The client is not currently connected
- Required mutexes are locked by other operations
After a successful disconnection (which returns true), the producer status changes to disconnected but the producer remains configured, so it can reconnect the next time client access is needed.
Disconnects the producer from Kafka while keeping it configured for potential reconnection.
This method safely disconnects the underlying Kafka client while preserving the producer's configuration. Unlike `#close`, it allows the producer to be reconnected later by calling methods that require the client. The disconnection proceeds only if the safety conditions listed above are met.
This API can be used, for example, to release connections held by rarely used producer instances.
# File 'lib/waterdrop/producer.rb', line 289
def disconnect
  return false unless disconnectable?

  # Use the same mutex pattern as the regular close method to prevent race conditions
  @transaction_mutex.synchronize do
    @operating_mutex.synchronize do
      @buffer_mutex.synchronize do
        return false unless @client
        return false unless @status.connected?
        return false unless @messages.empty?
        return false unless @operations_in_progress.value.zero?

        @status.disconnecting!
        @monitor.instrument("producer.disconnecting", producer_id: id)

        @monitor.instrument("producer.disconnected", producer_id: id) do
          # Unregister from poller before closing if fiber polling is enabled
          unregister_from_poller

          # Close the client
          @client.close
          @client = nil

          # Reset connection status but keep producer configured
          @status.disconnected!
        end

        true
      end
    end
  end
end
#disconnectable? ⇒ Boolean
This is a best-effort check: the state may change right after it returns, so #disconnect repeats the proper checks while holding all the required mutexes.
Returns whether the producer is in a state from which it can disconnect.
# File 'lib/waterdrop/producer.rb', line 328
def disconnectable?
  return false unless @client
  return false unless @status.connected?
  return false unless @messages.empty?
  return false if @transaction_mutex.locked?
  return false if @operating_mutex.locked?

  true
end
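`#disconnectable?` and `#disconnect` follow a check-then-recheck pattern: a cheap lock-free pre-check, then the authoritative checks under the mutexes. A simplified sketch of that pattern, with a hypothetical class and a plain object standing in for the rdkafka client (no transactions, in-flight counters or instrumentation are modeled):

```ruby
# Hypothetical sketch of the best-effort pre-check followed by a locked re-check.
class DisconnectSketch
  attr_reader :status

  def initialize
    @operating_mutex = Mutex.new
    @buffer_mutex = Mutex.new
    @messages = []
    @client = Object.new # stands in for a connected client
    @status = :connected
  end

  def buffer(message)
    @buffer_mutex.synchronize { @messages << message }
  end

  def flush
    @buffer_mutex.synchronize { @messages.clear }
  end

  # Cheap and lock-free; the answer may already be stale when it is returned
  def disconnectable?
    return false unless @client
    return false unless @status == :connected
    return false unless @messages.empty?
    return false if @operating_mutex.locked?

    true
  end

  def disconnect
    return false unless disconnectable?

    @operating_mutex.synchronize do
      @buffer_mutex.synchronize do
        # Authoritative re-check while holding the locks
        return false unless @client && @status == :connected && @messages.empty?

        @client = nil
        @status = :disconnected
        true
      end
    end
  end
end
```

The pre-check keeps the common "not now" answer cheap, while only the re-check under the locks can be trusted.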
#fd_polling? ⇒ Boolean
Returns true if FD-based polling mode is enabled.
# File 'lib/waterdrop/producer.rb', line 465
def fd_polling?
  return @fd_polling unless @fd_polling.nil?
  return false unless config

  @fd_polling = config.polling.mode == :fd
end
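`#fd_polling?` memoizes with an explicit `nil?` check rather than `||=`, because `@fd_polling ||= ...` would re-evaluate every time the cached answer is `false`. It also avoids caching anything while no config exists yet. The sketch below shows both effects; `MemoSketch` and the `:polling_mode` hash key are hypothetical stand-ins for `config.polling.mode`, and `:thread` is only assumed here as a non-FD mode name.

```ruby
# Hypothetical sketch of nil-aware memoization that also caches a `false` result.
class MemoSketch
  attr_reader :evaluations
  attr_writer :config

  def initialize
    @config = nil
    @fd_polling = nil
    @evaluations = 0
  end

  def fd_polling?
    return @fd_polling unless @fd_polling.nil?
    # Not configured yet: answer false but do not cache it, so a later setup is honored
    return false unless @config

    @evaluations += 1
    @fd_polling = @config[:polling_mode] == :fd
  end
end

m = MemoSketch.new
m.fd_polling? # => false (no config yet, nothing cached)
m.config = { polling_mode: :thread } # :thread is an assumed non-FD mode name
m.fd_polling? # => false (computed once and cached)
m.fd_polling? # => false (served from the cache)
m.evaluations # => 1
```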
#inspect ⇒ String
Returns mutex-safe inspect details.
# File 'lib/waterdrop/producer.rb', line 434
def inspect
  # Basic info that's always safe to access
  parts = []
  parts << "id=#{@id.inspect}"
  parts << "status=#{@status}" if @status

  # Try to get buffer info safely
  if @buffer_mutex.try_lock
    begin
      parts << "buffer_size=#{@messages.size}"
    ensure
      @buffer_mutex.unlock
    end
  else
    parts << "buffer_size=busy"
  end

  # Check if client is connected without triggering connection
  parts << if @status.connected?
    "connected=true"
  else
    "connected=false"
  end

  parts << "operations=#{@operations_in_progress.value}"
  parts << "in_transaction=true" if @transaction_mutex.locked?

  "#<#{self.class.name}:#{format("%#x", object_id)} #{parts.join(" ")}>"
end
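`#inspect` uses `Mutex#try_lock` instead of `synchronize`, so it never blocks on a busy buffer and never raises `ThreadError` for recursive locking when it is called while the buffer mutex is already held, for example from a debugger or a log line inside a locked section. A reduced sketch with a hypothetical class:

```ruby
# Hypothetical sketch of a non-blocking, mutex-safe #inspect.
class InspectSketch
  def initialize
    @buffer_mutex = Mutex.new
    @messages = []
  end

  def with_buffer_locked
    @buffer_mutex.synchronize { yield }
  end

  def inspect
    size =
      if @buffer_mutex.try_lock
        begin
          @messages.size
        ensure
          @buffer_mutex.unlock
        end
      else
        # Someone (possibly the current thread) holds the lock: report instead of waiting
        "busy"
      end

    "#<#{self.class.name} buffer_size=#{size}>"
  end
end

s = InspectSketch.new
s.inspect                          # => "#<InspectSketch buffer_size=0>"
s.with_buffer_locked { s.inspect } # => "#<InspectSketch buffer_size=busy>"
```

With `synchronize` in place of `try_lock`, the second call would raise `ThreadError` because Ruby mutexes are not reentrant.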
#middleware ⇒ WaterDrop::Producer::Middleware
Returns and caches the middleware object that may be used
# File 'lib/waterdrop/producer.rb', line 263
def middleware
  @middleware ||= config.middleware
end
#partition_count(topic) ⇒ Integer
It uses the partition count fetching and caching of the underlying `rdkafka-ruby` client.
Fetches and caches the partition count of a topic.
# File 'lib/waterdrop/producer.rb', line 220
def partition_count(topic)
  client.partition_count(topic.to_s)
end
#poller ⇒ WaterDrop::Polling::Poller
Returns the poller instance for this producer
# File 'lib/waterdrop/producer.rb', line 475
def poller
  return @poller unless @poller.nil?
  return nil unless config

  @poller = config.polling.poller || Polling::Poller.instance
end
#purge ⇒ Object
This operation can cause data loss. It not only purges the internal WaterDrop buffer but also purges the librdkafka queue and cancels any outgoing message dispatches.
Purges data from both the buffer queue as well as the librdkafka queue.
# File 'lib/waterdrop/producer.rb', line 229
def purge
  @monitor.instrument("buffer.purged", producer_id: id) do
    @buffer_mutex.synchronize do
      @messages = []
    end

    # We should not purge if there is no client initialized
    # It may not be initialized if we created a new producer that never connected to kafka,
    # we used buffer and purged. In cases like this client won't exist
    @connecting_mutex.synchronize do
      @client&.purge
    end
  end
end
#queue_size ⇒ Integer Also known as: queue_length
This only counts messages in the librdkafka queue, not the internal WaterDrop buffer. To get the internal buffer count, use `messages.size`.
Returns 0 when the producer is not connected, as there cannot be any pending messages without a connection.
Returns the number of messages in the librdkafka producer queue.
This count includes:
- Messages waiting to be sent to the broker
- Messages currently in flight (being transmitted)
- Delivery reports waiting to be processed
# File 'lib/waterdrop/producer.rb', line 201
def queue_size
  return 0 unless @status.connected?

  @connecting_mutex.synchronize do
    return 0 unless @client

    @client.queue_size
  end
end
#setup ⇒ Object
Sets up the whole configuration and initializes all that is needed
# File 'lib/waterdrop/producer.rb', line 87
def setup(...)
  raise Errors::ProducerAlreadyConfiguredError, id unless @status.initial?

  @config = Config
    .new
    .setup(...)
    .config

  @id = @config.id
  @monitor = @config.monitor
  @contract = Contracts::Message.new(max_payload_size: @config.max_payload_size)
  @default_variant = Variant.new(self, default: true)

  if @config.idle_disconnect_timeout.zero?
    @status.configured!

    # Instrument producer configuration for global listeners
    class_monitor.instrument(
      "producer.configured",
      producer: self,
      producer_id: @id,
      config: @config
    )

    return
  end

  # Setup idle disconnect listener if configured so we preserve tcp connections on rarely
  # used producers
  disconnector = Instrumentation::IdleDisconnectorListener.new(
    self,
    disconnect_timeout: @config.idle_disconnect_timeout
  )
  @monitor.subscribe(disconnector)

  @status.configured!

  # Instrument producer configuration for global listeners
  class_monitor.instrument(
    "producer.configured",
    producer: self,
    producer_id: @id,
    config: @config
  )
end
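When `idle_disconnect_timeout` is non-zero, `#setup` subscribes an `Instrumentation::IdleDisconnectorListener` built on top of `#disconnect`. Its internals are not shown on this page, so the sketch below is only a hypothetical illustration of the idea: remember the last activity and, on periodic ticks, attempt a disconnect once the producer has been idle for long enough. The hooks (`on_activity`, `on_tick`), the injectable clock and seconds as the unit are all assumptions, not WaterDrop's API.

```ruby
# Hypothetical idle-disconnect listener; not WaterDrop's IdleDisconnectorListener.
class IdleDisconnectSketch
  def initialize(producer, disconnect_timeout:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @producer = producer
    @timeout = disconnect_timeout
    @clock = clock
    @last_activity = @clock.call
  end

  # Assumed to be called whenever the producer dispatches something
  def on_activity
    @last_activity = @clock.call
  end

  # Assumed to be called periodically, e.g. from a statistics-like tick
  def on_tick
    return if @clock.call - @last_activity < @timeout

    # #disconnect returns false when it is not safe; simply retry on a later tick
    @last_activity = @clock.call if @producer.disconnect
  end
end
```

Because `#disconnect` refuses to act while messages are buffered or operations are in flight, a listener like this can call it optimistically without extra locking of its own.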
#with(**args) ⇒ WaterDrop::Producer::Variant Also known as: variant
Builds the variant alteration and returns it.
# File 'lib/waterdrop/producer.rb', line 253
def with(**args)
  ensure_active!

  Variant.new(self, **args)
end
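`#with` does not create a new producer: it returns a lightweight `Variant` bound to the same producer, and refuses to do so for an inactive producer (`ensure_active!`). The sketch below illustrates that shape only; both classes and the `acks` override are hypothetical and do not reproduce the options that WaterDrop's `Variant` actually accepts.

```ruby
# Hypothetical sketch: one shared producer, many cheap variants with overridden defaults.
class VariantHostSketch
  ClosedError = Class.new(StandardError)

  attr_reader :sent

  def initialize
    @sent = []
    @closed = false
  end

  def close
    @closed = true
  end

  # Mirrors the shape of #with: refuse for a closed producer, otherwise wrap self
  def with(**overrides)
    raise ClosedError, "producer is closed" if @closed

    VariantSketch.new(self, **overrides)
  end

  def produce(message, settings = {})
    @sent << message.merge(settings)
  end
end

class VariantSketch
  # Hypothetical defaults that a variant may override
  DEFAULTS = { acks: "all" }.freeze

  def initialize(producer, **overrides)
    @producer = producer
    @settings = DEFAULTS.merge(overrides)
  end

  def produce(message)
    @producer.produce(message, @settings)
  end
end

producer = VariantHostSketch.new
producer.with(acks: "1").produce({ topic: "events", payload: "a" })
producer.sent.last == { topic: "events", payload: "a", acks: "1" } # => true
```

Since every variant delegates to the same underlying producer, variants are cheap to build per call site and share one connection.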