Module: Legion::Transport::Kafka::Producer
- Extended by: Logging::Helper
- Defined in: lib/legion/transport/kafka/producer.rb
Overview
Wraps an rdkafka producer with connection lifecycle management. The underlying producer handle is lazily created on first use and shared across calls (rdkafka producers are thread-safe).
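The lazy, shared-handle pattern described here can be sketched roughly as follows. This is an illustration under stated assumptions, not the module's actual implementation: `LazyProducer`, `build_producer`, and the `builds` counter are hypothetical names, and a plain `Object` stands in for the rdkafka producer so the example runs without a broker.

```ruby
# Minimal sketch of a lazily created handle shared across threads.
# All names here are hypothetical; the real module may differ.
module LazyProducer
  @mutex = Mutex.new
  @producer = nil
  @builds = 0

  class << self
    # Number of times the handle has been built (for illustration only).
    attr_reader :builds

    # Returns the shared handle, creating it on first use.
    # The mutex ensures concurrent first calls build it only once.
    def producer
      @mutex.synchronize { @producer ||= build_producer }
    end

    private

    # Hypothetical factory; a real one would configure and return
    # an rdkafka producer instead of a bare Object.
    def build_producer
      @builds += 1
      Object.new
    end
  end
end

# Four threads request the handle concurrently and all get the same object.
handles = Array.new(4) { Thread.new { LazyProducer.producer } }.map(&:value)
```

Guarding the memoization with a mutex matters only for creation; once the handle exists, callers share it freely because the handle itself is thread-safe.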
Class Method Summary
- .publish(topic, payload, key: nil, headers: {}, partition: nil) ⇒ Hash
  Publish a message to a Kafka topic.
- .reset! ⇒ Object
  Close the underlying producer and flush any in-flight messages.
Class Method Details
.publish(topic, payload, key: nil, headers: {}, partition: nil) ⇒ Hash
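Publish a message to a Kafka topic and wait for its delivery report, for at most delivery_timeout. Returns a Hash with the :topic, :partition, and :offset taken from the report. A Legion::Transport::Kafka::PublishError is re-raised unchanged; any other StandardError is passed to handle_exception and re-raised as a PublishError.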
# File 'lib/legion/transport/kafka/producer.rb', line 24
def publish(topic, payload, key: nil, headers: {}, partition: nil)
  encoded = encode(payload)
  delivery_handle = (topic, encoded, key: key, headers: stringify_headers(headers), partition: partition)
  report = delivery_handle.wait(max_wait_timeout: delivery_timeout)
  log_publish(topic, key, report)
  { topic: report.topic_name, partition: report.partition, offset: report.offset }
rescue Legion::Transport::Kafka::PublishError
  raise
rescue StandardError => e
  handle_exception(e, level: :error, handled: false, operation: 'transport.kafka.producer.publish', topic: topic)
  raise Legion::Transport::Kafka::PublishError, "Kafka publish to #{topic} failed: #{e.}"
end
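The rescue ordering in the listing above can be isolated in a small, self-contained sketch. The names `PublishError` (a local stand-in for Legion::Transport::Kafka::PublishError) and `wrap_publish` are hypothetical, and the wrapped message text is illustrative rather than taken from the source.

```ruby
# Hypothetical stand-in for Legion::Transport::Kafka::PublishError.
class PublishError < StandardError; end

# Sketch of the rescue ordering: the specific clause must come before
# StandardError, otherwise a PublishError would be wrapped a second time.
def wrap_publish(topic)
  yield
rescue PublishError
  raise # already the right type; re-raise the same exception object
rescue StandardError => e
  # Illustrative message format (assumption).
  raise PublishError, "Kafka publish to #{topic} failed: #{e.message}"
end

# A successful block passes its return value through unchanged.
result = wrap_publish('events') { { topic: 'events', partition: 0, offset: 42 } }

# A generic failure is converted to PublishError; Ruby records the
# original exception as the new error's cause.
wrapped = begin
  wrap_publish('events') { raise IOError, 'broker unreachable' }
rescue PublishError => e
  e
end

# An existing PublishError is re-raised as the very same object.
original = PublishError.new('delivery timed out')
passed_through = begin
  wrap_publish('events') { raise original }
rescue PublishError => e
  e
end
```

With this shape, callers only ever need to rescue one error class, and the underlying failure remains reachable through `cause`.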
.reset! ⇒ Object
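Close the underlying producer, flushing any in-flight messages, and clear the cached handle while holding the mutex. If the mutex has not been set up yet, the call does nothing.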
# File 'lib/legion/transport/kafka/producer.rb', line 41
def reset!
  @mutex&.synchronize do
    @producer&.close
    @producer = nil
  end
end
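The two safe-navigation calls in reset! make it harmless to call at any point in the lifecycle. The sketch below reproduces that shape over explicit state so it can run on its own; `FakeHandle`, `reset_handle`, and the `state` hash are hypothetical and do not appear in the module.

```ruby
# Hypothetical stand-in for a producer handle; records whether close ran.
FakeHandle = Struct.new(:closed) do
  def close
    self.closed = true
  end
end

# Same shape as reset!, written as a plain function over explicit state.
def reset_handle(state)
  state[:mutex]&.synchronize do
    state[:producer]&.close
    state[:producer] = nil
  end
end

# With no mutex, `&.` short-circuits: the block never runs and nil is returned.
untouched = reset_handle({ mutex: nil, producer: nil })

# With a live handle, the handle is closed and the slot is cleared.
handle = FakeHandle.new(false)
state = { mutex: Mutex.new, producer: handle }
reset_handle(state)

# Calling it again is safe: the producer slot is nil, so close is skipped.
reset_handle(state)
```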