Module: Legion::Transport::Kafka
- Extended by: Logging::Helper
- Defined in:
  lib/legion/transport/kafka.rb,
  lib/legion/transport/kafka/admin.rb,
  lib/legion/transport/kafka/errors.rb,
  lib/legion/transport/kafka/consumer.rb,
  lib/legion/transport/kafka/defaults.rb,
  lib/legion/transport/kafka/producer.rb,
  lib/legion/transport/kafka/incoming_message.rb
Overview
Optional Kafka adapter for event streaming alongside RabbitMQ.
Kafka does NOT replace RabbitMQ task dispatch. It runs alongside RabbitMQ for patterns that need durable, replayable, append-only event logs:
- Telemetry / metrics pipelines
- Audit event streams
- Change data capture / event sourcing
- Consumer group fan-out with independent offset tracking
Feature-flagged via transport.kafka.enabled, which defaults to false (off). The rdkafka gem is an optional runtime dependency and is only required when the adapter is enabled.
Usage:
    Legion::Transport::Kafka.publish('legion.audit', { event: 'login' })

    Legion::Transport::Kafka.subscribe('legion.telemetry', group: 'metrics') do |msg|
      process(msg)
    end

    Legion::Transport::Kafka.replay('legion.audit', from_beginning: true) do |msg|
      reprocess(msg)
    end
Defined Under Namespace
Modules: Admin, Consumer, Producer
Classes: AdminError, ConsumerError, DisabledError, IncomingMessage, PublishError, UnavailableError
Constant Summary
- DEFAULTS =
    {
      enabled: false,
      brokers: [ENV.fetch('transport.kafka.brokers', '127.0.0.1:9092')],
      consumer_group: ENV.fetch('transport.kafka.consumer_group', 'legion'),
      producer: {
        acks: ENV.fetch('transport.kafka.producer.acks', 'all'),
        retries: (ENV['transport.kafka.producer.retries'] || 3).to_i,
        retry_backoff_ms: (ENV['transport.kafka.producer.retry_backoff_ms'] || 100).to_i,
        message_timeout_ms: (ENV['transport.kafka.producer.message_timeout_ms'] || 30_000).to_i,
        compression: ENV.fetch('transport.kafka.producer.compression', 'none'),
        batch_size: (ENV['transport.kafka.producer.batch_size'] || 100).to_i,
        linger_ms: (ENV['transport.kafka.producer.linger_ms'] || 5).to_i
      },
      consumer: {
        poll_timeout_ms: (ENV['transport.kafka.consumer.poll_timeout_ms'] || 1_000).to_i,
        max_poll_interval_ms: (ENV['transport.kafka.consumer.max_poll_interval_ms'] || 300_000).to_i,
        session_timeout_ms: (ENV['transport.kafka.consumer.session_timeout_ms'] || 30_000).to_i,
        auto_offset_reset: ENV.fetch('transport.kafka.consumer.auto_offset_reset', 'latest'),
        enable_auto_commit: ENV.fetch('transport.kafka.consumer.enable_auto_commit', 'false') == 'true',
        commit_interval_messages: (ENV['transport.kafka.consumer.commit_interval_messages'] || 100).to_i
      },
      admin: {
        operation_timeout_ms: (ENV['transport.kafka.admin.operation_timeout_ms'] || 10_000).to_i
      },
      security: {
        protocol: ENV.fetch('transport.kafka.security.protocol', 'plaintext'),
        sasl_mechanism: ENV.fetch('transport.kafka.security.sasl_mechanism', ''),
        sasl_username: ENV.fetch('transport.kafka.security.sasl_username', ''),
        sasl_password: ENV.fetch('transport.kafka.security.sasl_password', ''),
        ssl_ca_cert_path: ENV.fetch('transport.kafka.security.ssl_ca_cert_path', ''),
        ssl_client_cert_path: ENV.fetch('transport.kafka.security.ssl_client_cert_path', ''),
        ssl_client_cert_key_path: ENV.fetch('transport.kafka.security.ssl_client_cert_key_path', '')
      }
    }.freeze
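The defaults above rely on three coercion patterns: a string with a fallback (`ENV.fetch`), an integer (`(ENV[...] || default).to_i`), and a boolean (`== 'true'`). The following is a minimal sketch of how each resolves. It assumes a plain Hash named `fake_env` (hypothetical) standing in for `ENV` so that it runs without touching the process environment.

```ruby
# Hypothetical stand-in for ENV. Like ENV, it returns nil for a missing key
# and supports fetch with a default value.
fake_env = {
  'transport.kafka.producer.retries' => '5',
  'transport.kafka.consumer.enable_auto_commit' => 'TRUE',
  'transport.kafka.producer.batch_size' => 'many'
}

# String setting: fall back to the default when the key is absent.
acks = fake_env.fetch('transport.kafka.producer.acks', 'all')

# Integer settings: values arrive as strings, so they are coerced with to_i.
retries   = (fake_env['transport.kafka.producer.retries'] || 3).to_i
linger_ms = (fake_env['transport.kafka.producer.linger_ms'] || 5).to_i

# to_i never raises: a non-numeric string silently becomes 0.
batch_size = (fake_env['transport.kafka.producer.batch_size'] || 100).to_i

# Boolean setting: only the exact lowercase string 'true' enables it.
auto_commit = fake_env.fetch('transport.kafka.consumer.enable_auto_commit', 'false') == 'true'

puts "acks=#{acks} retries=#{retries} linger_ms=#{linger_ms}"
puts "batch_size=#{batch_size} auto_commit=#{auto_commit}"
```

Two consequences are worth keeping in mind when setting these values: a malformed integer becomes 0 rather than raising, and `'TRUE'` or `'1'` do not enable a boolean flag.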
Class Method Summary
- .brokers ⇒ Object
  Returns the Kafka brokers list from settings.
- .default_group ⇒ Object
  Returns the default consumer group from settings.
- .enabled? ⇒ Boolean
  Returns true when the Kafka adapter is enabled and rdkafka is available.
- .ensure_topic(topic, partitions: 1, replication_factor: 1, config: {}) ⇒ Object
  Create a Kafka topic via the admin client.
- .kafka_settings ⇒ Object
  Returns the raw Kafka settings hash.
- .publish(topic, payload, key: nil, headers: {}, partition: nil) ⇒ Hash
  Publish a single message to a Kafka topic.
- .replay(topic, from_beginning: true, from_offset: nil, from_timestamp: nil, replay_group: "legion-replay-#{SecureRandom.hex(4)}") {|message| ... } ⇒ Object
  Replay a topic from a specific point.
- .reset! ⇒ Object
  Resets internal producer/consumer state (useful in tests).
- .subscribe(topic, group: default_group, from_beginning: false, max_messages: nil) {|message| ... } ⇒ Object
  Subscribe to a Kafka topic with consumer group semantics.
Class Method Details
.brokers ⇒ Object
Returns the Kafka brokers list from settings.
    # File 'lib/legion/transport/kafka.rb', line 112

    def brokers
      Array(kafka_settings[:brokers]).flatten.compact
    end
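The `Array(...).flatten.compact` chain normalizes whatever shape the setting takes into a flat list. A small sketch of the behaviour, using a hypothetical helper `normalize_brokers` that applies the same chain to an arbitrary value:

```ruby
# Hypothetical helper applying the same normalization chain as .brokers.
def normalize_brokers(value)
  Array(value).flatten.compact
end

single  = normalize_brokers('127.0.0.1:9092')            # bare string is wrapped
missing = normalize_brokers(nil)                         # nil becomes an empty list
nested  = normalize_brokers([['a:9092', nil], 'b:9092']) # nesting and nils removed
joined  = normalize_brokers('a:9092,b:9092')             # NOT split on commas

p single, missing, nested, joined
```

Note that a comma-separated string stays a single element; the chain only reshapes arrays and does not parse strings.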
.default_group ⇒ Object
Returns the default consumer group from settings.
    # File 'lib/legion/transport/kafka.rb', line 117

    def default_group
      kafka_settings[:consumer_group] || 'legion'
    end
.enabled? ⇒ Boolean
Returns true when the Kafka adapter is enabled and rdkafka is available.
    # File 'lib/legion/transport/kafka.rb', line 37

    def enabled?
      return false unless kafka_settings[:enabled]

      require_rdkafka
      true
    rescue Legion::Transport::Kafka::UnavailableError => e
      handle_exception(e, level: :debug, handled: true, operation: 'transport.kafka.enabled')
      false
    end
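The method combines a feature flag with an optional-dependency check. The body of `require_rdkafka` is not shown on this page, so the following is only a sketch of the general pattern under stated assumptions: `require_optional!` and `adapter_enabled?` are hypothetical names, and the local `UnavailableError` class stands in for `Legion::Transport::Kafka::UnavailableError`.

```ruby
# Local stand-in for Legion::Transport::Kafka::UnavailableError.
class UnavailableError < StandardError; end

# Hypothetical helper: try to load a library and translate LoadError into a
# domain-specific error, which is what a helper like require_rdkafka might do.
def require_optional!(library)
  require library
  true
rescue LoadError => e
  raise UnavailableError, "#{library} is not available: #{e.message}"
end

# Mirrors the shape of enabled?: the flag is checked first, so a disabled
# adapter never attempts to load the library at all.
def adapter_enabled?(settings, library)
  return false unless settings[:enabled]

  require_optional!(library)
  true
rescue UnavailableError
  false
end

flag_off    = adapter_enabled?({ enabled: false }, 'set')
lib_present = adapter_enabled?({ enabled: true }, 'set') # 'set' ships with Ruby
lib_missing = adapter_enabled?({ enabled: true }, 'no_such_library_for_this_sketch')

p flag_off, lib_present, lib_missing
```

Checking the flag before the `require` keeps the dependency truly optional: installations that leave the adapter off never pay for, or fail on, a missing gem.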
.ensure_topic(topic, partitions: 1, replication_factor: 1, config: {}) ⇒ Object
Create a Kafka topic via the admin client. Idempotent — returns true if the topic already exists.
    # File 'lib/legion/transport/kafka.rb', line 104

    def ensure_topic(topic, partitions: 1, replication_factor: 1, config: {})
      require_enabled!
      Admin.ensure_topic(topic, partitions: partitions, replication_factor: replication_factor,
                                config: config)
    end
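To illustrate what idempotent creation means for callers, here is a sketch built on an in-memory stand-in. `FakeAdmin` is hypothetical and contacts no broker; the real `Admin.ensure_topic` is not shown on this page, and its return value on first creation and its treatment of differing partition counts are assumptions here.

```ruby
# Hypothetical in-memory stand-in for an admin client.
class FakeAdmin
  attr_reader :create_calls

  def initialize
    @topics = {}
    @create_calls = 0
  end

  # Returns true whether the topic was just created or already existed.
  def ensure_topic(topic, partitions: 1, replication_factor: 1, config: {})
    return true if @topics.key?(topic)

    @create_calls += 1
    @topics[topic] = { partitions: partitions,
                       replication_factor: replication_factor,
                       config: config }
    true
  end

  def partitions_for(topic)
    @topics.fetch(topic)[:partitions]
  end
end

admin  = FakeAdmin.new
first  = admin.ensure_topic('legion.audit', partitions: 3)
second = admin.ensure_topic('legion.audit', partitions: 6) # no-op in this sketch

puts "first=#{first} second=#{second} creates=#{admin.create_calls}"
puts "partitions=#{admin.partitions_for('legion.audit')}"
```

In this sketch a second call with different arguments leaves the existing topic untouched, so callers that need to change partitions should not rely on a repeated `ensure_topic` call to do it.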
.kafka_settings ⇒ Object
Returns the raw Kafka settings hash.
    # File 'lib/legion/transport/kafka.rb', line 122

    def kafka_settings
      Legion::Settings[:transport][:kafka]
    rescue StandardError => e
      handle_exception(e, level: :debug, handled: true, operation: 'transport.kafka.settings')
      Legion::Transport::Kafka::DEFAULTS
    end
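The fallback to `DEFAULTS` is triggered by a raised exception, not by a missing value. The sketch below shows the difference using plain Hashes; `kafka_settings_from` and `default_kafka` are hypothetical names, and `Legion::Settings` may behave differently from a plain Hash on missing keys.

```ruby
# Hypothetical, trimmed-down stand-in for the DEFAULTS constant.
def default_kafka
  { enabled: false, consumer_group: 'legion' }.freeze
end

# Same lookup-with-rescue shape as kafka_settings, applied to any object.
def kafka_settings_from(settings)
  settings[:transport][:kafka]
rescue StandardError
  default_kafka
end

no_settings   = kafka_settings_from(nil)                # nil[:transport] raises
no_transport  = kafka_settings_from({})                 # nil[:kafka] raises
no_kafka_key  = kafka_settings_from({ transport: {} })  # nothing raises
configured    = kafka_settings_from({ transport: { kafka: { enabled: true } } })

p no_settings, no_transport, no_kafka_key, configured
```

With plain Hashes, the third case returns `nil` rather than the defaults, because no exception is raised when only the innermost key is absent.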
.publish(topic, payload, key: nil, headers: {}, partition: nil) ⇒ Hash
Publish a single message to a Kafka topic.
    # File 'lib/legion/transport/kafka.rb', line 55

    def publish(topic, payload, key: nil, headers: {}, partition: nil)
      require_enabled!
      Producer.publish(topic, payload, key: key, headers: headers, partition: partition)
    end
.replay(topic, from_beginning: true, from_offset: nil, from_timestamp: nil, replay_group: "legion-replay-#{SecureRandom.hex(4)}") {|message| ... } ⇒ Object
Replay a topic from a specific point. Creates an isolated consumer group for the replay session so that production offsets are not disturbed.
    # File 'lib/legion/transport/kafka.rb', line 86

    def replay(topic, from_beginning: true, from_offset: nil, from_timestamp: nil,
               replay_group: "legion-replay-#{SecureRandom.hex(4)}", &)
      require_enabled!
      Consumer.replay(topic,
                      from_beginning: from_beginning,
                      from_offset: from_offset,
                      from_timestamp: from_timestamp,
                      replay_group: replay_group,
                      &)
    end
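The isolation comes from the default `replay_group` value. Ruby evaluates a default argument expression on every call, so each replay that omits `replay_group` gets a freshly generated group name. A minimal sketch, using a hypothetical `replay_group_for` method that only returns the group it would use:

```ruby
require 'securerandom'

# Hypothetical method with the same default-argument expression as .replay.
# It returns the group name instead of consuming anything.
def replay_group_for(_topic, replay_group: "legion-replay-#{SecureRandom.hex(4)}")
  replay_group
end

generated = replay_group_for('legion.audit')
explicit  = replay_group_for('legion.audit', replay_group: 'audit-backfill')

# hex(4) draws 4 random bytes and renders them as 8 hexadecimal characters.
puts generated
puts explicit
```

Passing an explicit `replay_group` reuses that group across calls, which only makes sense when the caller deliberately wants a replay to be resumable under a known name.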
.reset! ⇒ Object
Resets internal producer/consumer state (useful in tests).
    # File 'lib/legion/transport/kafka.rb', line 130

    def reset!
      Producer.reset!
      Consumer.reset!
    end
.subscribe(topic, group: default_group, from_beginning: false, max_messages: nil) {|message| ... } ⇒ Object
Subscribe to a Kafka topic with consumer group semantics. Runs the block synchronously in the calling thread for each message. For background polling, wrap in a Thread / actor.
    # File 'lib/legion/transport/kafka.rb', line 70

    def subscribe(topic, group: default_group, from_beginning: false, max_messages: nil, &)
      require_enabled!
      Consumer.subscribe(topic, group: group, from_beginning: from_beginning,
                                max_messages: max_messages, &)
    end
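Because the block runs in the calling thread, a caller that must stay responsive has to move the call onto its own thread. The sketch below shows that wrapping with a hypothetical `fake_subscribe` backed by a `Queue`, so it runs without a broker. Its handling of `max_messages` (stop after that many messages) is an assumption about the real method, not confirmed by this page.

```ruby
# Hypothetical stand-in for Legion::Transport::Kafka.subscribe: blocks the
# caller, yielding each message until max_messages have been delivered.
def fake_subscribe(source, max_messages: nil)
  seen = 0
  loop do
    break if max_messages && seen >= max_messages

    yield source.pop # blocks until a message is available
    seen += 1
  end
end

source   = Queue.new # plays the role of the topic
received = Queue.new # thread-safe hand-off back to the main thread

# Run the blocking subscription on a background thread.
worker = Thread.new do
  fake_subscribe(source, max_messages: 3) { |msg| received << msg }
end

# The main thread stays free to do other work, here producing messages.
%w[a b c].each { |msg| source << msg }

worker.join
results = Array.new(3) { received.pop }
p results
```

A `Queue` is used for the hand-off because it is safe to share between threads; writing to a plain Array from the subscriber block would need explicit locking.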