Class: Deimos::Utils::DbProducer

Inherits:
Object
Includes:
Phobos::Producer
Defined in:
lib/deimos/utils/db_producer.rb

Overview

Class which continually polls the kafka_messages table in the database and sends Kafka messages.
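
The producer is typically run as a long-lived process. A minimal sketch of running one (assuming Deimos and ActiveRecord are already configured; the thread handling is illustrative and not part of the class):

producer = Deimos::Utils::DbProducer.new

# Run the poll loop in a background thread so it can be stopped later.
poller = Thread.new { producer.start } # loops until #stop is called

# ... later, during shutdown:
producer.stop # sets a flag; the loop exits after the current iteration
poller.join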

Constant Summary

BATCH_SIZE = 1000
DELETE_BATCH_SIZE = 10
MAX_DELETE_ATTEMPTS = 3

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(logger = Logger.new(STDOUT)) ⇒ DbProducer

Returns a new instance of DbProducer.

Parameters:

  • logger (Logger) (defaults to: Logger.new(STDOUT))


# File 'lib/deimos/utils/db_producer.rb', line 16

def initialize(logger=Logger.new(STDOUT))
  @id = SecureRandom.uuid
  @logger = logger
  @logger.push_tags("DbProducer #{@id}") if @logger.respond_to?(:push_tags)
end
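
push_tags is only sent when the logger supports it, so a plain Logger works, but a tagged logger will prefix every line with the producer's ID. A sketch using ActiveSupport::TaggedLogging:

require 'active_support'
require 'active_support/tagged_logging'

# Every line this producer logs will carry a "DbProducer <uuid>" tag.
logger = ActiveSupport::TaggedLogging.new(Logger.new(STDOUT))
producer = Deimos::Utils::DbProducer.new(logger)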

Instance Attribute Details

#current_topic ⇒ Object

Returns the value of attribute current_topic.



# File 'lib/deimos/utils/db_producer.rb', line 9

def current_topic
  @current_topic
end

#id ⇒ Object

Returns the value of attribute id.



# File 'lib/deimos/utils/db_producer.rb', line 9

def id
  @id
end

Instance Method Details

#compact_messages(batch) ⇒ Array<Deimos::KafkaMessage>

Parameters:

  • batch (Array<Deimos::KafkaMessage>)

Returns:

  • (Array<Deimos::KafkaMessage>)


# File 'lib/deimos/utils/db_producer.rb', line 232

def compact_messages(batch)
  return batch if batch.first&.key.blank?

  topic = batch.first.topic
  return batch if config.compact_topics != :all &&
                  !config.compact_topics.include?(topic)

  batch.reverse.uniq(&:key).reverse!
end
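
To illustrate the compaction, here is a sketch with a plain Struct standing in for Deimos::KafkaMessage and a topic assumed to be configured for compaction. Only the newest message per key survives, ordered by last occurrence:

Msg = Struct.new(:key, :topic, :payload) # hypothetical stand-in

batch = [
  Msg.new('user-1', 'users', 'v1'),
  Msg.new('user-2', 'users', 'v2'),
  Msg.new('user-1', 'users', 'v3') # supersedes the first user-1 message
]

batch.reverse.uniq(&:key).reverse
# => the user-2 message ('v2'), then the newest user-1 message ('v3')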

#config ⇒ Deimos::DbProducerConfig

Returns:

  • (Deimos::DbProducerConfig)


# File 'lib/deimos/utils/db_producer.rb', line 23

def config
  Deimos.config.db_producer
end
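
The compaction and logging behavior above is driven by this object. A sketch of setting it with the block-style Deimos.configure DSL (the exact syntax is an assumption and may differ between Deimos versions):

Deimos.configure do
  db_producer do
    compact_topics :all          # or an explicit list of topic names
    log_topics ['my.topic.name'] # topics whose payloads are debug-logged
  end
end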

#delete_messages(messages) ⇒ Object

Parameters:

  • messages (Array<Deimos::KafkaMessage>)


# File 'lib/deimos/utils/db_producer.rb', line 114

def delete_messages(messages)
  attempts = 1
  begin
    messages.in_groups_of(DELETE_BATCH_SIZE, false).each do |batch|
      Deimos::KafkaMessage.where(topic: batch.first.topic,
                                 id: batch.map(&:id)).
        delete_all
    end
  rescue StandardError => e
    if (e.message =~ /Lock wait/i || e.message =~ /Lost connection/i) &&
       attempts <= MAX_DELETE_ATTEMPTS
      attempts += 1
      ActiveRecord::Base.connection.verify!
      sleep(1)
      retry
    end
    raise
  end
end

#log_messages(messages) ⇒ Object

Parameters:

  • messages (Array<Deimos::KafkaMessage>)


# File 'lib/deimos/utils/db_producer.rb', line 140

def log_messages(messages)
  return if config.log_topics != :all && !config.log_topics.include?(@current_topic)

  @logger.debug do
    decoded_messages = Deimos::KafkaMessage.decoded(messages)
    "DB producer: Topic #{@current_topic} Producing messages: #{decoded_messages}}"
  end
end

#process_next_messages ⇒ Object

Complete one loop of processing all messages in the DB.



# File 'lib/deimos/utils/db_producer.rb', line 49

def process_next_messages
  topics = retrieve_topics
  @logger.info("Found topics: #{topics}")
  topics.each(&method(:process_topic))
  KafkaTopicInfo.ping_empty_topics(topics)
  sleep(0.5)
end

#process_topic(topic) ⇒ String?

Returns the topic that was locked, or nil if it could not be locked.

Parameters:

  • topic (String)

Returns:

  • (String, nil)

    the topic that was locked, or nil if it could not be locked.



# File 'lib/deimos/utils/db_producer.rb', line 64

def process_topic(topic)
  # If the topic is already locked, another producer is currently
  # working on it. Move on to the next one.
  unless KafkaTopicInfo.lock(topic, @id)
    @logger.debug("Could not lock topic #{topic} - continuing")
    return
  end
  @current_topic = topic

  loop { break unless process_topic_batch }

  KafkaTopicInfo.clear_lock(@current_topic, @id)
rescue StandardError => e
  @logger.error("Error processing messages for topic #{@current_topic}: #{e.class.name}: #{e.message} #{e.backtrace.join("\n")}")
  KafkaTopicInfo.register_error(@current_topic, @id)
  shutdown_producer
end

#process_topic_batch ⇒ Object

Process a single batch in a topic. Returns true if a full batch was processed (so more messages may remain), false otherwise.



# File 'lib/deimos/utils/db_producer.rb', line 83

def process_topic_batch
  messages = retrieve_messages
  return false if messages.empty?

  batch_size = messages.size
  compacted_messages = compact_messages(messages)
  log_messages(compacted_messages)
  Deimos.instrument('db_producer.produce', topic: @current_topic, messages: compacted_messages) do
    begin
      produce_messages(compacted_messages.map(&:phobos_message))
    rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge, Kafka::RecordListTooLarge
      delete_messages(messages)
      @logger.error('Message batch too large, deleting...')
      @logger.error(Deimos::KafkaMessage.decoded(messages))
      raise
    end
  end
  delete_messages(messages)
  Deimos.config.metrics&.increment(
    'db_producer.process',
    tags: %W(topic:#{@current_topic}),
    by: messages.size
  )
  return false if batch_size < BATCH_SIZE

  KafkaTopicInfo.heartbeat(@current_topic, @id) # keep alive
  send_pending_metrics
  true
end

#produce_messages(batch) ⇒ Object

Produce messages in batches, reducing the batch size by a factor of 10 whenever the batch is too large. Does not retry messages that have already been sent.

Parameters:

  • batch (Array<Hash>)


# File 'lib/deimos/utils/db_producer.rb', line 197

def produce_messages(batch)
  batch_size = batch.size
  current_index = 0
  begin
    batch[current_index..-1].in_groups_of(batch_size, false).each do |group|
      @logger.debug("Publishing #{group.size} messages to #{@current_topic}")
      producer.publish_list(group)
      Deimos.config.metrics&.increment(
        'publish',
        tags: %W(status:success topic:#{@current_topic}),
        by: group.size
      )
      current_index += group.size
      @logger.info("Sent #{group.size} messages to #{@current_topic}")
    end
  rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge,
         Kafka::RecordListTooLarge => e
    if batch_size == 1
      shutdown_producer
      raise
    end

    @logger.error("Got error #{e.class.name} when publishing #{batch.size} in groups of #{batch_size}, retrying...")
    batch_size = if batch_size < 10
                   1
                 else
                   (batch_size / 10).to_i
                 end
    shutdown_producer
    retry
  end
end
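
For example, a 500-message batch that keeps overflowing is retried in groups of 500, then 50, then 5, then 1; if a single message still fails, the producer is shut down and the error re-raised. A sketch of that size sequence:

# Illustrative only: successive group sizes tried for a 500-message batch.
size = 500
sizes = [size]
sizes << (size = size < 10 ? 1 : size / 10) while size > 1
sizes # => [500, 50, 5, 1]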

#retrieve_messages ⇒ Array<Deimos::KafkaMessage>

Returns:

  • (Array<Deimos::KafkaMessage>)


# File 'lib/deimos/utils/db_producer.rb', line 135

def retrieve_messages
  KafkaMessage.where(topic: @current_topic).order(:id).limit(BATCH_SIZE)
end

#retrieve_topics ⇒ Array<String>

Returns:

  • (Array<String>)


# File 'lib/deimos/utils/db_producer.rb', line 58

def retrieve_topics
  KafkaMessage.select('distinct topic').map(&:topic).uniq
end

#send_pending_metrics ⇒ Object

Send metrics to Datadog.



# File 'lib/deimos/utils/db_producer.rb', line 150

def send_pending_metrics
  metrics = Deimos.config.metrics
  return unless metrics

  topics = KafkaTopicInfo.select(%w(topic last_processed_at))
  messages = Deimos::KafkaMessage.
    select('count(*) as num_messages, min(created_at) as earliest, topic').
    group(:topic).
    index_by(&:topic)
  topics.each do |record|
    message_record = messages[record.topic]
    # We want to record the last time we saw any activity, meaning either
    # the oldest message, or the last time we processed, whichever comes
    # last.
    if message_record
      record_earliest = message_record.earliest
      # SQLite gives a string here
      if record_earliest.is_a?(String)
        record_earliest = Time.zone.parse(record_earliest)
      end

      earliest = [record.last_processed_at, record_earliest].max
      time_diff = Time.zone.now - earliest
      metrics.gauge('pending_db_messages_max_wait', time_diff,
                    tags: ["topic:#{record.topic}"])
    else
      # no messages waiting
      metrics.gauge('pending_db_messages_max_wait', 0,
                    tags: ["topic:#{record.topic}"])
    end
    metrics.gauge('pending_db_messages_count', message_record&.num_messages || 0,
                  tags: ["topic:#{record.topic}"])
  end
end

#shutdown_producer ⇒ Object

Shut down the sync producer if necessary; Phobos will automatically create a new one. Call this when the producer may be in a bad state, e.g. when its buffer needs to be cleared.



# File 'lib/deimos/utils/db_producer.rb', line 188

def shutdown_producer
  if self.class.producer.respond_to?(:sync_producer_shutdown) # Phobos 1.8.3
    self.class.producer.sync_producer_shutdown
  end
end

#start ⇒ Object

Start the poll.



# File 'lib/deimos/utils/db_producer.rb', line 28

def start
  @logger.info('Starting...')
  @signal_to_stop = false
  ActiveRecord::Base.connection.reconnect!
  loop do
    if @signal_to_stop
      @logger.info('Shutting down')
      break
    end
    send_pending_metrics
    process_next_messages
  end
end

#stop ⇒ Object

Stop the poll.



# File 'lib/deimos/utils/db_producer.rb', line 43

def stop
  @logger.info('Received signal to stop')
  @signal_to_stop = true
end