Class: Deimos::Utils::OutboxProducer

Inherits:
Object
  • Object
show all
Defined in:
lib/deimos/utils/outbox_producer.rb

Overview

Class which continually polls the kafka_messages table in the database and sends Kafka messages.

Constant Summary collapse

BATCH_SIZE =

Returns:

  • (Integer)
1000
DELETE_BATCH_SIZE =

Returns:

  • (Integer)
10
MAX_DELETE_ATTEMPTS =

Returns:

  • (Integer)
3
FATAL_CODES =

Returns:

  • (Array<Symbol>)
%i(invalid_msg_size msg_size_too_large)

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger = Logger.new(STDOUT)) ⇒ OutboxProducer

Returns a new instance of OutboxProducer.

Parameters:

  • logger (Logger) (defaults to: Logger.new(STDOUT))


20
21
22
23
24
# File 'lib/deimos/utils/outbox_producer.rb', line 20

# Builds a producer identified by a freshly generated UUID. That id is the
# lock owner passed to KafkaTopicInfo when topics are locked.
#
# @param logger [Logger] where log output goes. If it supports tagged logging
#   (responds to +push_tags+), the tag "OutboxProducer <id>" is pushed onto it.
def initialize(logger=Logger.new(STDOUT))
  @logger = logger
  @id = SecureRandom.uuid
  return unless logger.respond_to?(:push_tags)

  logger.push_tags("OutboxProducer #{@id}")
end

Instance Attribute Details

#current_topic ⇒ Object

Returns the value of attribute current_topic.



8
9
10
# File 'lib/deimos/utils/outbox_producer.rb', line 8

# The topic this producer most recently locked for processing, or nil if it
# has not locked one yet.
#
# @return [String, nil]
def current_topic
  defined?(@current_topic) ? @current_topic : nil
end

#id ⇒ Object

Returns the value of attribute id.



8
9
10
# File 'lib/deimos/utils/outbox_producer.rb', line 8

# Identifier of this producer instance, used as the lock owner when locking
# topics.
#
# @return [String]
def id
  defined?(@id) ? @id : nil
end

Instance Method Details

#compact_messages(batch) ⇒ Array<Deimos::KafkaMessage>

Parameters:

  • batch (Array<Deimos::KafkaMessage>)

Returns:

  • (Array<Deimos::KafkaMessage>)



218
219
220
221
222
223
224
225
226
# File 'lib/deimos/utils/outbox_producer.rb', line 218

# Keeps only the last message for each key, when compaction is enabled for the
# batch's topic.
#
# The batch is returned untouched when its first message has a blank key. It is
# also returned untouched when config.compact_topics is neither :all nor a list
# containing the batch's topic.
#
# @param batch [Array<Deimos::KafkaMessage>]
# @return [Array<Deimos::KafkaMessage>] the messages in their original order,
#   keeping only the latest occurrence of each key.
def compact_messages(batch)
  leading_key = batch.first&.key
  return batch if leading_key.blank?

  compact_topics = config.compact_topics
  compact_this_topic = compact_topics == :all || compact_topics.include?(batch.first.topic)
  return batch unless compact_this_topic

  batch.reverse_each.uniq(&:key).reverse
end

#config ⇒ FigTree

Returns:

  • (FigTree)


27
28
29
# File 'lib/deimos/utils/outbox_producer.rb', line 27

# Shortcut to the outbox section of the global Deimos configuration.
#
# @return [FigTree]
def config
  deimos_settings = Deimos.config
  deimos_settings.outbox
end

#delete_messages(messages) ⇒ void

This method returns an undefined value.

Parameters:

  • messages (Array<Deimos::KafkaMessage>)



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/deimos/utils/outbox_producer.rb', line 126

# Deletes the given messages from the outbox table in groups of
# DELETE_BATCH_SIZE.
#
# Transient database errors (messages matching "Lock wait" or
# "Lost connection") are retried. Before each retry the connection is
# re-verified and the method sleeps for one second. MAX_DELETE_ATTEMPTS caps
# the total number of attempts, including the first one. Any other error, or a
# transient error on the final attempt, is re-raised.
#
# A retry starts again from the first group. Ids that were already deleted
# simply match no rows.
#
# @param messages [Array<Deimos::KafkaMessage>]
# @return [void]
def delete_messages(messages)
  attempts = 1
  begin
    messages.in_groups_of(DELETE_BATCH_SIZE, false).each do |batch|
      Deimos::KafkaMessage.where(topic: batch.first.topic,
                                 id: batch.map(&:id)).
        delete_all
    end
  rescue StandardError => e
    transient = e.message.match?(/Lock wait|Lost connection/i)
    # `attempts` counts attempts made so far; stop once the cap is reached.
    raise unless transient && attempts < MAX_DELETE_ATTEMPTS

    attempts += 1
    ActiveRecord::Base.connection.verify!
    sleep(1)
    retry
  end
end

#log_messages(messages) ⇒ void

This method returns an undefined value.

Parameters:

  • messages (Array<Deimos::KafkaMessage>)



153
154
155
156
157
158
159
160
# File 'lib/deimos/utils/outbox_producer.rb', line 153

# Logs the decoded messages about to be produced, at debug level, when logging
# is enabled for the current topic.
#
# Logging is enabled when config.log_topics is :all or includes @current_topic.
# Decoding happens inside the logger block, so a standard Logger only decodes
# when debug output is actually enabled.
#
# @param messages [Array<Deimos::KafkaMessage>]
# @return [void]
def log_messages(messages)
  log_topics = config.log_topics
  return if log_topics != :all && !log_topics.include?(@current_topic)

  @logger.debug do
    decoded_messages = Deimos::KafkaMessage.decoded(messages)
    "DB producer: Topic #{@current_topic} Producing messages: #{decoded_messages}"
  end
end

#process_next_messages ⇒ void

This method returns an undefined value.

Complete one loop of processing all messages in the DB.



56
57
58
59
60
61
62
# File 'lib/deimos/utils/outbox_producer.rb', line 56

# Runs one pass of the polling loop:
# - process every topic that currently has messages in the outbox table;
# - pass that same topic list to KafkaTopicInfo.ping_empty_topics;
# - pause for half a second.
#
# @return [void]
def process_next_messages
  pending_topics = retrieve_topics
  @logger.info("Found topics: #{pending_topics}")
  pending_topics.each { |topic| process_topic(topic) }
  KafkaTopicInfo.ping_empty_topics(pending_topics)
  sleep(0.5)
end

#process_topic(topic) ⇒ String?

Returns the topic that was locked, or nil if none were.

Parameters:

  • topic (String)

Returns:

  • (String, nil)

    the topic that was locked, or nil if none were.



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/deimos/utils/outbox_producer.rb', line 71

# Locks +topic+ and processes its outbox messages batch by batch until
# process_topic_batch returns a falsy value, then releases the lock.
#
# If the lock cannot be acquired, another producer is already working on the
# topic and it is skipped. Any StandardError raised while processing is logged
# and recorded via KafkaTopicInfo.register_error rather than propagated.
#
# @param topic [String]
# @return [Object, nil] nil when the lock could not be acquired. Otherwise, the
#   value returned by KafkaTopicInfo.clear_lock (on success) or by
#   KafkaTopicInfo.register_error (on failure).
def process_topic(topic)
  if KafkaTopicInfo.lock(topic, @id)
    @current_topic = topic
    loop do
      more_batches = process_topic_batch
      break unless more_batches
    end
    KafkaTopicInfo.clear_lock(@current_topic, @id)
  else
    # Someone else holds the lock; move on to the next topic.
    @logger.debug("Could not lock topic #{topic} - continuing")
    nil
  end
rescue StandardError => e
  @logger.error("Error processing messages for topic #{@current_topic}: #{e.class.name}: #{e.message} #{e.backtrace.join("\n")}")
  KafkaTopicInfo.register_error(@current_topic, @id)
end

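#process_topic_batch ⇒ Boolean

Returns true if a full batch (BATCH_SIZE messages) was processed and more messages may be waiting. Returns false if the batch was empty or smaller than BATCH_SIZE.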

Process a single batch in a topic.



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/deimos/utils/outbox_producer.rb', line 90

# Processes a single batch in the current topic:
# - fetch up to BATCH_SIZE messages;
# - compact and log them;
# - produce them to Kafka;
# - delete them from the outbox table.
#
# The fetched rows are materialized once with +to_a+. This way the emptiness
# check, the batch size, the produced messages and the deleted ids all refer to
# the same snapshot. A lazy relation may issue separate EXISTS/COUNT/SELECT
# queries that can see different rows.
#
# @return [Boolean] true if a full batch (BATCH_SIZE messages) was processed
#   and more messages may be waiting. false if the batch was empty or smaller
#   than BATCH_SIZE.
# @raise [WaterDrop::Errors::ProduceManyError] when the producer error's cause
#   has one of FATAL_CODES (message/batch too large). The batch is deleted
#   before the error is re-raised.
def process_topic_batch
  messages = retrieve_messages.to_a
  return false if messages.empty?

  batch_size = messages.size
  compacted_messages = compact_messages(messages)
  log_messages(compacted_messages)
  Karafka.monitor.instrument('deimos.outbox.produce', topic: @current_topic, messages: compacted_messages) do
    begin
      produce_messages(compacted_messages.map(&:karafka_message))
    rescue WaterDrop::Errors::ProduceManyError => e
      if FATAL_CODES.include?(e.cause.try(:code))
        # This batch can never be sent as-is: drop it, then surface the error.
        @logger.error('Message batch too large, deleting...')
        delete_messages(messages)
        raise e
      else
        # NOTE(review): non-fatal errors are retried immediately and without
        # limit; consider a backoff or cap if this can spin.
        Deimos.log_error("Got error #{e.cause.class.name} when publishing #{batch_size} messages, retrying...")
        retry
      end
    end
  end
  delete_messages(messages)
  Deimos.config.metrics&.increment(
    'outbox.process',
    tags: %W(topic:#{@current_topic}),
    by: messages.size
  )
  return false if batch_size < BATCH_SIZE

  KafkaTopicInfo.heartbeat(@current_topic, @id) # keep alive
  send_pending_metrics
  true
end

#produce_messages(batch) ⇒ void

This method returns an undefined value.

Produce all messages in the batch synchronously through the Karafka producer. This method does not shrink or retry the batch; producer errors propagate to the caller, which handles oversized batches.

Parameters:

  • batch (Array<Hash>)


203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/deimos/utils/outbox_producer.rb', line 203

# Produces the given Kafka payloads synchronously in a single call.
#
# No batch splitting or retrying happens here. Errors raised by the producer
# propagate unchanged, so the caller decides how to handle them.
#
# @param batch [Array<Hash>] payloads for Karafka.producer.produce_many_sync.
# @return [void]
def produce_messages(batch)
  return if batch.empty?

  @logger.debug("Publishing #{batch.size} messages to #{@current_topic}")
  Karafka.producer.produce_many_sync(batch)
  @logger.info("Sent #{batch.size} messages to #{@current_topic}")
end

#retrieve_messages ⇒ Array<Deimos::KafkaMessage>

Returns:

  • (Array<Deimos::KafkaMessage>) up to BATCH_SIZE messages for the current topic, ordered by id.



147
148
149
# File 'lib/deimos/utils/outbox_producer.rb', line 147

# Builds the query for the next batch of outbox messages for the current
# topic: at most BATCH_SIZE rows, lowest ids first.
#
# @return [ActiveRecord::Relation] a relation of Deimos::KafkaMessage records.
def retrieve_messages
  scope = KafkaMessage.where(topic: @current_topic)
  scope = scope.order(:id)
  scope.limit(BATCH_SIZE)
end

#retrieve_topics ⇒ Array<String>

Returns:

  • (Array<String>)


65
66
67
# File 'lib/deimos/utils/outbox_producer.rb', line 65

# Lists the distinct topics that currently have rows in the outbox table.
#
# @return [Array<String>]
def retrieve_topics
  rows = KafkaMessage.select('distinct topic')
  rows.map { |row| row.topic }.uniq
end

#send_pending_metrics ⇒ void

This method returns an undefined value.

Send metrics related to pending messages.



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/deimos/utils/outbox_producer.rb', line 164

# Reports per-topic gauges describing messages still waiting in the outbox
# table:
#
# * pending_db_messages_count - number of waiting messages (0 if none).
# * pending_db_messages_max_wait - now minus the later of two times: the
#   topic's last_processed_at and its oldest waiting message's created_at
#   (0 if no messages are waiting).
#
# Does nothing when no metrics provider is configured.
#
# @return [void]
def send_pending_metrics
  metrics = Deimos.config.metrics
  return unless metrics

  topic_infos = KafkaTopicInfo.select(%w(topic last_processed_at))
  pending_by_topic = Deimos::KafkaMessage.
    select('count(*) as num_messages, min(created_at) as earliest, topic').
    group(:topic).
    index_by(&:topic)

  topic_infos.each do |info|
    pending = pending_by_topic[info.topic]
    max_wait = 0
    if pending
      oldest = pending.earliest
      # SQLite hands the aggregate back as a String rather than a time object.
      oldest = Time.zone.parse(oldest) if oldest.is_a?(String)
      # Measure from whichever happened last: the oldest waiting message or
      # the last time this topic was processed.
      latest_activity = [info.last_processed_at, oldest].max
      max_wait = Time.zone.now - latest_activity
    end
    metrics.gauge('pending_db_messages_max_wait', max_wait,
                  tags: ["topic:#{info.topic}"])
    metrics.gauge('pending_db_messages_count', pending&.num_messages || 0,
                  tags: ["topic:#{info.topic}"])
  end
end

#start ⇒ void

This method returns an undefined value.

Start the poll.



33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/deimos/utils/outbox_producer.rb', line 33

# Starts the polling loop after re-establishing the database connection.
#
# Each iteration sends pending-message metrics, then processes the next round
# of messages. The stop flag (@signal_to_stop, set by #stop) is checked at the
# top of every iteration. Once it is set, 'Shutting down' is logged and the
# loop exits.
#
# @return [void]
def start
  @logger.info('Starting...')
  @signal_to_stop = false
  ActiveRecord::Base.connection.reconnect!
  loop do
    unless @signal_to_stop
      send_pending_metrics
      process_next_messages
      next
    end

    @logger.info('Shutting down')
    break
  end
end

#stop ⇒ void

This method returns an undefined value.

Stop the poll.



49
50
51
52
# File 'lib/deimos/utils/outbox_producer.rb', line 49

# Asks the polling loop to exit by setting the stop flag. The loop in #start
# checks the flag at the top of its next iteration.
#
# @return [true]
def stop
  notice = 'Received signal to stop'
  @logger.info(notice)
  @signal_to_stop = true
end