Class: Tina4::QueueBackends::KafkaBackend
- Inherits:
-
Object
- Object
- Tina4::QueueBackends::KafkaBackend
- Defined in:
- lib/tina4/queue_backends/kafka_backend.rb
Class Method Summary collapse
-
._security_config ⇒ Object
Build SSL/SASL client config from env (for a TLS broker/proxy).
Instance Method Summary collapse
- #acknowledge(_message) ⇒ Object
- #close ⇒ Object
- #dead_letter(message) ⇒ Object
- #dequeue(topic) ⇒ Object
- #enqueue(message) ⇒ Object
-
#initialize(options = {}) ⇒ KafkaBackend
constructor
A new instance of KafkaBackend.
- #requeue(message) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ KafkaBackend
Returns a new instance of KafkaBackend.
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 6 def initialize( = {}) require "rdkafka" @brokers = [:brokers] || "localhost:9092" @group_id = [:group_id] || "tina4_consumer_group" security = self.class._security_config producer_config = { "bootstrap.servers" => @brokers }.merge(security) @producer = Rdkafka::Config.new(producer_config).producer consumer_config = { "bootstrap.servers" => @brokers, "group.id" => @group_id, "auto.offset.reset" => "earliest", "enable.auto.commit" => "false" }.merge(security) @consumer = Rdkafka::Config.new(consumer_config).consumer @subscribed_topics = [] rescue LoadError raise "Kafka backend requires the 'rdkafka' gem. Install with: gem install rdkafka" end |
Class Method Details
._security_config ⇒ Object
Build SSL/SASL client config from env (for a TLS broker/proxy).
Mirrors tina4_python KafkaConnector._security_config: each setting is read from the Tina4-namespaced env var first (TINA4_KAFKA_<NAME>) and falls back to the bare librdkafka-convention name (KAFKA_<NAME>) that many Kafka deployments already set. Honours security.protocol (e.g. SSL, SASL_SSL), ssl.ca.location, and optional SASL (mechanism / username / password). Unset keys are omitted, leaving librdkafka’s PLAINTEXT default.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 38 def self._security_config # rdkafka key -> env suffix (read as TINA4_KAFKA_<suffix>, then KAFKA_<suffix>) mapping = { "security.protocol" => "SECURITY_PROTOCOL", "ssl.ca.location" => "SSL_CA_LOCATION", "sasl.mechanism" => "SASL_MECHANISM", "sasl.username" => "SASL_USERNAME", "sasl.password" => "SASL_PASSWORD" } config = {} mapping.each do |rdk, suffix| value = env_value("TINA4_KAFKA_#{suffix}") || env_value("KAFKA_#{suffix}") config[rdk] = value if value end config end |
Instance Method Details
#acknowledge(_message) ⇒ Object
94 95 96 |
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 94 def acknowledge() @consumer.commit if @last_message end |
#close ⇒ Object
111 112 113 114 |
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 111 def close @producer&.close @consumer&.close end |
#dead_letter(message) ⇒ Object
102 103 104 105 106 107 108 109 |
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 102 def dead_letter() dead_msg = Tina4::Job.new( topic: "#{.topic}.dead_letter", payload: .payload, id: .id ) enqueue(dead_msg) end |
#dequeue(topic) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 73 def dequeue(topic) unless @subscribed_topics.include?(topic) @consumer.subscribe(topic) @subscribed_topics << topic end msg = @consumer.poll(1000) return nil unless msg data = JSON.parse(msg.payload) @last_message = msg Tina4::Job.new( topic: data["topic"], payload: data["payload"], id: data["id"] ) rescue Rdkafka::RdkafkaError nil end |
#enqueue(message) ⇒ Object
65 66 67 68 69 70 71 |
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 65 def enqueue() @producer.produce( topic: .topic, payload: .to_json, key: .id ).wait end |
#requeue(message) ⇒ Object
98 99 100 |
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 98 def requeue() enqueue() end |