Class: Deimos::KafkaTopicInfo
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- Deimos::KafkaTopicInfo
- Defined in:
- lib/deimos/kafka_topic_info.rb,
sig/defs.rbs
Overview
Record that keeps track of which topics are being worked on by DbProducers.
Class Method Summary collapse
-
.clear_lock(topic, lock_id) ⇒ void
This is called once a producer is finished working on a topic, i.e.
-
.heartbeat(topic, lock_id) ⇒ void
Update the locked_at timestamp to indicate that the producer is still working on those messages and to continue.
-
.lock(topic, lock_id) ⇒ Boolean
Lock a topic for the given ID.
-
.ping_empty_topics(except_topics) ⇒ void
Update all topics that aren't currently locked and have no messages waiting.
- .quote_time(time) ⇒ Object
-
.register_error(topic, lock_id) ⇒ void
The producer calls this if it gets an error sending messages.
Class Method Details
.clear_lock(topic, lock_id) ⇒ void
This method returns an undefined value.
This is called once a producer is finished working on a topic, i.e. there are no more messages to fetch. It unlocks the topic and moves on to the next one.
@param topic
@param lock_id
59 60 61 62 63 64 65 66 |
# File 'lib/deimos/kafka_topic_info.rb', line 59 def clear_lock(topic, lock_id) self.where(topic: topic, locked_by: lock_id). update_all(locked_by: nil, locked_at: nil, error: false, retries: 0, last_processed_at: Time.zone.now) end |
.heartbeat(topic, lock_id) ⇒ void
This method returns an undefined value.
Update the locked_at timestamp to indicate that the producer is still working on those messages and to continue.
@param topic
@param lock_id
105 106 107 108 |
# File 'lib/deimos/kafka_topic_info.rb', line 105 def heartbeat(topic, lock_id) self.where(topic: topic, locked_by: lock_id). update_all(locked_at: Time.zone.now) end |
.lock(topic, lock_id) ⇒ Boolean
Lock a topic for the given ID. Returns whether the lock was successful.
@param topic
@param lock_id
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/deimos/kafka_topic_info.rb', line 18 def lock(topic, lock_id) # rubocop:disable Naming/PredicateMethod # Try to create it - it's fine if it already exists begin self.create!(topic: topic, last_processed_at: Time.zone.now) rescue ActiveRecord::RecordNotUnique # continue on end # Lock the record qtopic = self.connection.quote(topic) qlock_id = self.connection.quote(lock_id) qtable = self.connection.quote_table_name(self.table_name) qnow = self.connection.quote(quote_time(Time.zone.now)) qfalse = self.connection.quoted_false qtime = self.connection.quote(quote_time(1.minute.ago)) # If a record is marked as error and less than 1 minute old, # we don't want to pick it up even if not currently locked because # we worry we'll run into the same problem again. # Once it's more than 1 minute old, we figure it's OK to try again # so we can pick up any topic that's that old, even if it was # locked by someone, because it's the job of the producer to keep # updating the locked_at timestamp as they work on messages in that # topic. If the locked_at timestamp is that old, chances are that # the producer crashed. sql = <<~SQL UPDATE #{qtable} SET locked_by=#{qlock_id}, locked_at=#{qnow}, error=#{qfalse} WHERE topic=#{qtopic} AND ((locked_by IS NULL AND error=#{qfalse}) OR locked_at < #{qtime}) SQL self.connection.update(sql) self.where(locked_by: lock_id, topic: topic).any? end |
.ping_empty_topics(except_topics) ⇒ void
This method returns an undefined value.
Update all topics that aren't currently locked and have no messages waiting. It's OK if some messages get inserted in the middle of this because the point is that at least within a few milliseconds of each other, it wasn't locked and had no messages, meaning the topic was in a good state. realized had messages in them, meaning all other topics were empty.
@param except_topics — the list of topics we've just
76 77 78 79 80 81 82 |
# File 'lib/deimos/kafka_topic_info.rb', line 76 def ping_empty_topics(except_topics) records = KafkaTopicInfo.where(locked_by: nil). where('topic not in(?)', except_topics) records.each do |info| info.update_attribute(:last_processed_at, Time.zone.now) end end |
.quote_time(time) ⇒ Object
10 11 12 |
# File 'lib/deimos/kafka_topic_info.rb', line 10 def quote_time(time) time.respond_to?(:to_fs) ? time.to_fs(:db) : time.to_s(:db) end |
.register_error(topic, lock_id) ⇒ void
This method returns an undefined value.
The producer calls this if it gets an error sending messages. This essentially locks down this topic for 1 minute (for all producers) and allows the caller to continue to the next topic.
@param topic
@param lock_id
90 91 92 93 94 95 96 97 98 |
# File 'lib/deimos/kafka_topic_info.rb', line 90 def register_error(topic, lock_id) record = self.where(topic: topic, locked_by: lock_id).last attr_hash = { locked_by: nil, locked_at: Time.zone.now, error: true, retries: record.retries + 1 } record.attributes = attr_hash record.save! end |