Class: Rdkafka::Producer
- Inherits:
-
Object
- Object
- Rdkafka::Producer
- Defined in:
- lib/rdkafka/producer.rb,
lib/rdkafka/producer/delivery_handle.rb,
lib/rdkafka/producer/delivery_report.rb
Overview
Defined Under Namespace
Classes: DeliveryHandle, DeliveryReport
Instance Attribute Summary collapse
-
#delivery_callback ⇒ nil
writeonly
Set a callback that will be called every time a message is successfully produced.
Instance Method Summary collapse
- #arity(callback) ⇒ Object
- #call_delivery_callback(delivery_report, delivery_handle) ⇒ Object
-
#close ⇒ Object
Close this producer and wait for the internal poll queue to empty.
-
#closed? ⇒ Boolean
Whether this producer has closed.
-
#flush(timeout_ms = 5_000) ⇒ Object
Wait until all outstanding producer requests are completed, with the given timeout in seconds.
-
#name ⇒ String
Producer name.
-
#partition_count(topic) ⇒ Object
Partition count for a given topic.
-
#produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil) ⇒ DeliveryHandle
Produces a message to a Kafka topic.
Instance Attribute Details
#delivery_callback=(callback) ⇒ nil
Set a callback that will be called every time a message is successfully produced. The callback is called with a DeliveryReport and DeliveryHandle
60 61 62 63 64 |
# File 'lib/rdkafka/producer.rb', line 60 def delivery_callback=(callback) raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call) @delivery_callback = callback @delivery_callback_arity = arity(callback) end |
Instance Method Details
#arity(callback) ⇒ Object
224 225 226 227 228 |
# File 'lib/rdkafka/producer.rb', line 224 def arity(callback) return callback.arity if callback.respond_to?(:arity) callback.method(:call).arity end |
#call_delivery_callback(delivery_report, delivery_handle) ⇒ Object
217 218 219 220 221 222 |
# File 'lib/rdkafka/producer.rb', line 217 def call_delivery_callback(delivery_report, delivery_handle) return unless @delivery_callback args = [delivery_report, delivery_handle].take(@delivery_callback_arity) @delivery_callback.call(*args) end |
#close ⇒ Object
Close this producer and wait for the internal poll queue to empty.
67 68 69 70 71 |
# File 'lib/rdkafka/producer.rb', line 67 def close return if closed? ObjectSpace.undefine_finalizer(self) @native_kafka.close end |
#closed? ⇒ Boolean
Whether this producer has closed
74 75 76 |
# File 'lib/rdkafka/producer.rb', line 74 def closed? @native_kafka.closed? end |
#flush(timeout_ms = 5_000) ⇒ Object
Wait until all outstanding producer requests are completed, with the given timeout in seconds. Call this before closing a producer to ensure delivery of all messages.
82 83 84 85 86 87 88 |
# File 'lib/rdkafka/producer.rb', line 82 def flush(timeout_ms=5_000) closed_producer_check(__method__) @native_kafka.with_inner do |inner| Rdkafka::Bindings.rd_kafka_flush(inner, timeout_ms) end end |
#name ⇒ String
Returns producer name.
48 49 50 51 52 |
# File 'lib/rdkafka/producer.rb', line 48 def name @name ||= @native_kafka.with_inner do |inner| ::Rdkafka::Bindings.rd_kafka_name(inner) end end |
#partition_count(topic) ⇒ Object
Partition count for a given topic. NOTE: If 'allow.auto.create.topics' is set to true in the broker, the topic will be auto-created after returning nil.
We cache the partition count for a given topic for given time
This prevents us in case someone uses partition_key from querying for the count with
each message. Instead we query once every 30 seconds at most
103 104 105 106 107 108 109 110 111 |
# File 'lib/rdkafka/producer.rb', line 103 def partition_count(topic) closed_producer_check(__method__) @_partitions_count_cache.delete_if do |_, cached| monotonic_now - cached.first > PARTITIONS_COUNT_TTL end @_partitions_count_cache[topic].last end |
#produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil) ⇒ DeliveryHandle
Produces a message to a Kafka topic. The message is added to rdkafka's queue, call wait on the returned delivery handle to make sure it is delivered.
When no partition is specified the underlying Kafka library picks a partition based on the key. If no key is specified, a random partition will be used. When a timestamp is provided this is used instead of the auto-generated timestamp.
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
# File 'lib/rdkafka/producer.rb', line 129 def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil) closed_producer_check(__method__) # Start by checking and converting the input # Get payload length payload_size = if payload.nil? 0 else payload.bytesize end # Get key length key_size = if key.nil? 0 else key.bytesize end if partition_key partition_count = partition_count(topic) # If the topic is not present, set to -1 partition = Rdkafka::Bindings.partitioner(partition_key, partition_count, @partitioner_name) if partition_count end # If partition is nil, use -1 to let librdafka set the partition randomly or # based on the key when present. partition ||= -1 # If timestamp is nil use 0 and let Kafka set one. If an integer or time # use it. = if .nil? 0 elsif .is_a?(Integer) elsif .is_a?(Time) (.to_i * 1000) + (.usec / 1000) else raise TypeError.new("Timestamp has to be nil, an Integer or a Time") end delivery_handle = DeliveryHandle.new delivery_handle[:pending] = true delivery_handle[:response] = -1 delivery_handle[:partition] = -1 delivery_handle[:offset] = -1 DeliveryHandle.register(delivery_handle) args = [ :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TOPIC, :string, topic, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_MSGFLAGS, :int, Rdkafka::Bindings::RD_KAFKA_MSG_F_COPY, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_VALUE, :buffer_in, payload, :size_t, payload_size, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_KEY, :buffer_in, key, :size_t, key_size, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_PARTITION, :int32, partition, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TIMESTAMP, :int64, , :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_OPAQUE, :pointer, delivery_handle, ] if headers headers.each do |key0, value0| key = key0.to_s value = value0.to_s args << :int << Rdkafka::Bindings::RD_KAFKA_VTYPE_HEADER args << :string << key args << :pointer << value args << :size_t << value.bytes.size end end args << :int << Rdkafka::Bindings::RD_KAFKA_VTYPE_END # Produce the message response = @native_kafka.with_inner do |inner| Rdkafka::Bindings.rd_kafka_producev( inner, *args ) end # Raise error if the produce call was not successful if response != 0 DeliveryHandle.remove(delivery_handle.to_ptr.address) raise RdkafkaError.new(response) end delivery_handle end |