Class: Rdkafka::Producer

Inherits:
Object
  • Object
show all
Defined in:
lib/rdkafka/producer.rb,
lib/rdkafka/producer/delivery_handle.rb,
lib/rdkafka/producer/delivery_report.rb

Overview

A producer for Kafka messages. To create a producer set up a Config and call producer on that.

Defined Under Namespace

Classes: DeliveryHandle, DeliveryReport

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#delivery_callback=(callback) ⇒ nil

Set a callback that will be called every time a message is successfully produced. The callback is called with a DeliveryReport and DeliveryHandle

Parameters:

  • callback (Proc, #call)

    The callback

Returns:

  • (nil)

Raises:

  • (TypeError)


60
61
62
63
64
# File 'lib/rdkafka/producer.rb', line 60

# Registers +callback+ to be invoked after each successful produce.
# The callable's arity is cached up front so dispatch can later trim
# the argument list to what the callback accepts.
#
# @param callback [Proc, #call] any object responding to +call+
# @return [nil]
# @raise [TypeError] when +callback+ is not callable
def delivery_callback=(callback)
  raise TypeError, "Callback has to be callable" unless callback.respond_to?(:call)

  @delivery_callback = callback
  @delivery_callback_arity = arity(callback)
end

Instance Method Details

#abort_transaction(timeout_ms = -1) ⇒ Object



98
99
100
101
102
103
104
105
# File 'lib/rdkafka/producer.rb', line 98

# Aborts the current transaction.
#
# @param timeout_ms [Integer] how long to wait; -1 means use the
#   librdkafka default
# @raise [RdkafkaError] when librdkafka reports a failure
def abort_transaction(timeout_ms = -1)
  closed_producer_check(__method__)

  @native_kafka.with_inner do |handle|
    Rdkafka::RdkafkaError.validate!(
      Rdkafka::Bindings.rd_kafka_abort_transaction(handle, timeout_ms)
    )
  end
end

#arity(callback) ⇒ Object



303
304
305
306
307
# File 'lib/rdkafka/producer.rb', line 303

# Determines how many arguments the given callable accepts.
# Procs and lambdas expose #arity directly; other callables are
# inspected through their #call method.
#
# @param callback [#call] the callable to inspect
# @return [Integer] the callable's arity (may be negative for splats)
def arity(callback)
  callback.respond_to?(:arity) ? callback.arity : callback.method(:call).arity
end

#begin_transactionObject



78
79
80
81
82
83
84
85
86
# File 'lib/rdkafka/producer.rb', line 78

# Starts a new transaction on the underlying native producer.
#
# @raise [RdkafkaError] when librdkafka reports a failure
def begin_transaction
  closed_producer_check(__method__)

  @native_kafka.with_inner do |handle|
    result = Rdkafka::Bindings.rd_kafka_begin_transaction(handle)
    Rdkafka::RdkafkaError.validate!(result)
  end
end

#call_delivery_callback(delivery_report, delivery_handle) ⇒ Object



296
297
298
299
300
301
# File 'lib/rdkafka/producer.rb', line 296

# Invokes the registered delivery callback (if any) with the report
# and handle, trimming the argument list to the callback's arity.
#
# @param delivery_report [DeliveryReport]
# @param delivery_handle [DeliveryHandle]
# @return [Object, nil] the callback's result, or nil when no
#   callback is registered
def call_delivery_callback(delivery_report, delivery_handle)
  return unless @delivery_callback

  args = [delivery_report, delivery_handle]
  # A negative arity (e.g. -1 for a callback taking *args) means the
  # callback accepts any number of arguments, so pass both as-is.
  # Array#take would raise ArgumentError on a negative count.
  args = args.take(@delivery_callback_arity) if @delivery_callback_arity >= 0
  @delivery_callback.call(*args)
end

#closeObject

Close this producer and wait for the internal poll queue to empty.



108
109
110
111
112
# File 'lib/rdkafka/producer.rb', line 108

# Closes this producer and waits for the internal poll queue to empty.
# The finalizer is unregistered first so it cannot run against an
# already-closed native handle. Calling close twice is a no-op.
def close
  unless closed?
    ObjectSpace.undefine_finalizer(self)
    @native_kafka.close
  end
end

#closed?Boolean

Whether this producer has closed

Returns:

  • (Boolean)


115
116
117
# File 'lib/rdkafka/producer.rb', line 115

# Whether this producer has been closed. Delegates entirely to the
# underlying native Kafka handle.
#
# @return [Boolean] true once the native handle is closed
def closed?
  @native_kafka.closed?
end

#commit_transaction(timeout_ms = -1) ⇒ Object



88
89
90
91
92
93
94
95
96
# File 'lib/rdkafka/producer.rb', line 88

# Commits the current transaction.
#
# @param timeout_ms [Integer] how long to wait; -1 means use the
#   librdkafka default
# @raise [RdkafkaError] when librdkafka reports a failure
def commit_transaction(timeout_ms = -1)
  closed_producer_check(__method__)

  @native_kafka.with_inner do |handle|
    Rdkafka::RdkafkaError.validate!(
      Rdkafka::Bindings.rd_kafka_commit_transaction(handle, timeout_ms)
    )
  end
end

#flush(timeout_ms = 5_000) ⇒ Boolean

Note:

We raise an exception for other errors because based on the librdkafka docs, there should be no other errors.

Note:

For timed_out we do not raise an error to keep it backwards compatible

Wait until all outstanding producer requests are completed, with the given timeout in milliseconds. Call this before closing a producer to ensure delivery of all messages.

Parameters:

  • timeout_ms (Integer) (defaults to: 5_000)

    how long should we wait for flush of all messages

Returns:

  • (Boolean)

    true if no more data and all was flushed, false in case there are still outgoing messages after the timeout



130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/rdkafka/producer.rb', line 130

# Waits up to +timeout_ms+ for all outstanding produce requests to
# complete. Call before closing the producer to ensure delivery.
#
# @param timeout_ms [Integer] how long to wait for the flush
# @return [Boolean] true when everything was flushed, false when
#   messages were still outstanding after the timeout (kept as a
#   boolean rather than an error for backwards compatibility)
# @raise [RdkafkaError] for any other librdkafka error
def flush(timeout_ms = 5_000)
  closed_producer_check(__method__)

  flush_error = @native_kafka.with_inner do |handle|
    Rdkafka::RdkafkaError.build(
      Rdkafka::Bindings.rd_kafka_flush(handle, timeout_ms)
    )
  end

  # No error object means everything drained in time.
  return true if flush_error.nil?

  # Timeouts are reported as false instead of raising.
  flush_error.code == :timed_out ? false : raise(flush_error)
end

#init_transactionsObject

Init transactions Run once per producer



68
69
70
71
72
73
74
75
76
# File 'lib/rdkafka/producer.rb', line 68

# Initializes the transactional producer. Must run once per producer
# before any transaction is begun. Blocks indefinitely (-1 timeout).
#
# @raise [RdkafkaError] when librdkafka reports a failure
def init_transactions
  closed_producer_check(__method__)

  @native_kafka.with_inner do |handle|
    result = Rdkafka::Bindings.rd_kafka_init_transactions(handle, -1)
    Rdkafka::RdkafkaError.validate!(result) || true
  end
end

#nameString

Returns producer name.

Returns:

  • (String)

    producer name



48
49
50
51
52
# File 'lib/rdkafka/producer.rb', line 48

# The producer's name as reported by librdkafka. Memoized so the
# native call happens at most once per producer instance.
#
# @return [String] producer name
def name
  @name ||= @native_kafka.with_inner { |inner| ::Rdkafka::Bindings.rd_kafka_name(inner) }
end

#partition_count(topic) ⇒ Object

Partition count for a given topic. NOTE: If 'allow.auto.create.topics' is set to true in the broker, the topic will be auto-created after returning nil.

We cache the partition count for a given topic for given time This prevents us in case someone uses partition_key from querying for the count with each message. Instead we query once every 30 seconds at most

Parameters:

  • topic (String)

    The topic name.

Returns:

  • (Integer, nil)

    partition count for the given topic, or nil when it could not be determined



182
183
184
185
186
187
188
189
190
# File 'lib/rdkafka/producer.rb', line 182

# Partition count for a given topic, served from a short-lived cache.
# The cache avoids querying the cluster on every message when
# partition_key is used; entries expire after PARTITIONS_COUNT_TTL.
#
# @param topic [String] the topic name
# @return [Integer, nil] partition count, or nil when unknown
def partition_count(topic)
  closed_producer_check(__method__)

  # Evict stale entries. Each cached value appears to be a
  # [monotonic_timestamp, count] pair — first is insertion time, last
  # the count. NOTE(review): cache population is not visible here;
  # confirm the value shape where @_partitions_count_cache is filled.
  @_partitions_count_cache.delete_if do |_, cached|
    monotonic_now - cached.first > PARTITIONS_COUNT_TTL
  end

  # Presumably the Hash has a default proc that fetches and stores the
  # count on a miss — TODO confirm; a plain miss would otherwise raise
  # NoMethodError on nil.last.
  @_partitions_count_cache[topic].last
end

#produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil) ⇒ DeliveryHandle

Produces a message to a Kafka topic. The message is added to rdkafka's queue, call wait on the returned delivery handle to make sure it is delivered.

When no partition is specified the underlying Kafka library picks a partition based on the key. If no key is specified, a random partition will be used. When a timestamp is provided this is used instead of the auto-generated timestamp.

Parameters:

  • topic (String)

    The topic to produce to

  • payload (String, nil) (defaults to: nil)

    The message's payload

  • key (String, nil) (defaults to: nil)

    The message's key

  • partition (Integer, nil) (defaults to: nil)

    Optional partition to produce to

  • partition_key (String, nil) (defaults to: nil)

    Optional partition key based on which partition assignment can happen

  • timestamp (Time, Integer, nil) (defaults to: nil)

    Optional timestamp of this message. Integer timestamp is in milliseconds since Jan 1 1970.

  • headers (Hash<String,String>) (defaults to: nil)

    Optional message headers

Returns:

  • (DeliveryHandle)

    Delivery handle that can be used to wait for the result of producing this message

Raises:

  • (RdkafkaError)

    When adding the message to rdkafka's queue failed



208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# File 'lib/rdkafka/producer.rb', line 208

# Produces a message to a Kafka topic. The message is appended to
# rdkafka's queue; call #wait on the returned delivery handle to block
# until it is delivered.
#
# When no partition is specified librdkafka picks one based on the key
# (or randomly if there is no key). A provided timestamp replaces the
# auto-generated one.
#
# @param topic [String] the topic to produce to
# @param payload [String, nil] the message's payload
# @param key [String, nil] the message's key
# @param partition [Integer, nil] optional explicit partition
# @param partition_key [String, nil] optional key used only to pick a
#   partition via the configured partitioner
# @param timestamp [Time, Integer, nil] optional timestamp; an Integer
#   is milliseconds since the epoch
# @param headers [Hash{String => String}, nil] optional message headers
# @return [DeliveryHandle] handle to wait on for the produce result
# @raise [RdkafkaError] when enqueueing the message failed
# @raise [TypeError] when timestamp is not nil, Integer or Time
def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil)
  closed_producer_check(__method__)

  # Byte lengths of the optional payload and key (0 when absent).
  payload_size = payload.nil? ? 0 : payload.bytesize
  key_size = key.nil? ? 0 : key.bytesize

  if partition_key
    # Renamed local: the original shadowed the #partition_count method
    # with a same-named local, which reads confusingly.
    topic_partition_count = partition_count(topic)
    # If the topic is not present, partition count is nil and we fall
    # through to -1 below.
    partition = Rdkafka::Bindings.partitioner(partition_key, topic_partition_count, @partitioner_name) if topic_partition_count
  end

  # If partition is nil, use -1 to let librdkafka set the partition
  # randomly or based on the key when present.
  partition ||= -1

  # If timestamp is nil use 0 and let Kafka set one. Integers pass
  # through; Time is converted to epoch milliseconds.
  raw_timestamp =
    if timestamp.nil?
      0
    elsif timestamp.is_a?(Integer)
      timestamp
    elsif timestamp.is_a?(Time)
      (timestamp.to_i * 1000) + (timestamp.usec / 1000)
    else
      raise TypeError.new("Timestamp has to be nil, an Integer or a Time")
    end

  delivery_handle = DeliveryHandle.new
  delivery_handle[:pending] = true
  delivery_handle[:response] = -1
  delivery_handle[:partition] = -1
  delivery_handle[:offset] = -1
  DeliveryHandle.register(delivery_handle)

  args = [
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TOPIC, :string, topic,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_MSGFLAGS, :int, Rdkafka::Bindings::RD_KAFKA_MSG_F_COPY,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_VALUE, :buffer_in, payload, :size_t, payload_size,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_KEY, :buffer_in, key, :size_t, key_size,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_PARTITION, :int32, partition,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TIMESTAMP, :int64, raw_timestamp,
    :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_OPAQUE, :pointer, delivery_handle,
  ]

  if headers
    # Renamed block locals: the original assigned to `key`, silently
    # clobbering the method's `key` parameter inside the loop.
    headers.each do |header_name, header_value|
      name_str = header_name.to_s
      value_str = header_value.to_s
      args << :int << Rdkafka::Bindings::RD_KAFKA_VTYPE_HEADER
      args << :string << name_str
      args << :pointer << value_str
      # String#bytesize is O(1); the original `bytes.size` allocated a
      # full byte Array per header just to count it.
      args << :size_t << value_str.bytesize
    end
  end

  args << :int << Rdkafka::Bindings::RD_KAFKA_VTYPE_END

  # Produce the message
  response = @native_kafka.with_inner do |inner|
    Rdkafka::Bindings.rd_kafka_producev(
      inner,
      *args
    )
  end

  # Raise error if the produce call was not successful; the handle is
  # unregistered first so it cannot leak.
  if response != 0
    DeliveryHandle.remove(delivery_handle.to_ptr.address)
    Rdkafka::RdkafkaError.validate!(response)
  end

  delivery_handle
end

#purgeObject

Purges the outgoing queue and releases all resources.

Useful when closing the producer with outgoing messages to unstable clusters or when for any other reasons waiting cannot go on anymore. This purges both the queue and all the inflight requests + updates the delivery handles statuses so they can be materialized into purge_queue errors.



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/rdkafka/producer.rb', line 151

# Purges the outgoing queue and releases all resources.
#
# Useful when closing the producer with outgoing messages to unstable
# clusters, or when waiting cannot continue for any other reason. Both
# the queue and all in-flight requests are purged, and the delivery
# handles' statuses are updated so they materialize into purge_queue
# errors.
#
# @return [true]
# @raise [RdkafkaError] when librdkafka reports a purge failure
def purge
  closed_producer_check(__method__)

  purge_flags = Bindings::RD_KAFKA_PURGE_F_QUEUE | Bindings::RD_KAFKA_PURGE_F_INFLIGHT

  @native_kafka.with_inner do |handle|
    Rdkafka::RdkafkaError.validate!(
      Bindings.rd_kafka_purge(handle, purge_flags)
    )
  end

  # Wait for the purge to affect everything
  sleep(0.001) until flush(100)

  true
end