Class: Deimos::KafkaTopicInfo

Inherits:
ActiveRecord::Base
  • Object
Defined in:
lib/deimos/kafka_topic_info.rb

Overview

Record that keeps track of which topics are being worked on by DbProducers.

Class Method Summary

Class Method Details

.clear_lock(topic, lock_id) ⇒ void

This method returns an undefined value.

This is called once a producer is finished working on a topic, i.e. there are no more messages to fetch. It unlocks the topic so the producer can move on to the next one.

Parameters:

  • topic (String)
  • lock_id (String)


# File 'lib/deimos/kafka_topic_info.rb', line 54

def clear_lock(topic, lock_id)
  self.where(topic: topic, locked_by: lock_id).
    update_all(locked_by: nil,
               locked_at: nil,
               error: false,
               retries: 0,
               last_processed_at: Time.zone.now)
end
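The effect of the `update_all` above can be sketched with a plain-Ruby, in-memory model. The hash mirrors the `kafka_topic_info` columns, and the `clear_lock_effect` helper is purely illustrative, not part of Deimos:

```ruby
# Illustrative model of a kafka_topic_info row. clear_lock releases the
# lock, clears any error state, resets the retry counter, and records
# when the topic was last processed.
def clear_lock_effect(record, now: Time.now)
  record.merge(locked_by: nil, locked_at: nil, error: false,
               retries: 0, last_processed_at: now)
end

row = { topic: 'my-topic', locked_by: 'producer-1',
        locked_at: Time.now, error: true, retries: 3,
        last_processed_at: nil }
clear_lock_effect(row)
# locked_by and locked_at are cleared, error is reset, retries go back
# to 0, and last_processed_at is stamped with the current time
```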

.heartbeat(topic, lock_id) ⇒ void

This method returns an undefined value.

Update the locked_at timestamp to indicate that the producer is still working on those messages and should keep its lock on the topic.

Parameters:

  • topic (String)
  • lock_id (String)


# File 'lib/deimos/kafka_topic_info.rb', line 100

def heartbeat(topic, lock_id)
  self.where(topic: topic, locked_by: lock_id).
    update_all(locked_at: Time.zone.now)
end

.lock(topic, lock_id) ⇒ Boolean

Lock a topic for the given ID. Returns whether the lock was successful.

Parameters:

  • topic (String)
  • lock_id (String)

Returns:

  • (Boolean)


# File 'lib/deimos/kafka_topic_info.rb', line 13

def lock(topic, lock_id)
  # Try to create it - it's fine if it already exists
  begin
    self.create(topic: topic, last_processed_at: Time.zone.now)
  rescue ActiveRecord::RecordNotUnique
    # continue on
  end

  # Lock the record
  qtopic = self.connection.quote(topic)
  qlock_id = self.connection.quote(lock_id)
  qtable = self.connection.quote_table_name('kafka_topic_info')
  qnow = self.connection.quote(Time.zone.now.to_s(:db))
  qfalse = self.connection.quoted_false
  qtime = self.connection.quote(1.minute.ago.to_s(:db))

  # If a record is marked as error and less than 1 minute old,
  # we don't want to pick it up even if not currently locked because
  # we worry we'll run into the same problem again.
  # Once it's more than 1 minute old, we figure it's OK to try again
  # so we can pick up any topic that's that old, even if it was
  # locked by someone, because it's the job of the producer to keep
  # updating the locked_at timestamp as they work on messages in that
  # topic. If the locked_at timestamp is that old, chances are that
  # the producer crashed.
  sql = <<~SQL
    UPDATE #{qtable}
    SET locked_by=#{qlock_id}, locked_at=#{qnow}, error=#{qfalse}
    WHERE topic=#{qtopic} AND
     ((locked_by IS NULL AND error=#{qfalse}) OR locked_at < #{qtime})
  SQL
  self.connection.update(sql)
  self.where(locked_by: lock_id, topic: topic).any?
end
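The eligibility rule that the SQL above implements can be modeled in plain Ruby. The `lockable?` helper is hypothetical, not part of Deimos; it only restates the `WHERE` clause: a topic can be claimed when it is unlocked and not in error, OR when its `locked_at` timestamp is over a minute old (a stale lock from a crashed producer, or an expired error window):

```ruby
# In-memory model of .lock's WHERE clause. `record` is a hash mirroring
# the kafka_topic_info columns.
def lockable?(record, now: Time.now)
  stale = record[:locked_at] && record[:locked_at] < now - 60
  (record[:locked_by].nil? && !record[:error]) || !!stale
end

fresh   = { locked_by: nil,  locked_at: nil,            error: false }
errored = { locked_by: nil,  locked_at: Time.now,       error: true  }
crashed = { locked_by: 'p1', locked_at: Time.now - 120, error: false }

lockable?(fresh)   # => true  (never locked, no error)
lockable?(errored) # => false (errored less than a minute ago)
lockable?(crashed) # => true  (heartbeat stopped over a minute ago)
```

Note how `register_error` and `heartbeat` both feed into this rule: an error stamps `locked_at` so the topic is skipped for a minute, while a healthy producer's heartbeat keeps `locked_at` fresh so nobody steals its lock.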

.ping_empty_topics(except_topics) ⇒ void

This method returns an undefined value.

Update all topics that aren't currently locked and have no messages waiting. It's OK if some messages get inserted in the middle of this, because the point is that, at least within a few milliseconds of each other, the topic wasn't locked and had no messages, meaning it was in a good state.

Parameters:

  • except_topics (Array<String>)

    the list of topics we've just realized had messages in them, meaning all other topics were empty


# File 'lib/deimos/kafka_topic_info.rb', line 71

def ping_empty_topics(except_topics)
  records = KafkaTopicInfo.where(locked_by: nil).
    where('topic not in(?)', except_topics)
  records.each do |info|
    info.update_attribute(:last_processed_at, Time.zone.now)
  end
end
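A plain-Ruby sketch of this filter, with a hypothetical `ping_empty_effect` helper standing in for the query plus the update loop: touch `last_processed_at` on every record that is unlocked and whose topic is not one we just saw messages for:

```ruby
# In-memory model of ping_empty_topics over an array of row hashes.
def ping_empty_effect(records, except_topics, now: Time.now)
  records.map do |r|
    next r unless r[:locked_by].nil? && !except_topics.include?(r[:topic])
    r.merge(last_processed_at: now)
  end
end

rows = [{ topic: 'a', locked_by: nil,  last_processed_at: nil },
        { topic: 'b', locked_by: nil,  last_processed_at: nil },
        { topic: 'c', locked_by: 'p1', last_processed_at: nil }]
ping_empty_effect(rows, ['a'])
# only 'b' gets touched: 'a' just had messages, and 'c' is locked, so
# its producer is responsible for its own heartbeat
```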

.register_error(topic, lock_id) ⇒ void

This method returns an undefined value.

The producer calls this if it gets an error sending messages. This essentially locks down this topic for 1 minute (for all producers) and allows the caller to continue to the next topic.

Parameters:

  • topic (String)
  • lock_id (String)


# File 'lib/deimos/kafka_topic_info.rb', line 85

def register_error(topic, lock_id)
  record = self.where(topic: topic, locked_by: lock_id).last
  attr_hash = { locked_by: nil,
                locked_at: Time.zone.now,
                error: true,
                retries: record.retries + 1 }
  record.attributes = attr_hash
  record.save!
end
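The record update above can be sketched in plain Ruby (the `register_error_effect` helper is hypothetical): the lock is released, the error is flagged, retries is bumped, and `locked_at` is stamped with "now" so the one-minute rule in `.lock` keeps every producer away from this topic until the window expires:

```ruby
# In-memory model of register_error's effect on a kafka_topic_info row.
def register_error_effect(record, now: Time.now)
  record.merge(locked_by: nil, locked_at: now, error: true,
               retries: record[:retries] + 1)
end

row = { topic: 'my-topic', locked_by: 'producer-1',
        locked_at: Time.now - 10, error: false, retries: 2 }
register_error_effect(row)
# locked_by is cleared, error is set, retries goes from 2 to 3, and
# locked_at is refreshed to start the one-minute blackout
```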