Class: Tina4::QueueBackends::KafkaBackend

Inherits:
Object
  • Object
show all
Defined in:
lib/tina4/queue_backends/kafka_backend.rb

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ KafkaBackend

Returns a new instance of KafkaBackend.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 6

# Builds the Kafka producer and consumer used by this backend.
#
# The rdkafka gem is required lazily here, so it is only needed when this
# backend is actually instantiated.
#
# @param options [Hash] backend settings
# @option options [String] :brokers  value for "bootstrap.servers"
#   (falls back to "localhost:9092")
# @option options [String] :group_id value for the consumer's "group.id"
#   (falls back to "tina4_consumer_group")
# @raise [RuntimeError] with an install hint when a LoadError is raised
#   while setting up (e.g. the rdkafka gem is missing)
def initialize(options = {})
  require "rdkafka"

  @brokers = options[:brokers] || "localhost:9092"
  @group_id = options[:group_id] || "tina4_consumer_group"

  # Settings shared by both clients.
  shared_settings = { "bootstrap.servers" => @brokers }
  @producer = Rdkafka::Config.new(shared_settings).producer

  # The consumer starts from the earliest offset and has auto-commit disabled.
  consumer_settings = shared_settings.merge(
    "group.id" => @group_id,
    "auto.offset.reset" => "earliest",
    "enable.auto.commit" => "false"
  )
  @consumer = Rdkafka::Config.new(consumer_settings).consumer

  # Topics this consumer has already been asked to subscribe to.
  @subscribed_topics = []
rescue LoadError
  raise "Kafka backend requires the 'rdkafka' gem. Install with: gem install rdkafka"
end

Instance Method Details

#acknowledge(_message) ⇒ Object



57
58
59
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 57

# Commits on the consumer once at least one message has been recorded in
# @last_message.
#
# The message argument itself is not inspected; the commit is issued on the
# consumer as a whole.
#
# @param _message [Object] ignored
# @return [Object, nil] whatever @consumer.commit returns, or nil when no
#   message has been recorded yet
def acknowledge(_message)
  return unless @last_message

  @consumer.commit
end

#close ⇒ Object



74
75
76
77
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 74

# Closes the producer and then the consumer, if they exist.
#
# The consumer is closed inside an +ensure+ so that an exception raised while
# closing the producer does not leave the consumer open. The producer's
# exception still propagates to the caller after the consumer has been closed.
#
# @return [Object, nil] the result of closing the consumer (nil when there is
#   no consumer)
def close
  begin
    @producer&.close
  ensure
    # Runs even when the producer close raised, so the consumer is not leaked.
    consumer_result = @consumer&.close
  end
  consumer_result
end

#dead_letter(message) ⇒ Object



65
66
67
68
69
70
71
72
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 65

# Publishes a copy of the message to its dead-letter topic.
#
# The dead-letter topic name is the original topic with ".dead_letter"
# appended; payload and id are carried over unchanged.
#
# @param message [#topic, #payload, #id] the message to dead-letter
# @return [Object] whatever #enqueue returns
def dead_letter(message)
  dead_topic = "#{message.topic}.dead_letter"
  enqueue(Tina4::Job.new(topic: dead_topic, payload: message.payload, id: message.id))
end

#dequeue(topic) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 36

# Polls the consumer once and turns the received Kafka message into a
# Tina4::Job.
#
# The first time a topic is requested, the consumer subscription is extended
# to include it. The full set of topics is passed on every subscribe call
# because a Kafka subscription is replaced, not extended, by a new subscribe;
# subscribing to the new topic alone would silently drop earlier topics.
#
# NOTE(review): the poll is not filtered by +topic+, so with several
# subscribed topics the returned job may belong to a different topic than the
# one requested — confirm callers can handle that.
#
# @param topic [String] topic that must be part of the subscription
# @return [Tina4::Job, nil] the job built from the message's JSON payload
#   ("topic", "payload" and "id" keys), or nil when the poll returned nothing
#   or an Rdkafka::RdkafkaError was raised
# @raise [JSON::ParserError] when the message payload is not valid JSON
#   (only Rdkafka::RdkafkaError is rescued here)
def dequeue(topic)
  unless @subscribed_topics.include?(topic)
    # Subscribe to everything requested so far plus the new topic.
    @consumer.subscribe(*@subscribed_topics, topic)
    # Recorded only after subscribe succeeded, so a failed attempt is retried.
    @subscribed_topics << topic
  end

  msg = @consumer.poll(1000)
  return nil unless msg

  data = JSON.parse(msg.payload)
  # Remembered so that #acknowledge knows there is something to commit.
  @last_message = msg

  Tina4::Job.new(
    topic: data["topic"],
    payload: data["payload"],
    id: data["id"]
  )
rescue Rdkafka::RdkafkaError
  nil
end

#enqueue(message) ⇒ Object



28
29
30
31
32
33
34
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 28

# Publishes the message to its topic and blocks until the delivery handle
# returned by the producer has completed.
#
# The message id is used as the Kafka key. It is converted with #to_s because
# the producer expects a String key; String ids are passed through unchanged
# and a nil id stays nil (no key).
#
# @param message [#topic, #to_json, #id] the message to publish
# @return [Object] whatever the delivery handle's #wait returns
def enqueue(message)
  @producer.produce(
    topic: message.topic,
    payload: message.to_json,
    key: message.id&.to_s
  ).wait
end

#requeue(message) ⇒ Object



61
62
63
# File 'lib/tina4/queue_backends/kafka_backend.rb', line 61

# Puts the message back on the queue by publishing it again via #enqueue.
#
# @param message [Object] the message to publish again; passed to #enqueue
#   unchanged
# @return [Object] whatever #enqueue returns
def requeue(message)
  enqueue(message)
end