Class: Tina4::QueueBackends::KafkaBackend

Inherits:
Object
  • Object
show all
Defined in:
lib/tina4/queue_backends/kafka_backend.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ KafkaBackend

Returns a new instance of KafkaBackend.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 6

# Builds the Kafka producer and consumer used by this backend.
#
# The rdkafka gem is required lazily here, so it is only needed when this
# backend is actually instantiated.
#
# @param options [Hash] optional settings
# @option options [String] :brokers bootstrap servers (default "localhost:9092")
# @option options [String] :group_id consumer group id (default "tina4_consumer_group")
# @raise [RuntimeError] when a LoadError is raised while setting up (e.g. the
#   rdkafka gem is not installed)
def initialize(options = {})
  require "rdkafka"
  @brokers = options[:brokers] || "localhost:9092"
  @group_id = options[:group_id] || "tina4_consumer_group"

  # SSL/SASL settings are merged last so they are applied to both clients.
  tls_and_sasl = self.class._security_config
  shared = { "bootstrap.servers" => @brokers }

  @producer = Rdkafka::Config.new(shared.merge(tls_and_sasl)).producer

  # Offsets are committed explicitly (auto commit is switched off), and a
  # group with no committed offset starts from the earliest message.
  consumer_settings = shared.merge(
    "group.id" => @group_id,
    "auto.offset.reset" => "earliest",
    "enable.auto.commit" => "false"
  ).merge(tls_and_sasl)
  @consumer = Rdkafka::Config.new(consumer_settings).consumer

  @subscribed_topics = []
rescue LoadError
  raise "Kafka backend requires the 'rdkafka' gem. Install with: gem install rdkafka"
end

Class Method Details

._security_config ⇒ Object

Build SSL/SASL client config from env (for a TLS broker/proxy).

Mirrors tina4_python KafkaConnector._security_config: each setting is read from the Tina4-namespaced env var first (TINA4_KAFKA_<NAME>) and falls back to the bare librdkafka-convention name (KAFKA_<NAME>) that many Kafka deployments already set. Honours security.protocol (e.g. SSL, SASL_SSL), ssl.ca.location, and optional SASL (mechanism / username / password). Unset keys are omitted, leaving librdkafka’s PLAINTEXT default.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 38

# Collects the SSL/SASL client settings that are present in the environment.
#
# Each rdkafka key is looked up through env_value (defined elsewhere in this
# class) as TINA4_KAFKA_<SUFFIX> first and KAFKA_<SUFFIX> second; keys with
# no value are left out of the result.
#
# @return [Hash{String => Object}] rdkafka config entries that have a value
def self._security_config
  # rdkafka key -> env suffix (read as TINA4_KAFKA_<suffix>, then KAFKA_<suffix>)
  suffix_by_key = {
    "security.protocol" => "SECURITY_PROTOCOL",
    "ssl.ca.location" => "SSL_CA_LOCATION",
    "sasl.mechanism" => "SASL_MECHANISM",
    "sasl.username" => "SASL_USERNAME",
    "sasl.password" => "SASL_PASSWORD"
  }

  suffix_by_key.each_with_object({}) do |(key, suffix), settings|
    resolved = env_value("TINA4_KAFKA_#{suffix}") || env_value("KAFKA_#{suffix}")
    settings[key] = resolved if resolved
  end
end

Instance Method Details

#acknowledge(_message) ⇒ Object



111
112
113
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 111

# Commits the consumer's offsets once at least one message has been dequeued.
#
# The argument is not inspected; the commit is issued on the consumer as a
# whole, and only when @last_message has been set.
#
# @param _message [Object] unused
# @return [Object, nil] result of the consumer commit, or nil when nothing
#   has been dequeued yet
def acknowledge(_message)
  return unless @last_message

  @consumer.commit
end

#close ⇒ Object



128
129
130
131
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 128

# Closes the producer and the consumer, if they were created.
#
# The consumer is closed in an ensure clause so that an error raised while
# closing the producer cannot leave the consumer open; that error still
# propagates to the caller afterwards.
#
# @return [Object, nil] result of closing the producer (nil when there is none)
def close
  @producer&.close
ensure
  @consumer&.close
end

#dead_letter(message) ⇒ Object



119
120
121
122
123
124
125
126
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 119

# Publishes a copy of the message to its dead-letter topic.
#
# The copy keeps the original payload and id; its topic is the original
# topic with ".dead_letter" appended.
#
# @param message [#topic, #payload, #id] the message to park
# @return [Object] whatever enqueue returns for the new job
def dead_letter(message)
  dead_letter_topic = "#{message.topic}.dead_letter"
  enqueue(
    Tina4::Job.new(topic: dead_letter_topic, payload: message.payload, id: message.id)
  )
end

#dequeue(topic) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 73

# Polls Kafka for the next message and rebuilds it as a job.
#
# The consumer is subscribed to +topic+ the first time the topic is seen.
# The returned job's topic, payload and id are all read from the message's
# JSON payload, not from the +topic+ argument.
#
# @param topic [String] topic to make sure the consumer is subscribed to
# @return [Tina4::Job, nil] the next job, or nil when no message arrived
#   before the deadline or rdkafka raised an error
# @raise [JSON::ParserError] when the message payload is not valid JSON
def dequeue(topic)
  first = !@subscribed_topics.include?(topic)
  if first
    # subscribe sets the consumer's complete subscription rather than adding
    # to it, so every topic seen so far is passed again; otherwise a second
    # topic would silently drop the first one.
    @consumer.subscribe(*@subscribed_topics, topic)
    @subscribed_topics << topic
  end

  # The first poll after subscribing must drive the consumer-group join +
  # partition assignment, which takes several seconds on a cold broker.
  # Until partitions are assigned, poll returns nil even when the topic
  # already has messages -- so a single poll made dequeue return nil right
  # after enqueue. Poll in a bounded loop on first subscribe (deadline
  # TINA4_KAFKA_ASSIGN_TIMEOUT, default 15s); steady state stays one ~1s poll.
  #
  # A blank TINA4_KAFKA_ASSIGN_TIMEOUT counts as unset: "".to_f is 0.0, which
  # would quietly reduce the join wait to a single poll.
  raw_timeout = ENV["TINA4_KAFKA_ASSIGN_TIMEOUT"].to_s.strip
  assign_timeout = raw_timeout.empty? ? 15.0 : raw_timeout.to_f
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) +
             (first ? assign_timeout : 1.0)

  msg = nil
  loop do
    msg = @consumer.poll(500)
    break if msg || Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
  end
  return nil unless msg

  data = JSON.parse(msg.payload)
  # Recorded only after a successful parse; acknowledge checks this.
  @last_message = msg

  Tina4::Job.new(
    topic: data["topic"],
    payload: data["payload"],
    id: data["id"]
  )
rescue Rdkafka::RdkafkaError
  nil
end

#enqueue(message) ⇒ Object



65
66
67
68
69
70
71
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 65

# Publishes a message to Kafka and blocks until the delivery handle resolves.
#
# The message is sent to its own topic, serialized with to_json and keyed by
# its id.
#
# @param message [#topic, #to_json, #id] the message to publish
# @return [Object] result of waiting on the producer's delivery handle
def enqueue(message)
  delivery = @producer.produce(
    topic: message.topic,
    payload: message.to_json,
    key: message.id
  )
  delivery.wait
end

#requeue(message) ⇒ Object



115
116
117
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 115

# Puts a message back on the queue by publishing it again through enqueue.
#
# The message is passed to enqueue unchanged.
#
# @param message [Object] the message to publish again
# @return [Object] whatever enqueue returns
def requeue(message)
  enqueue(message)
end