Class: WaterDrop::Producer
- Inherits: Object
  - Object
  - WaterDrop::Producer
- Extended by: Forwardable
- Defined in:
  lib/waterdrop/producer.rb,
  lib/waterdrop/producer/sync.rb,
  lib/waterdrop/producer/async.rb,
  lib/waterdrop/producer/buffer.rb,
  lib/waterdrop/producer/status.rb,
  lib/waterdrop/producer/builder.rb,
  lib/waterdrop/producer/dummy_client.rb
Overview
Main WaterDrop messages producer
Defined Under Namespace
Modules: Async, Buffer, Sync
Classes: Builder, DummyClient, Status
Instance Attribute Summary
- #config ⇒ Object (readonly)
  Dry-configurable config object.
- #id ⇒ String (readonly)
  UUID of the current producer.
- #messages ⇒ Concurrent::Array (readonly)
  Internal messages buffer.
- #monitor ⇒ Object (readonly)
  Monitor we want to use.
- #status ⇒ Status (readonly)
  Producer status object.
Instance Method Summary
- #client ⇒ Rdkafka::Producer
  Raw rdkafka producer.
- #close ⇒ Object
  Flushes the buffers in a sync way and closes the producer.
- #ensure_active! ⇒ Object
  Ensures that we don’t run any operations when the producer is not configured or when it was already closed.
- #initialize(&block) ⇒ Producer (constructor)
  Creates a not-yet-configured instance of the producer.
- #produce(message) ⇒ Object
  Runs the client produce method with a given message.
- #setup(&block) ⇒ Object
  Sets up the whole configuration and initializes all that is needed.
- #validate_message!(message) ⇒ Object
  Ensures that the message we want to send out to Kafka is actually valid and that it can be sent there.
- #wait(handler) ⇒ Object
  Waits on a given handler.
Methods included from Buffer
#buffer, #buffer_many, #flush_async, #flush_sync
Methods included from Async
#produce_async, #produce_many_async
Methods included from Sync
#produce_many_sync, #produce_sync
Constructor Details
#initialize(&block) ⇒ Producer
Creates a not-yet-configured instance of the producer
    # File 'lib/waterdrop/producer.rb', line 35

    def initialize(&block)
      @buffer_mutex = Mutex.new
      @connecting_mutex = Mutex.new
      @closing_mutex = Mutex.new

      @status = Status.new
      @messages = Concurrent::Array.new

      return unless block

      setup(&block)
    end
Instance Attribute Details
#config ⇒ Object (readonly)
Returns dry-configurable config object.
    # File 'lib/waterdrop/producer.rb', line 30

    def config
      @config
    end
#id ⇒ String (readonly)
Returns the UUID of the current producer.

    # File 'lib/waterdrop/producer.rb', line 22

    def id
      @id
    end
#messages ⇒ Concurrent::Array (readonly)
Returns internal messages buffer.
    # File 'lib/waterdrop/producer.rb', line 26

    def messages
      @messages
    end
#monitor ⇒ Object (readonly)
Returns monitor we want to use.
    # File 'lib/waterdrop/producer.rb', line 28

    def monitor
      @monitor
    end
#status ⇒ Status (readonly)
Returns producer status object.
    # File 'lib/waterdrop/producer.rb', line 24

    def status
      @status
    end
Instance Method Details
#client ⇒ Rdkafka::Producer
The client is lazily initialized and accounts for a fork that can happen at any time.
Forking a producer that is already in use is not recommended, so when bootstrapping a cluster it is much better to fork producers that are configured but not yet used.
Returns the raw rdkafka producer.
    # File 'lib/waterdrop/producer.rb', line 69

    def client
      return @client if @client && @pid == Process.pid

      # Don't allow to obtain a client reference for a producer that was not configured
      raise Errors::ProducerNotConfiguredError, id if @status.initial?

      @connecting_mutex.synchronize do
        return @client if @client && @pid == Process.pid

        # We should raise an error when trying to use a producer from a fork, that is already
        # connected to Kafka. We allow forking producers only before they are used
        raise Errors::ProducerUsedInParentProcess, Process.pid if @status.connected?

        # We undefine all the finalizers, in case it was a fork, so the finalizers from the parent
        # process don't leak
        ObjectSpace.undefine_finalizer(id)
        # Finalizer tracking is needed for handling shutdowns gracefully.
        # I don't expect everyone to remember about closing all the producers all the time, thus
        # this approach is better. Although it is still worth keeping in mind, that this will
        # block GC from removing a no longer used producer unless closed properly but at least
        # won't crash the VM upon closing the process
        ObjectSpace.define_finalizer(id, proc { close })

        @pid = Process.pid
        @client = Builder.new.call(self, @config)

        # Register statistics runner for this particular type of callbacks
        ::Karafka::Core::Instrumentation.statistics_callbacks.add(
          @id,
          Instrumentation::Callbacks::Statistics.new(@id, @client.name, @config.monitor)
        )

        # Register error tracking callback
        ::Karafka::Core::Instrumentation.error_callbacks.add(
          @id,
          Instrumentation::Callbacks::Error.new(@id, @client.name, @config.monitor)
        )

        @status.connected!
      end

      @client
    end
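The lazy, process-aware lookup in #client can be illustrated in isolation. The following is a minimal sketch using only the Ruby standard library. `SketchClient` and `LazyHolder` are hypothetical stand-ins and not part of WaterDrop, and the sketch leaves out the status checks, finalizers, and callback registration done by the real method.

```ruby
# Hypothetical stand-in for the object that would be expensive to build
class SketchClient
  attr_reader :owner_pid

  def initialize
    @owner_pid = Process.pid
  end
end

# Hypothetical holder that builds its client lazily, once per process
class LazyHolder
  attr_reader :builds

  def initialize
    @connecting_mutex = Mutex.new
    @client = nil
    @pid = nil
    @builds = 0
  end

  def client
    # Fast path: reuse the client when it was built in this very process
    return @client if @client && @pid == Process.pid

    @connecting_mutex.synchronize do
      # Re-check under the lock, another thread may have built it meanwhile
      return @client if @client && @pid == Process.pid

      @pid = Process.pid
      @builds += 1
      @client = SketchClient.new
    end

    @client
  end
end

holder = LazyHolder.new
# Several threads race for the client, only one of them builds it
clients = Array.new(8) { Thread.new { holder.client } }.map(&:value)
```

Comparing the stored pid against `Process.pid` is what makes a forked child skip the fast path and fall into the locked section instead of reusing the parent's client.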
#close ⇒ Object
Flushes the buffers in a sync way and closes the producer
    # File 'lib/waterdrop/producer.rb', line 114

    def close
      @closing_mutex.synchronize do
        return unless @status.active?

        @monitor.instrument(
          'producer.closed',
          producer_id: id
        ) do
          @status.closing!

          # No need for auto-gc if everything got closed by us
          # This should be used only in case a producer was not closed properly and forgotten
          ObjectSpace.undefine_finalizer(id)

          # We save this thread id because we need to bypass the activity verification on the
          # producer for final flush of buffers.
          @closing_thread_id = Thread.current.object_id

          # Flush has its own buffer mutex but even if it is blocked, flushing can still happen
          # as we close the client after the flushing (even if blocked by the mutex)
          flush(true)

          # We should not close the client in several threads the same time
          # It is safe to run it several times but not exactly the same moment
          # We also mark it as closed only if it was connected, if not, it would trigger a new
          # connection that anyhow would be immediately closed
          client.close(@config.max_wait_timeout) if @client

          # Remove callbacks runners that were registered
          ::Karafka::Core::Instrumentation.statistics_callbacks.delete(@id)
          ::Karafka::Core::Instrumentation.error_callbacks.delete(@id)

          @status.closed!
        end
      end
    end
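The comments in #close note that closing is safe to run several times, but not from several threads at the same moment. A minimal sketch of that guard, assuming a hypothetical `SketchCloser` class with a plain boolean in place of the real status object, could look like this:

```ruby
# Hypothetical class showing a close that is serialized and runs its body once
class SketchCloser
  attr_reader :close_calls

  def initialize
    @closing_mutex = Mutex.new
    @active = true
    @close_calls = 0
  end

  def close
    @closing_mutex.synchronize do
      # Later callers find the object already closed and return right away
      return unless @active

      @active = false
      @close_calls += 1
    end
  end
end

closer = SketchCloser.new
# Concurrent and repeated calls are both harmless
Array.new(8) { Thread.new { closer.close } }.each(&:join)
closer.close
```

The mutex serializes the callers and the early return makes every call after the first a no-op, which mirrors the `return unless @status.active?` line above.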
#ensure_active! ⇒ Object
Ensures that we don’t run any operations when the producer is not configured or when it was already closed
    # File 'lib/waterdrop/producer.rb', line 153

    def ensure_active!
      return if @status.active?

      raise Errors::ProducerNotConfiguredError, id if @status.initial?
      raise Errors::ProducerClosedError, id if @status.closing? || @status.closed?

      # This should never happen
      raise Errors::StatusInvalidError, [id, @status.to_s]
    end
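The guard above maps each non-active status to a specific error. Below is a minimal, self-contained sketch of that mapping. `SketchStatus`, the two error classes, and `guard_active!` are hypothetical, and treating "configured" and "connected" as the active states is an assumption about how the real Status class behaves.

```ruby
# Hypothetical, simplified status tracker; the real Status class may differ
class SketchStatus
  STATES = %i[initial configured connected closing closed].freeze

  def initialize
    @current = :initial
  end

  # Define a predicate (initial?) and a setter (initial!) for every state
  STATES.each do |state|
    define_method(:"#{state}?") { @current == state }
    define_method(:"#{state}!") { @current = state }
  end

  # Assumption: only configured or connected producers count as active
  def active?
    configured? || connected?
  end
end

SketchNotConfiguredError = Class.new(StandardError)
SketchClosedError = Class.new(StandardError)

# Raises the error matching the current state, returns nil when active
def guard_active!(status)
  return if status.active?

  raise SketchNotConfiguredError if status.initial?
  raise SketchClosedError if status.closing? || status.closed?
end

# Collects the error class raised for a given state, or :ok when none is raised
def guard_result(status)
  guard_active!(status)
  :ok
rescue SketchNotConfiguredError, SketchClosedError => e
  e.class
end

status = SketchStatus.new
before_setup = guard_result(status)
status.configured!
after_setup = guard_result(status)
status.closed!
after_close = guard_result(status)
```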
#produce(message) ⇒ Object
Runs the client produce method with a given message
    # File 'lib/waterdrop/producer.rb', line 174

    def produce(message)
      client.produce(**message)
    rescue SUPPORTED_FLOW_ERRORS.first => e
      # Unless we want to wait and retry and it's a full queue, we raise normally
      raise unless @config.wait_on_queue_full
      raise unless e.code == :queue_full

      # We use this syntax here because we want to preserve the original `#cause` when we
      # instrument the error and there is no way to manually assign `#cause` value. We want to keep
      # the original cause to maintain the same API across all the errors dispatched to the
      # notifications pipeline.
      begin
        raise Errors::ProduceError
      rescue Errors::ProduceError => e
        # We want to instrument on this event even when we restart it.
        # The reason is simple: instrumentation and visibility.
        # We can recover from this, but despite that we should be able to instrument this.
        # If this type of event happens too often, it may indicate that the buffer settings are not
        # well configured.
        @monitor.instrument(
          'error.occurred',
          producer_id: id,
          message: message,
          error: e,
          type: 'message.produce'
        )

        # We do not poll the producer because polling happens in a background thread
        # It also should not be a frequent case (queue full), hence it's ok to just throttle.
        sleep @config.wait_on_queue_full_timeout
      end

      retry
    end
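The nested `begin`/`rescue` in #produce relies on a Ruby behavior: an error raised while another one is being handled gets the handled error as its `#cause`. The sketch below shows that together with the throttled `retry`, using only the standard library. `SketchQueueFullError`, `SketchProduceError`, and `produce_with_retry` are hypothetical names, and an array stands in for the monitor.

```ruby
# Hypothetical error classes standing in for the rdkafka and WaterDrop errors
SketchQueueFullError = Class.new(StandardError)
SketchProduceError = Class.new(StandardError)

# Calls the delivery callable and retries after a short pause on a full queue
def produce_with_retry(delivery, events, wait: 0.01)
  delivery.call
rescue SketchQueueFullError
  begin
    # Raising inside a rescue clause makes Ruby set #cause to the handled error
    raise SketchProduceError
  rescue SketchProduceError => e
    # Stand-in for instrumentation: record the wrapped error
    events << e
    # Throttle before trying again
    sleep wait
  end

  # Re-run the method body from the start
  retry
end

calls = 0
# Fails twice with a full queue, then succeeds
flaky = lambda do
  calls += 1
  raise SketchQueueFullError, 'queue full' if calls < 3

  :delivered
end

events = []
result = produce_with_retry(flaky, events)
```

Every recorded error keeps the original queue-full error reachable through `#cause`, so consumers of the instrumentation events can inspect the underlying failure.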
#setup(&block) ⇒ Object
Sets up the whole configuration and initializes all that is needed
    # File 'lib/waterdrop/producer.rb', line 50

    def setup(&block)
      raise Errors::ProducerAlreadyConfiguredError, id unless @status.initial?

      @config = Config
                .new
                .setup(&block)
                .config

      @id = @config.id
      @monitor = @config.monitor
      @contract = Contracts::Message.new(max_payload_size: @config.max_payload_size)
      @status.configured!
    end
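Taken together, the constructor and #setup allow a producer to be configured either at creation time or later, but only once. A minimal sketch of that lifecycle follows. `SketchProducer`, `SketchConfig`, and `SketchAlreadyConfiguredError` are hypothetical, and the real configuration is built through the Config class rather than a Struct.

```ruby
SketchAlreadyConfiguredError = Class.new(StandardError)
# Hypothetical config holder with two illustrative settings
SketchConfig = Struct.new(:id, :max_payload_size)

class SketchProducer
  attr_reader :config

  # A block is optional; without it the instance stays unconfigured
  def initialize(&block)
    @configured = false

    return unless block

    setup(&block)
  end

  # Yields a fresh config object once, a second call raises
  def setup
    raise SketchAlreadyConfiguredError if @configured

    @config = SketchConfig.new
    yield @config
    @configured = true
  end
end

# Configured right away through the constructor block
eager = SketchProducer.new { |config| config.id = 'eager' }

# Created first, configured later
lazy = SketchProducer.new
lazy.setup { |config| config.id = 'lazy' }

second_setup_error =
  begin
    lazy.setup { |config| config.id = 'again' }
    nil
  rescue SketchAlreadyConfiguredError => e
    e
  end
```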
#validate_message!(message) ⇒ Object
Ensures that the message we want to send out to Kafka is actually valid and that it can be sent there
    # File 'lib/waterdrop/producer.rb', line 167

    def validate_message!(message)
      @contract.validate!(message, Errors::MessageInvalidError)
    end
#wait(handler) ⇒ Object
Waits on a given handler
    # File 'lib/waterdrop/producer.rb', line 212

    def wait(handler)
      handler.wait(
        max_wait_timeout: @config.max_wait_timeout,
        wait_timeout: @config.wait_timeout
      )
    end