Class: WaterDrop::Producer

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Karafka::Core::Helpers::Time, Karafka::Core::Taggable, Async, Buffer, ClassMonitor, Idempotence, Sync, Tombstone, Transactions
Defined in:
lib/waterdrop/producer.rb,
lib/waterdrop/producer/sync.rb,
lib/waterdrop/producer/async.rb,
lib/waterdrop/producer/buffer.rb,
lib/waterdrop/producer/status.rb,
lib/waterdrop/producer/builder.rb,
lib/waterdrop/producer/testing.rb,
lib/waterdrop/producer/variant.rb,
lib/waterdrop/producer/tombstone.rb,
lib/waterdrop/producer/idempotence.rb,
lib/waterdrop/producer/transactions.rb,
lib/waterdrop/producer/class_monitor.rb

Overview

Main WaterDrop messages producer

Defined Under Namespace

Modules: Async, Buffer, ClassMonitor, Idempotence, Sync, Testing, Tombstone, Transactions

Classes: Builder, Status, Variant

Instance Attribute Summary

Instance Method Summary

Methods included from Idempotence

#idempotent?, #idempotent_reloadable?, #idempotent_retryable?

Methods included from Transactions

#transaction, #transaction?, #transaction_mark_as_consumed, #transactional?, #transactional_retryable?

Methods included from Tombstone

#tombstone_async, #tombstone_many_async, #tombstone_many_sync, #tombstone_sync

Methods included from Buffer

#buffer, #buffer_many, #flush_async, #flush_sync

Methods included from Async

#produce_async, #produce_many_async

Methods included from Sync

#produce_many_sync, #produce_sync

Constructor Details

#initialize(&block) ⇒ Producer

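Creates a producer instance. Without a block the producer stays unconfigured until #setup is called; when a configuration block is given, it is passed to #setup right away.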

Parameters:

  • block (Proc)

    configuration block



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/waterdrop/producer.rb', line 52

# Builds the producer with all of its internal state reset. When a configuration block is
# given it is forwarded to #setup immediately, otherwise #setup has to be called later.
#
# @param block [Proc, nil] optional configuration block forwarded to #setup
def initialize(&block)
  @operations_in_progress = Helpers::Counter.new

  # Dedicated locks for buffer access, lazy client connecting, close/disconnect operations
  # and transactions
  @buffer_mutex = Mutex.new
  @connecting_mutex = Mutex.new
  @operating_mutex = Mutex.new
  @transaction_mutex = Mutex.new

  # State assigned later, either by #setup or lazily on first use
  @id = nil
  @monitor = nil
  @contract = nil
  @default_variant = nil
  @client = nil
  @closing_thread_id = nil
  @idempotent = nil
  @transactional = nil
  @fd_polling = nil
  @poller = nil

  # Fatal error retry counters start from scratch
  @idempotent_fatal_error_attempts = 0
  @transaction_fatal_error_attempts = 0

  @status = Status.new
  @messages = []

  # Global listeners are notified before configuration happens, so the id is still nil here
  class_monitor.instrument(
    "producer.created",
    producer: self,
    producer_id: @id
  )

  setup(&block) if block
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns dry-configurable config object.

Returns:

  • (Object)

    dry-configurable config object



47
48
49
# File 'lib/waterdrop/producer.rb', line 47

# Configuration object assigned by #setup; nil while the producer is not configured.
def config = @config

#id ⇒ String (readonly)

Returns uuid of the current producer.

Returns:

  • (String)

    uuid of the current producer



39
40
41
# File 'lib/waterdrop/producer.rb', line 39

# Producer identifier copied from the config by #setup; nil before the producer is configured.
def id = @id

#messages ⇒ Array (readonly)

Returns internal messages buffer.

Returns:

  • (Array)

    internal messages buffer



43
44
45
# File 'lib/waterdrop/producer.rb', line 43

# Internal messages buffer; #purge swaps it for a fresh empty array.
def messages = @messages

#monitor ⇒ Object (readonly)

Returns monitor we want to use.

Returns:

  • (Object)

    monitor we want to use



45
46
47
# File 'lib/waterdrop/producer.rb', line 45

# Instrumentation monitor taken from the config by #setup; nil before configuration.
def monitor = @monitor

#status ⇒ Status (readonly)

Returns producer status object.

Returns:

  • (Status)

    producer status object



41
42
43
# File 'lib/waterdrop/producer.rb', line 41

# Lifecycle status object created in #initialize.
def status = @status

Instance Method Details

#client ⇒ Rdkafka::Producer

Note:

The client is lazily initialized, which also accounts for a potential fork that can happen at any time.

Note:

Forking a producer that is already in use is not recommended. When bootstrapping a cluster of processes, it is much better to fork producers that are configured but not yet used.

Returns raw rdkafka producer.

Returns:

  • (Rdkafka::Producer)

    raw rdkafka producer

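Raises:

  • (Errors::ProducerNotConfiguredError)

    when the producer has not been configured yet

  • (Errors::ProducerClosedError)

    when the producer has already been closed

  • (Errors::ProducerUsedInParentProcess)

    when a client created in another process (e.g. before a fork) is accessed from the current process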

139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/waterdrop/producer.rb', line 139

# Returns the underlying rdkafka producer, building it lazily on first use. After #disconnect
# or #close resets @client to nil, the next call builds it again (close additionally marks the
# producer closed, which makes this method raise).
#
# The fast path returns the cached client without locking when it was built by the current
# process. Otherwise the client is built under @connecting_mutex, and the fast-path condition
# is re-checked once the lock is held so that concurrent callers do not build it twice.
#
# @return [Rdkafka::Producer] raw rdkafka producer
# @raise [Errors::ProducerNotConfiguredError] when the producer has not been configured yet
# @raise [Errors::ProducerClosedError] when the producer has already been closed
# @raise [Errors::ProducerUsedInParentProcess] when a client built in another process is
#   accessed from this one; the inherited reference is dropped before raising
def client
  # Fast path: reuse the client only if it was created by this very process
  return @client if @client && @pid == Process.pid

  # Don't allow to obtain a client reference for a producer that was not configured
  raise Errors::ProducerNotConfiguredError, id if @status.initial?
  raise Errors::ProducerClosedError, id if @status.closed?

  @connecting_mutex.synchronize do
    # Re-check under the lock: another thread may have built the client while we waited
    return @client if @client && @pid == Process.pid

    # We undefine all the finalizers, in case it was a fork, so the finalizers from the parent
    # process don't leak
    ObjectSpace.undefine_finalizer(id)

    # We should raise an error when trying to use a producer with client from a fork. Always.
    # Reaching this point with a client set means @pid differs from the current process pid.
    if @client
      # We need to reset the client, otherwise there might be attempt to close the parent
      # client
      @client = nil
      raise Errors::ProducerUsedInParentProcess, Process.pid
    end

    # Finalizer tracking is needed for handling shutdowns gracefully.
    # I don't expect everyone to remember about closing all the producers all the time, thus
    # this approach is better. Although it is still worth keeping in mind, that this will
    # block GC from removing a no longer used producer unless closed properly but at least
    # won't crash the VM upon closing the process
    # NOTE(review): the finalizer is attached to the `id` object rather than to self; the
    # proc closes over self, which is what keeps the producer reachable.
    ObjectSpace.define_finalizer(id, proc { close })

    # Remember which process owns the client so forks can be detected on the fast path
    @pid = Process.pid
    @client = Builder.new.call(self, @config)

    @status.connected!
    @monitor.instrument("producer.connected", producer_id: id)
  end

  @client
end

#close(force: false) ⇒ Object

Synchronously flushes the buffers and closes the producer

Parameters:

  • force (Boolean) (defaults to: false)

    should we force closing even with outstanding messages after the max wait timeout



341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
# File 'lib/waterdrop/producer.rb', line 341

# Flushes outstanding data and closes the producer, releasing the underlying client and the
# global callbacks registered under this producer id.
#
# Calling it on a producer that is no longer active is a no-op. When invoked from inside the
# FD poller thread, the work is handed off to a new thread and this call returns immediately.
#
# @param force [Boolean] when true, the librdkafka queue is purged after the final flush
#   attempt instead of waiting for outstanding messages on client close
# @raise [Errors::ProducerTransactionalCloseAttemptError] when called from inside a
#   transaction owned by the current thread
def close(force: false)
  # When closing from within the FD poller thread (e.g., from a callback like
  # message.acknowledged or error.occurred), we must delegate to a background thread.
  # Close performs flush which waits for delivery reports, but delivery reports require
  # the poller to poll. Since we're ON the poller thread inside a callback, this would
  # deadlock. Spawning a thread allows the callback to return, letting the poller continue.
  if fd_polling? && poller.in_poller_thread?
    Thread.new { close(force: force) }
    return
  end

  # If we already own the transactional mutex, it means we are inside of a transaction and
  # it should not be allowed to close the producer in such a case.
  if @transaction_mutex.locked? && @transaction_mutex.owned?
    raise Errors::ProducerTransactionalCloseAttemptError, id
  end

  # The transactional mutex here can be used even when no transactions are in use
  # It prevents us from closing a mutex during transactions and is irrelevant in other cases
  @transaction_mutex.synchronize do
    @operating_mutex.synchronize do
      # Already closed (or never configured): nothing to do
      return unless @status.active?

      @monitor.instrument(
        "producer.closed",
        producer_id: id
      ) do
        @status.closing!
        @monitor.instrument("producer.closing", producer_id: id)

        # No need for auto-gc if everything got closed by us
        # This should be used only in case a producer was not closed properly and forgotten
        ObjectSpace.undefine_finalizer(id)

        # We save this thread id because we need to bypass the activity verification on the
        # producer for final flush of buffers.
        @closing_thread_id = Thread.current.object_id

        # Wait until all the outgoing operations are done. Only when no one is using the
        # underlying client running operations we can close
        sleep(0.001) until @operations_in_progress.value.zero?

        # Flush has its own buffer mutex but even if it is blocked, flushing can still happen
        # as we close the client after the flushing (even if blocked by the mutex)
        flush(true)

        # We should not close the client in several threads the same time
        # It is safe to run it several times but not exactly the same moment
        # We also mark it as closed only if it was connected, if not, it would trigger a new
        # connection that anyhow would be immediately closed
        if @client
          # Why do we trigger it early instead of just having `#close` do it?
          # The linger.ms time will be ignored for the duration of the call,
          # queued messages will be sent to the broker as soon as possible.
          begin
            @client.flush(current_variant.max_wait_timeout) unless @client.closed?
          # We can safely ignore timeouts here because any left outstanding requests
          # will anyhow force wait on close if not forced.
          # If forced, we will purge the queue and just close
          rescue ::Rdkafka::RdkafkaError, Rdkafka::AbstractHandle::WaitTimeoutError
            nil
          ensure
            # Purge fully the local queue in case of a forceful shutdown just to be sure, that
            # there are no dangling messages. In case flush was successful, there should be
            # none but we do it just in case it timed out
            purge if force
          end

          # Unregister from the poller before closing the client.
          # NOTE(review): this comment used to say "if fiber polling is enabled", but the
          # polling mode checked in this file is :fd (see fd_polling?); presumably
          # unregister_from_poller is a no-op for other modes - confirm.
          unregister_from_poller

          @client.close

          @client = nil
        end

        # Remove callbacks runners that were registered
        ::Karafka::Core::Instrumentation.statistics_callbacks.delete(@id)
        ::Karafka::Core::Instrumentation.error_callbacks.delete(@id)
        ::Karafka::Core::Instrumentation.oauthbearer_token_refresh_callbacks.delete(@id)

        @status.closed!
      end
    end
  end
end

#close! ⇒ Object

Closes the producer with forced close after timeout, purging any outgoing data



429
430
431
# File 'lib/waterdrop/producer.rb', line 429

# Forceful variant of #close: after the final flush attempt the librdkafka queue is purged.
def close! = close(force: true)

#disconnect ⇒ Boolean

Note:

This method will refuse to disconnect if:

  • There are pending messages in the internal buffer

  • There are operations currently in progress

  • A transaction is currently active

  • The client is not currently connected

  • Required mutexes are locked by other operations

Note:

After successful disconnection, the producer status changes to disconnected but remains configured, allowing for future reconnection when client access is needed.

Disconnects the producer from Kafka while keeping it configured for potential reconnection

This method safely disconnects the underlying Kafka client while preserving the producer’s configuration. Unlike `#close`, this allows the producer to be reconnected later by calling methods that require the client. The disconnection will only proceed if certain safety conditions are met.

This API can be used to preserve connections on low-intensity producer instances, etc.

Returns:

  • (Boolean)

    true if disconnection was successful, false if disconnection was not possible due to safety conditions (active transactions, ongoing operations, pending messages in buffer, or if already disconnected)



289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
# File 'lib/waterdrop/producer.rb', line 289

# Closes the underlying client while keeping the producer configured, so the next call to
# #client builds a fresh connection.
#
# The lock-free #disconnectable? pre-check rejects obvious cases first. The same conditions,
# plus "no operations in progress", are then re-verified while holding the transaction,
# operating and buffer mutexes. The transaction and operating mutexes are taken in the same
# order as in #close.
#
# @return [Boolean] true when the client was closed, false when any safety check failed
def disconnect
  return false unless disconnectable?

  # Use the same mutex pattern as the regular close method to prevent race conditions
  @transaction_mutex.synchronize do
    @operating_mutex.synchronize do
      @buffer_mutex.synchronize do
        # Authoritative re-checks; `return` exits the method and synchronize releases the locks
        return false unless @client
        return false unless @status.connected?
        return false unless @messages.empty?
        return false unless @operations_in_progress.value.zero?

        @status.disconnecting!
        @monitor.instrument("producer.disconnecting", producer_id: id)

        @monitor.instrument("producer.disconnected", producer_id: id) do
          # Unregister from the poller before closing the client.
          # NOTE(review): this comment used to say "if fiber polling is enabled", but the
          # polling mode checked in this file is :fd (see fd_polling?) - confirm.
          unregister_from_poller

          # Close the client
          @client.close
          @client = nil

          # Reset connection status but keep producer configured
          @status.disconnected!
        end

        true
      end
    end
  end
end

#disconnectable? ⇒ Boolean

Note:

This is a best-effort check. The authoritative checks are repeated by #disconnect while holding all the required mutexes.

Is the producer in a state from which we can disconnect

Returns:

  • (Boolean)

    is producer in a state that potentially allows for a disconnect



328
329
330
331
332
333
334
335
336
# File 'lib/waterdrop/producer.rb', line 328

# Lock-free, best-effort answer to "could #disconnect succeed right now?".
#
# @return [Boolean] false when there is no connected client, when the buffer still holds
#   messages, or when the transaction or operating mutex is currently held; true otherwise
def disconnectable?
  # There must be a connected client and nothing left in the internal buffer
  ready = @client && @status.connected? && @messages.empty?
  return false unless ready

  # Back off while a transaction or another close/disconnect operation holds its lock
  !@transaction_mutex.locked? && !@operating_mutex.locked?
end

#fd_polling? ⇒ Boolean

Returns true if FD-based polling mode is enabled.

Returns:

  • (Boolean)

    true if FD-based polling mode is enabled



465
466
467
468
469
470
# File 'lib/waterdrop/producer.rb', line 465

# Whether the producer is configured to use FD-based polling (`polling.mode == :fd`).
# The answer is cached only once a config exists; before that it is always false.
#
# @return [Boolean]
def fd_polling?
  if @fd_polling.nil?
    # Nothing to inspect yet - report false without caching it
    return false unless config

    @fd_polling = config.polling.mode == :fd
  end

  @fd_polling
end

#inspect ⇒ String

Returns mutex-safe inspect details.

Returns:

  • (String)

    mutex-safe inspect details



434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
# File 'lib/waterdrop/producer.rb', line 434

# Debug representation built only from state that can be read without blocking on locks
# and without triggering a connection.
#
# @return [String]
def inspect
  details = ["id=#{@id.inspect}"]
  details << "status=#{@status}" if @status

  # Never wait for the buffer lock; report it as busy when someone else holds it
  buffer_size =
    if @buffer_mutex.try_lock
      begin
        @messages.size
      ensure
        @buffer_mutex.unlock
      end
    else
      "busy"
    end
  details << "buffer_size=#{buffer_size}"

  # Status-based check so that inspecting never connects the client
  details << (@status.connected? ? "connected=true" : "connected=false")
  details << "operations=#{@operations_in_progress.value}"
  details << "in_transaction=true" if @transaction_mutex.locked?

  "#<#{self.class.name}:#{format("%#x", object_id)} #{details.join(" ")}>"
end

#middleware ⇒ WaterDrop::Producer::Middleware

Returns and caches the middleware object that may be used

Returns:

  • (WaterDrop::Producer::Middleware)


263
264
265
# File 'lib/waterdrop/producer.rb', line 263

# Middleware taken from the config, memoized after the first truthy lookup.
#
# @return [WaterDrop::Producer::Middleware]
def middleware
  return @middleware if @middleware

  @middleware = config.middleware
end

#partition_count(topic) ⇒ Integer

Note:

It uses the underlying `rdkafka-ruby` partition count fetch and cache.

Fetches and caches the partition count of a topic

Parameters:

  • topic (String)

    topic for which we want to get the number of partitions

Returns:

  • (Integer)

    number of partitions of the requested topic or -1 if number could not be retrieved.



220
221
222
# File 'lib/waterdrop/producer.rb', line 220

# Number of partitions of the given topic as reported by the rdkafka client.
#
# @param topic [String, Symbol] topic name (anything responding to #to_s)
# @return [Integer] partition count, or -1 if it could not be retrieved
def partition_count(topic)
  # Resolve the client first (it may connect or raise) before touching the topic
  rdkafka = client
  rdkafka.partition_count(topic.to_s)
end

#poller ⇒ WaterDrop::Polling::Poller

Returns the poller instance for this producer

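Returns:

  • (WaterDrop::Polling::Poller, nil)

    the poller from the config, the shared Polling::Poller instance when none is configured, or nil when the producer itself is not configured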

475
476
477
478
479
480
# File 'lib/waterdrop/producer.rb', line 475

# Poller used by this producer: the one from the config when set, otherwise the shared
# Polling::Poller instance. Nothing is cached until the producer has a config.
#
# @return [WaterDrop::Polling::Poller, nil] nil when the producer is not configured
def poller
  if @poller.nil?
    return nil unless config

    @poller = config.polling.poller || Polling::Poller.instance
  end

  @poller
end

#purge ⇒ Object

Note:

This operation can cause data loss. Keep that in mind. It purges not only the internal WaterDrop buffer but also the librdkafka queue, and it cancels any outgoing message dispatches.

Purges data from both the buffer queue as well as the librdkafka queue.



229
230
231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/waterdrop/producer.rb', line 229

# Drops everything buffered locally and, when a client exists, asks it to purge its queue.
# The whole operation is reported through the "buffer.purged" instrumentation event.
def purge
  @monitor.instrument("buffer.purged", producer_id: id) do
    @buffer_mutex.synchronize { @messages = [] }

    # A producer that buffered but never connected has no client, so there is nothing more
    # to purge. The lock is the same one #client uses while building the client.
    @connecting_mutex.synchronize { @client&.purge }
  end
end

#queue_size ⇒ Integer (also known as: queue_length)

Note:

This only counts messages in the rdkafka queue, not the internal WaterDrop buffer. To get the internal buffer count, use `messages.size`.

Note:

Returns 0 when the producer is not connected as there cannot be any pending messages if we haven’t connected.

Returns the number of messages in the librdkafka producer queue.

This count includes:

  • Messages waiting to be sent to the broker

  • Messages currently in-flight (being transmitted)

  • Delivery reports waiting to be processed

Examples:

Check pending messages

producer.queue_size #=> 42

Check total pending work (buffer + rdkafka queue)

internal_buffer = producer.messages.size
rdkafka_queue = producer.queue_size
total_pending = internal_buffer + rdkafka_queue

Returns:

  • (Integer)

    the number of pending messages in the rdkafka queue, or 0 if the producer is not connected



201
202
203
204
205
206
207
208
209
# File 'lib/waterdrop/producer.rb', line 201

# Number of messages held by the librdkafka queue (the WaterDrop buffer is not included).
#
# @return [Integer] queued messages count, or 0 when the producer has no connected client
def queue_size
  return 0 unless @status.connected?

  # Same lock that #client holds while assigning @client
  @connecting_mutex.synchronize do
    @client ? @client.queue_size : 0
  end
end

#setup ⇒ Object

Sets up the whole configuration and initializes all that is needed



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/waterdrop/producer.rb', line 87

# Configures the producer and moves it into the configured state.
#
# All arguments (typically the configuration block) are forwarded to the config setup. It may
# only run while the producer is still in its initial state, i.e. once per instance.
#
# When idle_disconnect_timeout is non-zero, an idle disconnector listener is subscribed to the
# producer monitor so rarely used producers can release their TCP connections. A zero timeout
# skips the subscription.
#
# @raise [Errors::ProducerAlreadyConfiguredError] when the producer is not in its initial state
def setup(...)
  raise Errors::ProducerAlreadyConfiguredError, id unless @status.initial?

  @config = Config
    .new
    .setup(...)
    .config

  @id = @config.id
  @monitor = @config.monitor
  @contract = Contracts::Message.new(max_payload_size: @config.max_payload_size)
  @default_variant = Variant.new(self, default: true)

  # Setup idle disconnect listener if configured so we preserve tcp connections on rarely
  # used producers. It is subscribed before the status changes, as it was before.
  unless @config.idle_disconnect_timeout.zero?
    disconnector = Instrumentation::IdleDisconnectorListener.new(
      self,
      disconnect_timeout: @config.idle_disconnect_timeout
    )

    @monitor.subscribe(disconnector)
  end

  # Single shared tail: previously duplicated in both branches
  @status.configured!

  # Instrument producer configuration for global listeners
  class_monitor.instrument(
    "producer.configured",
    producer: self,
    producer_id: @id,
    config: @config
  )
end

#with(**args) ⇒ WaterDrop::Producer::Variant Also known as: variant

Builds the variant alteration and returns it.

Parameters:

  • args (Hash)

    variant configuration options

Options Hash (**args):

  • :max_wait_timeout (Integer, nil)

    alteration to max wait timeout or nil to use default

  • :topic_config (Hash)

    extra topic configuration that can be altered

  • :default (Boolean)

    is this a default variant or an altered one

Returns:

  • (WaterDrop::Producer::Variant)

    a new variant bound to this producer

See Also:



253
254
255
256
257
# File 'lib/waterdrop/producer.rb', line 253

# Builds a variant of this producer with altered dispatch settings (for example
# max_wait_timeout or topic_config).
#
# @param args [Hash] variant options forwarded to Variant.new
# @return [WaterDrop::Producer::Variant] a new variant bound to this producer
def with(**args)
  # Guard first: ensure_active! is expected to raise when the producer cannot be used
  # (NOTE(review): defined outside this view - confirm)
  ensure_active!
  Variant.new(self, **args)
end