Class: Rdkafka::Producer
- Inherits:
-
Object
- Object
- Rdkafka::Producer
- Defined in:
- lib/rdkafka/producer.rb,
lib/rdkafka/producer/client.rb,
lib/rdkafka/producer/delivery_handle.rb,
lib/rdkafka/producer/delivery_report.rb
Overview
Defined Under Namespace
Classes: Client, DeliveryHandle, DeliveryReport
Instance Attribute Summary collapse
-
#delivery_callback ⇒ nil
writeonly
Set a callback that will be called every time a message is successfully produced.
Instance Method Summary collapse
-
#close ⇒ Object
Close this producer and wait for the internal poll queue to empty.
- #closed_producer_check(method) ⇒ Object
-
#partition_count(topic) ⇒ Object
Partition count for a given topic.
-
#produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil) ⇒ DeliveryHandle
Produces a message to a Kafka topic.
Instance Attribute Details
#delivery_callback=(callback) ⇒ nil
Set a callback that will be called every time a message is successfully produced. The callback is called with a DeliveryReport
27 28 29 30 |
# File 'lib/rdkafka/producer.rb', line 27 def delivery_callback=(callback) raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call) @delivery_callback = callback end |
Instance Method Details
#close ⇒ Object
Close this producer and wait for the internal poll queue to empty.
33 34 35 36 37 |
# File 'lib/rdkafka/producer.rb', line 33 def close ObjectSpace.undefine_finalizer(self) @client.close end |
#closed_producer_check(method) ⇒ Object
158 159 160 |
# File 'lib/rdkafka/producer.rb', line 158 def closed_producer_check(method) raise Rdkafka::ClosedProducerError.new(method) if @client.closed? end |
#partition_count(topic) ⇒ Object
Partition count for a given topic. NOTE: If 'allow.auto.create.topics' is set to true in the broker, the topic will be auto-created after returning nil.
46 47 48 49 |
# File 'lib/rdkafka/producer.rb', line 46 def partition_count(topic) closed_producer_check(__method__) Rdkafka::Metadata.new(@client.native, topic).topics&.first[:partition_count] end |
#produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil) ⇒ DeliveryHandle
Produces a message to a Kafka topic. The message is added to rdkafka's queue, call wait on the returned delivery handle to make sure it is delivered.
When no partition is specified the underlying Kafka library picks a partition based on the key. If no key is specified, a random partition will be used. When a timestamp is provided this is used instead of the auto-generated timestamp.
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/rdkafka/producer.rb', line 67 def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil) closed_producer_check(__method__) # Start by checking and converting the input # Get payload length payload_size = if payload.nil? 0 else payload.bytesize end # Get key length key_size = if key.nil? 0 else key.bytesize end if partition_key partition_count = partition_count(topic) # If the topic is not present, set to -1 partition = Rdkafka::Bindings.partitioner(partition_key, partition_count, @partitioner_name) if partition_count end # If partition is nil, use -1 to let librdafka set the partition randomly or # based on the key when present. partition ||= -1 # If timestamp is nil use 0 and let Kafka set one. If an integer or time # use it. = if .nil? 0 elsif .is_a?(Integer) elsif .is_a?(Time) (.to_i * 1000) + (.usec / 1000) else raise TypeError.new("Timestamp has to be nil, an Integer or a Time") end delivery_handle = DeliveryHandle.new delivery_handle[:pending] = true delivery_handle[:response] = -1 delivery_handle[:partition] = -1 delivery_handle[:offset] = -1 DeliveryHandle.register(delivery_handle) args = [ :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TOPIC, :string, topic, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_MSGFLAGS, :int, Rdkafka::Bindings::RD_KAFKA_MSG_F_COPY, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_VALUE, :buffer_in, payload, :size_t, payload_size, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_KEY, :buffer_in, key, :size_t, key_size, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_PARTITION, :int32, partition, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TIMESTAMP, :int64, , :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_OPAQUE, :pointer, delivery_handle, ] if headers headers.each do |key0, value0| key = key0.to_s value = value0.to_s args << :int << Rdkafka::Bindings::RD_KAFKA_VTYPE_HEADER args << :string << key args << :pointer << value args << :size_t << value.bytes.size end end args << :int << Rdkafka::Bindings::RD_KAFKA_VTYPE_END # Produce the message response = Rdkafka::Bindings.rd_kafka_producev( @client.native, *args ) # Raise error if the produce call was not successful if response != 0 DeliveryHandle.remove(delivery_handle.to_ptr.address) raise RdkafkaError.new(response) end delivery_handle end |