Module: Legion::Transport::Kafka::Producer

Extended by:
Logging::Helper
Defined in:
lib/legion/transport/kafka/producer.rb

Overview

Wraps an rdkafka producer with connection lifecycle management. The underlying producer handle is lazily created on first use and shared across calls (rdkafka producers are thread-safe).

Class Method Summary collapse

Class Method Details

.publish(topic, payload, key: nil, headers: {}, partition: nil) ⇒ Hash

Publish a message to a Kafka topic.

Parameters:

  • topic (String)
  • payload (String, Hash)

    Hash values are JSON-encoded

  • key (String, nil) (defaults to: nil)

    partition key

  • headers (Hash) (defaults to: {})

    Kafka message headers

  • partition (Integer, nil) (defaults to: nil)

    explicit partition; nil = librdkafka auto-assign

Returns:

  • (Hash)

    { topic:, partition:, offset: }



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/legion/transport/kafka/producer.rb', line 24

def publish(topic, payload, key: nil, headers: {}, partition: nil)
  encoded = encode(payload)
  delivery_handle = produce_message(topic, encoded, key:       key,
                                                    headers:   stringify_headers(headers),
                                                    partition: partition)
  report = delivery_handle.wait(max_wait_timeout: delivery_timeout)
  log_publish(topic, key, report)
  { topic: report.topic_name, partition: report.partition, offset: report.offset }
rescue Legion::Transport::Kafka::PublishError
  raise
rescue StandardError => e
  handle_exception(e, level: :error, handled: false, operation: 'transport.kafka.producer.publish',
                   topic: topic)
  raise Legion::Transport::Kafka::PublishError, "Kafka publish to #{topic} failed: #{e.message}"
end

.reset!Object

Close the underlying producer and flush any in-flight messages.



41
42
43
44
45
46
# File 'lib/legion/transport/kafka/producer.rb', line 41

def reset!
  @mutex&.synchronize do
    @producer&.close
    @producer = nil
  end
end