Module: Legion::Transport::Kafka

Extended by:
Logging::Helper
Defined in:
lib/legion/transport/kafka.rb,
lib/legion/transport/kafka/admin.rb,
lib/legion/transport/kafka/errors.rb,
lib/legion/transport/kafka/consumer.rb,
lib/legion/transport/kafka/defaults.rb,
lib/legion/transport/kafka/producer.rb,
lib/legion/transport/kafka/incoming_message.rb

Overview

Optional Kafka adapter for event streaming alongside RabbitMQ.

Kafka is NOT a replacement for RabbitMQ task dispatch — it runs alongside for patterns that need durable, replayable, append-only event logs:

- Telemetry / metrics pipelines
- Audit event streams
- Change data capture / event sourcing
- Consumer group fan-out with independent offset tracking

Feature-flagged via transport.kafka.enabled (off by default). Requires the rdkafka gem as an optional runtime dependency.

Usage:

Legion::Transport::Kafka.publish('legion.audit', { event: 'login' })
Legion::Transport::Kafka.subscribe('legion.telemetry', group: 'metrics') do |msg|
  process(msg)
end
Legion::Transport::Kafka.replay('legion.audit', from_beginning: true) do |msg|
  reprocess(msg)
end

Defined Under Namespace

Modules: Admin, Consumer, Producer
Classes: AdminError, ConsumerError, DisabledError, IncomingMessage, PublishError, UnavailableError

Constant Summary

DEFAULTS =
{
  enabled:        false,
  brokers:        [ENV.fetch('transport.kafka.brokers', '127.0.0.1:9092')],
  consumer_group: ENV.fetch('transport.kafka.consumer_group', 'legion'),
  producer:       {
    acks:               ENV.fetch('transport.kafka.producer.acks', 'all'),
    retries:            (ENV['transport.kafka.producer.retries'] || 3).to_i,
    retry_backoff_ms:   (ENV['transport.kafka.producer.retry_backoff_ms'] || 100).to_i,
    message_timeout_ms: (ENV['transport.kafka.producer.message_timeout_ms'] || 30_000).to_i,
    compression:        ENV.fetch('transport.kafka.producer.compression', 'none'),
    batch_size:         (ENV['transport.kafka.producer.batch_size'] || 100).to_i,
    linger_ms:          (ENV['transport.kafka.producer.linger_ms'] || 5).to_i
  },
  consumer:       {
    poll_timeout_ms:          (ENV['transport.kafka.consumer.poll_timeout_ms'] || 1_000).to_i,
    max_poll_interval_ms:     (ENV['transport.kafka.consumer.max_poll_interval_ms'] || 300_000).to_i,
    session_timeout_ms:       (ENV['transport.kafka.consumer.session_timeout_ms'] || 30_000).to_i,
    auto_offset_reset:        ENV.fetch('transport.kafka.consumer.auto_offset_reset', 'latest'),
    enable_auto_commit:       ENV.fetch('transport.kafka.consumer.enable_auto_commit', 'false') == 'true',
    commit_interval_messages: (ENV['transport.kafka.consumer.commit_interval_messages'] || 100).to_i
  },
  admin:          {
    operation_timeout_ms: (ENV['transport.kafka.admin.operation_timeout_ms'] || 10_000).to_i
  },
  security:       {
    protocol:                 ENV.fetch('transport.kafka.security.protocol', 'plaintext'),
    sasl_mechanism:           ENV.fetch('transport.kafka.security.sasl_mechanism', ''),
    sasl_username:            ENV.fetch('transport.kafka.security.sasl_username', ''),
    sasl_password:            ENV.fetch('transport.kafka.security.sasl_password', ''),
    ssl_ca_cert_path:         ENV.fetch('transport.kafka.security.ssl_ca_cert_path', ''),
    ssl_client_cert_path:     ENV.fetch('transport.kafka.security.ssl_client_cert_path', ''),
    ssl_client_cert_key_path: ENV.fetch('transport.kafka.security.ssl_client_cert_key_path', '')
  }
}.freeze
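The DEFAULTS hash coerces every ENV value explicitly because environment variables are always strings. A minimal standalone sketch of that coercion pattern, using hypothetical `fetch_int` / `fetch_bool` helpers and demo keys that are not part of Legion:

```ruby
# Sketch of the coercion pattern used in DEFAULTS: ENV values are strings,
# so integers and booleans must be coerced by hand. The helpers and the
# 'demo.*' keys below are illustrative, not part of the Legion API.
def fetch_int(key, default)
  (ENV[key] || default).to_i
end

def fetch_bool(key, default)
  ENV.fetch(key, default.to_s) == 'true'
end

ENV['demo.retries'] = '5'
retries = fetch_int('demo.retries', 3)          # set in ENV, coerced to 5
backoff = fetch_int('demo.backoff_ms', 100)     # unset, falls back to 100
auto    = fetch_bool('demo.auto_commit', false) # unset, stays false
```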

Class Method Summary

Class Method Details

.brokers ⇒ Object

Returns the Kafka brokers list from settings.



# File 'lib/legion/transport/kafka.rb', line 112

def brokers
  Array(kafka_settings[:brokers]).flatten.compact
end

.default_group ⇒ Object

Returns the default consumer group from settings.



# File 'lib/legion/transport/kafka.rb', line 117

def default_group
  kafka_settings[:consumer_group] || 'legion'
end

.enabled? ⇒ Boolean

Returns true when the Kafka adapter is enabled and rdkafka is available.

Returns:

  • (Boolean)


# File 'lib/legion/transport/kafka.rb', line 37

def enabled?
  return false unless kafka_settings[:enabled]

  require_rdkafka
  true
rescue Legion::Transport::Kafka::UnavailableError => e
  handle_exception(e, level: :debug, handled: true, operation: 'transport.kafka.enabled')
  false
end
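The require-then-rescue shape of enabled? is a common guard for optional gems: check the flag first, load the dependency lazily, and report false rather than crash when it is missing. A minimal standalone sketch (the gem name is deliberately nonexistent, and `adapter_enabled?` is an illustrative helper, not Legion API):

```ruby
# Feature-flag guard for an optional dependency, mirroring the shape of
# Kafka.enabled?: flag checked first, gem loaded lazily, failure reported
# as false. 'no_such_gem_xyz' is intentionally fake for demonstration.
def adapter_enabled?(flag)
  return false unless flag

  require 'no_such_gem_xyz'
  true
rescue LoadError
  false
end

off_result     = adapter_enabled?(false) # flag off: gem is never loaded
missing_result = adapter_enabled?(true)  # flag on, but the gem is absent
```

Both calls return false here: the first short-circuits on the flag, the second rescues the LoadError.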

.ensure_topic(topic, partitions: 1, replication_factor: 1, config: {}) ⇒ Object

Create a Kafka topic via the admin client. Idempotent — returns true if the topic already exists.

Parameters:

  • topic (String)
  • partitions (Integer) (defaults to: 1)
  • replication_factor (Integer) (defaults to: 1)
  • config (Hash) (defaults to: {})

    topic-level Kafka configs (e.g. retention.ms)



# File 'lib/legion/transport/kafka.rb', line 104

def ensure_topic(topic, partitions: 1, replication_factor: 1, config: {})
  require_enabled!
  Admin.ensure_topic(topic, partitions:         partitions,
                            replication_factor: replication_factor,
                            config:             config)
end
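The idempotency contract ("returns true if the topic already exists") can be shown in miniature without a cluster. A hedged sketch where an in-memory Set stands in for the broker's topic list; `TopicRegistry` is illustrative, not part of Legion:

```ruby
require 'set'

# Idempotent "ensure" semantics in miniature: creating a topic that already
# exists is a no-op that still reports success. The Set stands in for the
# Kafka cluster's topic metadata.
class TopicRegistry
  def initialize
    @topics = Set.new
  end

  def ensure_topic(name)
    return true if @topics.include?(name) # already exists: succeed quietly

    @topics << name
    true
  end
end

registry = TopicRegistry.new
first  = registry.ensure_topic('legion.audit') # creates the topic
second = registry.ensure_topic('legion.audit') # repeat call, still true
```

This is why callers can run ensure_topic unconditionally at boot: repeated calls converge on the same state.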

.kafka_settingsObject

Returns the raw Kafka settings hash.



# File 'lib/legion/transport/kafka.rb', line 122

def kafka_settings
  Legion::Settings[:transport][:kafka]
rescue StandardError => e
  handle_exception(e, level: :debug, handled: true, operation: 'transport.kafka.settings')
  Legion::Transport::Kafka::DEFAULTS
end

.publish(topic, payload, key: nil, headers: {}, partition: nil) ⇒ Hash

Publish a single message to a Kafka topic.

Parameters:

  • topic (String)

    Kafka topic name

  • payload (String, Hash)

    message body; Hash is JSON-encoded automatically

  • key (String, nil) (defaults to: nil)

    optional partition key

  • headers (Hash) (defaults to: {})

    optional message headers

  • partition (Integer, nil) (defaults to: nil)

    explicit partition (nil = auto-assigned)

Returns:

  • (Hash)

    delivery report { topic:, partition:, offset: }



# File 'lib/legion/transport/kafka.rb', line 55

def publish(topic, payload, key: nil, headers: {}, partition: nil)
  require_enabled!
  Producer.publish(topic, payload, key: key, headers: headers, partition: partition)
end
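The payload parameter accepts either a String or a Hash, with Hashes JSON-encoded automatically. A sketch of what that normalization implies at the call site; `normalize_payload` is an illustrative helper, and the real encoding lives inside Producer.publish:

```ruby
require 'json'

# Sketch of the Hash-vs-String payload handling described above: a Hash is
# serialized to JSON for the wire, a String passes through untouched.
def normalize_payload(payload)
  payload.is_a?(Hash) ? JSON.generate(payload) : payload
end

wire = normalize_payload({ event: 'login', user_id: 42 })
# wire is now a JSON string ready for the broker
raw = normalize_payload('already a string') # unchanged
```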

.replay(topic, from_beginning: true, from_offset: nil, from_timestamp: nil, replay_group: "legion-replay-#{SecureRandom.hex(4)}") {|message| ... } ⇒ Object

Replay a topic from a specific point. Creates an isolated consumer group for the replay session so that production offsets are not disturbed.

Parameters:

  • topic (String)

    topic to replay

  • from_beginning (Boolean) (defaults to: true)

    start from offset 0 (default true for replay)

  • from_offset (Integer, nil) (defaults to: nil)

    explicit partition-0 offset to start from

  • from_timestamp (Time, nil) (defaults to: nil)

    seek to offset nearest this timestamp

  • replay_group (String) (defaults to: "legion-replay-#{SecureRandom.hex(4)}")

    temporary consumer group ID

Yields:

  • (message)

    called for each replayed message



# File 'lib/legion/transport/kafka.rb', line 86

def replay(topic, from_beginning: true, from_offset: nil, from_timestamp: nil,
           replay_group: "legion-replay-#{SecureRandom.hex(4)}", &)
  require_enabled!
  Consumer.replay(topic,
                  from_beginning: from_beginning,
                  from_offset:    from_offset,
                  from_timestamp: from_timestamp,
                  replay_group:   replay_group,
                  &)
end
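The random replay_group default is what keeps replays isolated: a fresh consumer group has its own offset tracking, so replaying never advances or resets the production group's committed offsets, and two concurrent replays never collide. A standalone sketch of the naming scheme (the helper is illustrative):

```ruby
require 'securerandom'

# Why replay gets its own consumer group: a fresh random group ID means the
# replay session tracks offsets independently of the production group, and
# concurrent replay sessions cannot interfere with each other.
def replay_group_name
  "legion-replay-#{SecureRandom.hex(4)}"
end

a = replay_group_name # e.g. "legion-replay-9f2c41ab"
b = replay_group_name # a different random suffix
```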

.reset!Object

Resets internal producer/consumer state (useful in tests).



# File 'lib/legion/transport/kafka.rb', line 130

def reset!
  Producer.reset!
  Consumer.reset!
end

.subscribe(topic, group: default_group, from_beginning: false, max_messages: nil) {|message| ... } ⇒ Object

Subscribe to a Kafka topic with consumer-group semantics. The block runs synchronously in the calling thread for each message; for background polling, wrap the call in a Thread or actor.

Parameters:

  • topic (String, Array<String>)

    topic(s) to subscribe to

  • group (String) (defaults to: default_group)

    consumer group ID (default from settings)

  • from_beginning (Boolean) (defaults to: false)

    start from earliest offset (default false = latest)

  • max_messages (Integer, nil) (defaults to: nil)

    stop after N messages (nil = run forever)

Yields:

  • (message)

    called for each received message




# File 'lib/legion/transport/kafka.rb', line 70

def subscribe(topic, group: default_group, from_beginning: false, max_messages: nil, &)
  require_enabled!
  Consumer.subscribe(topic, group: group, from_beginning: from_beginning,
                            max_messages: max_messages, &)
end