Class: Tina4::QueueBackends::KafkaBackend

Inherits:
Object
  • Object
show all
Defined in:
lib/tina4/queue_backends/kafka_backend.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ KafkaBackend

Returns a new instance of KafkaBackend.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 6

# Builds the Kafka producer and consumer used by this backend.
#
# The rdkafka gem is required lazily here, so it is only needed when this
# backend is actually instantiated.
#
# @param options [Hash] optional settings
#   - :brokers  bootstrap server list (default "localhost:9092")
#   - :group_id consumer group id (default "tina4_consumer_group")
# @raise [RuntimeError] when the rdkafka gem cannot be loaded
def initialize(options = {})
  require "rdkafka"

  @brokers = options[:brokers] || "localhost:9092"
  @group_id = options[:group_id] || "tina4_consumer_group"

  # SSL/SASL settings come from the class-level helper and are merged last,
  # so they take precedence over the defaults below.
  tls_settings = self.class._security_config
  base_settings = { "bootstrap.servers" => @brokers }

  @producer = Rdkafka::Config.new(base_settings.merge(tls_settings)).producer

  # Auto-commit is disabled: offsets are committed explicitly via #acknowledge.
  consumer_settings = base_settings.merge(
    "group.id" => @group_id,
    "auto.offset.reset" => "earliest",
    "enable.auto.commit" => "false"
  ).merge(tls_settings)
  @consumer = Rdkafka::Config.new(consumer_settings).consumer

  @subscribed_topics = []
rescue LoadError
  raise "Kafka backend requires the 'rdkafka' gem. Install with: gem install rdkafka"
end

Class Method Details

._security_config ⇒ Object

Build SSL/SASL client config from env (for a TLS broker/proxy).

Mirrors tina4_python KafkaConnector._security_config: each setting is read from the Tina4-namespaced env var first (TINA4_KAFKA_<NAME>) and falls back to the bare librdkafka-convention name (KAFKA_<NAME>) that many Kafka deployments already set. Honours security.protocol (e.g. SSL, SASL_SSL), ssl.ca.location, and optional SASL (mechanism / username / password). Unset keys are omitted, leaving librdkafka’s PLAINTEXT default.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 38

# Collects SSL/SASL client settings from the environment.
#
# For every supported rdkafka key the Tina4-namespaced variable
# (TINA4_KAFKA_<SUFFIX>) is consulted first, then the bare KAFKA_<SUFFIX>
# variable. Keys for which neither lookup yields a value are left out.
#
# @return [Hash{String => Object}] rdkafka config key => value found in env
def self._security_config
  {
    "security.protocol" => "SECURITY_PROTOCOL",
    "ssl.ca.location" => "SSL_CA_LOCATION",
    "sasl.mechanism" => "SASL_MECHANISM",
    "sasl.username" => "SASL_USERNAME",
    "sasl.password" => "SASL_PASSWORD"
  }.each_with_object({}) do |(config_key, env_suffix), settings|
    resolved = env_value("TINA4_KAFKA_#{env_suffix}") || env_value("KAFKA_#{env_suffix}")
    settings[config_key] = resolved if resolved
  end
end

Instance Method Details

#acknowledge(_message) ⇒ Object



94
95
96
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 94

# Commits consumer offsets once at least one message has been dequeued.
#
# The argument is ignored; the commit is issued on the consumer as a whole
# and only when @last_message has been set.
#
# @param _message [Object] unused
# @return [Object, nil] result of the consumer commit, or nil when nothing
#   has been received yet
def acknowledge(_message)
  return unless @last_message

  @consumer.commit
end

#close ⇒ Object



111
112
113
114
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 111

# Closes the producer and then the consumer, if they exist.
#
# The consumer is closed inside an +ensure+ so that an exception raised
# while closing the producer cannot leave the consumer handle open.
#
# @return [Object, nil] result of closing the consumer (nil when there is none)
# @raise whatever the producer's or consumer's close raises
def close
  begin
    @producer&.close
  ensure
    # Runs even when the producer close raised above.
    consumer_result = @consumer&.close
  end
  consumer_result
end

#dead_letter(message) ⇒ Object



102
103
104
105
106
107
108
109
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 102

# Re-publishes a job onto its dead-letter topic.
#
# A new job is created with the same payload and id, targeting
# "<original topic>.dead_letter", and handed to #enqueue.
#
# @param message [#topic, #payload, #id] the job to dead-letter
# @return [Object] whatever #enqueue returns
def dead_letter(message)
  dead_topic = "#{message.topic}.dead_letter"
  enqueue(Tina4::Job.new(topic: dead_topic, payload: message.payload, id: message.id))
end

#dequeue(topic) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 73

# Polls the consumer for the next message and wraps it in a Tina4::Job.
#
# On first use of a topic the consumer is (re)subscribed. Kafka
# subscriptions are not incremental: each subscribe call replaces the
# previous topic set. The call therefore passes every topic seen so far
# plus the new one, so earlier topics keep being consumed.
#
# The returned job is built from the JSON payload of the polled message
# (keys "topic", "payload", "id"); with several topics subscribed it may
# belong to a topic other than +topic+.
#
# @param topic [String] topic to make sure the consumer is subscribed to
# @return [Tina4::Job, nil] the next job, or nil when the poll returned
#   nothing or an Rdkafka::RdkafkaError occurred
# @raise [JSON::ParserError] when the message payload is not valid JSON
def dequeue(topic)
  unless @subscribed_topics.include?(topic)
    # Subscribe to the full set; record the topic only after success so a
    # failed subscribe is retried on the next call.
    @consumer.subscribe(*@subscribed_topics, topic)
    @subscribed_topics << topic
  end

  # 1000 is passed straight to the consumer's poll (a timeout per the
  # rdkafka documentation).
  msg = @consumer.poll(1000)
  return nil unless msg

  data = JSON.parse(msg.payload)
  # Remembered so #acknowledge knows there is something to commit.
  @last_message = msg

  Tina4::Job.new(
    topic: data["topic"],
    payload: data["payload"],
    id: data["id"]
  )
rescue Rdkafka::RdkafkaError
  nil
end

#enqueue(message) ⇒ Object



65
66
67
68
69
70
71
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 65

# Publishes a job to its topic and waits on the returned delivery handle.
#
# The job's JSON form is the message payload and its id is the message key.
#
# @param message [#topic, #to_json, #id] the job to publish
# @return [Object] result of waiting on the delivery handle
def enqueue(message)
  delivery = @producer.produce(
    topic: message.topic,
    payload: message.to_json,
    key: message.id
  )
  delivery.wait
end

#requeue(message) ⇒ Object



98
99
100
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 98

# Puts a job back on the queue by publishing it again.
#
# This is a plain delegation to #enqueue: the message is passed through
# unchanged, so it is re-published with whatever topic, payload and id it
# already carries.
#
# @param message [Object] the job to publish again (same object #enqueue accepts)
# @return [Object] whatever #enqueue returns
def requeue(message)
  enqueue(message)
end