Class: Tina4::QueueBackends::KafkaBackend
- Inherits:
-
Object
- Object
- Tina4::QueueBackends::KafkaBackend
- Defined in:
- lib/tina4/queue_backends/kafka_backend.rb
Class Method Summary collapse
-
._security_config ⇒ Object
Build SSL/SASL client config from env (for a TLS broker/proxy).
Instance Method Summary collapse
- #acknowledge(_message) ⇒ Object
- #close ⇒ Object
- #dead_letter(message) ⇒ Object
- #dequeue(topic) ⇒ Object
- #enqueue(message) ⇒ Object
-
#initialize(options = {}) ⇒ KafkaBackend
constructor
A new instance of KafkaBackend.
- #requeue(message) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ KafkaBackend
Returns a new instance of KafkaBackend.
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 6 def initialize( = {}) require "rdkafka" @brokers = [:brokers] || "localhost:9092" @group_id = [:group_id] || "tina4_consumer_group" security = self.class._security_config producer_config = { "bootstrap.servers" => @brokers }.merge(security) @producer = Rdkafka::Config.new(producer_config).producer consumer_config = { "bootstrap.servers" => @brokers, "group.id" => @group_id, "auto.offset.reset" => "earliest", "enable.auto.commit" => "false" }.merge(security) @consumer = Rdkafka::Config.new(consumer_config).consumer @subscribed_topics = [] rescue LoadError raise "Kafka backend requires the 'rdkafka' gem. Install with: gem install rdkafka" end |
Class Method Details
._security_config ⇒ Object
Build SSL/SASL client config from env (for a TLS broker/proxy).
Mirrors tina4_python KafkaConnector._security_config: each setting is read from the Tina4-namespaced env var first (TINA4_KAFKA_<NAME>) and falls back to the bare librdkafka-convention name (KAFKA_<NAME>) that many Kafka deployments already set. Honours security.protocol (e.g. SSL, SASL_SSL), ssl.ca.location, and optional SASL (mechanism / username / password). Unset keys are omitted, leaving librdkafka’s PLAINTEXT default.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 38 def self._security_config # rdkafka key -> env suffix (read as TINA4_KAFKA_<suffix>, then KAFKA_<suffix>) mapping = { "security.protocol" => "SECURITY_PROTOCOL", "ssl.ca.location" => "SSL_CA_LOCATION", "sasl.mechanism" => "SASL_MECHANISM", "sasl.username" => "SASL_USERNAME", "sasl.password" => "SASL_PASSWORD" } config = {} mapping.each do |rdk, suffix| value = env_value("TINA4_KAFKA_#{suffix}") || env_value("KAFKA_#{suffix}") config[rdk] = value if value end config end |
Instance Method Details
#acknowledge(_message) ⇒ Object
111 112 113 |
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 111 def acknowledge() @consumer.commit if @last_message end |
#close ⇒ Object
128 129 130 131 |
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 128 def close @producer&.close @consumer&.close end |
#dead_letter(message) ⇒ Object
119 120 121 122 123 124 125 126 |
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 119 def dead_letter() dead_msg = Tina4::Job.new( topic: "#{.topic}.dead_letter", payload: .payload, id: .id ) enqueue(dead_msg) end |
#dequeue(topic) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 73 def dequeue(topic) first = !@subscribed_topics.include?(topic) if first @consumer.subscribe(topic) @subscribed_topics << topic end # The first poll after subscribing must drive the consumer-group join + # partition assignment, which takes several seconds on a cold broker. # Until partitions are assigned, poll returns nil even when the topic # already has messages -- so a single poll made dequeue return nil right # after enqueue. Poll in a bounded loop on first subscribe (deadline # TINA4_KAFKA_ASSIGN_TIMEOUT, default 15s); steady state stays one ~1s poll. deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + (first ? (ENV["TINA4_KAFKA_ASSIGN_TIMEOUT"] || "15").to_f : 1.0) msg = nil loop do candidate = @consumer.poll(500) if candidate msg = candidate break end break if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline end return nil unless msg data = JSON.parse(msg.payload) @last_message = msg Tina4::Job.new( topic: data["topic"], payload: data["payload"], id: data["id"] ) rescue Rdkafka::RdkafkaError nil end |
#enqueue(message) ⇒ Object
65 66 67 68 69 70 71 |
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 65 def enqueue() @producer.produce( topic: .topic, payload: .to_json, key: .id ).wait end |
#requeue(message) ⇒ Object
115 116 117 |
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 115 def requeue() enqueue() end |