Class: Deimos::Utils::DbPoller

Inherits:
Object
  • Object
show all
Defined in:
lib/deimos/utils/db_poller.rb

Overview

Class which continually polls the database and sends Kafka messages.

Constant Summary collapse

BATCH_SIZE = 1000

Maximum number of records fetched and published per batch.

Returns:

  • (Integer)

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ DbPoller

Returns a new instance of DbPoller.

Parameters:



38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/deimos/utils/db_poller.rb', line 38

# Build a poller for a single DB poller configuration.
#
# @param config [Object] poller configuration. It must respond to
#   +producer_class+ (the producer's class name as a String); other methods
#   of this class also read +timestamp_column+, +delay_time+, +run_every+,
#   +full_table+, +retries+ and +start_from_beginning+ from it.
# @raise [RuntimeError] if the producer class cannot be found, or is not a
#   subclass of Deimos::ActiveRecordProducer.
def initialize(config)
  @config = config
  @id = SecureRandom.hex
  begin
    @producer = @config.producer_class.constantize
  rescue NameError
    raise "Class #{@config.producer_class} not found!"
  end
  # Check is_a?(Class) first: a constant resolving to a non-class value would
  # otherwise make `<` raise an unhelpful ArgumentError.
  unless @producer.is_a?(Class) && @producer < Deimos::ActiveRecordProducer
    # Report the configured name; `@producer.class.name` would always be "Class".
    raise "Class #{@config.producer_class} is not an ActiveRecordProducer!"
  end
end

Instance Attribute Details

#config ⇒ Hash (readonly)

Returns:

  • (Hash)


18
19
20
# File 'lib/deimos/utils/db_poller.rb', line 18

def config
  @config
end

#id ⇒ String (readonly)

Needed for Executor so it can identify the worker

Returns:

  • (String) — a random hex string generated by SecureRandom.hex in the constructor.


15
16
17
# File 'lib/deimos/utils/db_poller.rb', line 15

# Unique identifier for this poller instance, used by the Sigurd executor to
# tell workers apart. Set to a random hex String in #initialize.
def id; @id; end

Class Method Details

.start! ⇒ void

This method returns an undefined value.

Begin the DB Poller process.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/deimos/utils/db_poller.rb', line 22

# Begin the DB Poller process: build one poller per configured poller object
# and run them all under a Sigurd executor wrapped in a signal handler.
#
# @raise [RuntimeError] if no pollers are configured.
# @return [void]
def self.start!
  poller_configs = Deimos.config.db_poller_objects
  raise('No pollers configured!') if poller_configs.empty?

  pollers = poller_configs.map { |poller_config| new(poller_config) }
  executor = Sigurd::Executor.new(pollers, sleep_seconds: 5, logger: Deimos.config.logger)
  Sigurd::SignalHandler.new(executor).run!
end

Instance Method Details

#fetch_results(time_from, time_to) ⇒ ActiveRecord::Relation

Parameters:

  • time_from (ActiveSupport::TimeWithZone)
  • time_to (ActiveSupport::TimeWithZone)

Returns:

  • (ActiveRecord::Relation)


139
140
141
142
143
144
145
146
147
148
149
# File 'lib/deimos/utils/db_poller.rb', line 139

# Build the query for the next batch of records to publish.
#
# Delegates filtering to the producer's +poll_query+, then caps the batch at
# BATCH_SIZE and orders by the timestamp column, breaking ties by primary key.
#
# @param time_from [ActiveSupport::TimeWithZone]
# @param time_to [ActiveSupport::TimeWithZone]
# @return [ActiveRecord::Relation]
def fetch_results(time_from, time_to)
  primary_key = @producer.config[:record_class].primary_key
  connection = ActiveRecord::Base.connection
  ordering = [@config.timestamp_column, primary_key].
    map { |column| connection.quote_column_name(column) }.
    join(', ')
  relation = @producer.poll_query(time_from: time_from,
                                  time_to: time_to,
                                  column_name: @config.timestamp_column,
                                  min_id: @info.last_sent_id)
  relation.limit(BATCH_SIZE).order(ordering)
end

#last_updated(record) ⇒ ActiveSupport::TimeWithZone

Parameters:

  • record (ActiveRecord::Base)

Returns:

  • (ActiveSupport::TimeWithZone)


102
103
104
# File 'lib/deimos/utils/db_poller.rb', line 102

# Read the configured timestamp column from a record.
#
# @param record [ActiveRecord::Base]
# @return [ActiveSupport::TimeWithZone] value of +@config.timestamp_column+.
def last_updated(record)
  column = @config.timestamp_column
  record.public_send(column)
end

#process_batch(batch) ⇒ void

This method returns an undefined value.

Parameters:

  • batch (Array<ActiveRecord::Base>)


195
196
197
198
# File 'lib/deimos/utils/db_poller.rb', line 195

# Publish a batch through the producer, then record it as the newest sent data.
#
# @param batch [Array<ActiveRecord::Base>]
# @return [void]
def process_batch(batch)
  @producer.send_events(batch)
  touch_info(batch)
end

#process_batch_with_span(batch) ⇒ void

This method returns an undefined value.

Parameters:

  • batch (Array<ActiveRecord::Base>)


153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/deimos/utils/db_poller.rb', line 153

# Publish a batch inside a tracing span, retrying on failure.
#
# Kafka::Error is retried indefinitely, every 0.5s. Any other StandardError
# is retried up to +@config.retries+ times. After that, the error is recorded
# on the span and the batch is marked as sent via #touch_info, so polling
# moves on.
#
# @param batch [Array<ActiveRecord::Base>]
# @return [Boolean] true if the batch was published, false if it was skipped
#   after exhausting retries.
def process_batch_with_span(batch)
  retries = 0
  begin
    # Reset so a failing tracer start never leaves us holding (and
    # re-finishing) the previous attempt's span.
    span = nil
    span = Deimos.config.tracer&.start(
      'deimos-db-poller',
      # @producer is a class, so use its own name (its .class.name is "Class").
      resource: @producer.name.gsub('::', '-')
    )
    process_batch(batch)
    Deimos.config.tracer&.finish(span)
  rescue Kafka::Error => e # keep trying till it fixes itself
    Deimos.config.logger.error("Error publishing through DB Poller: #{e.message}")
    # Close this attempt's span; a new one is started on retry.
    Deimos.config.tracer&.finish(span) if span
    sleep(0.5)
    retry
  rescue StandardError => e
    Deimos.config.logger.error("Error publishing through DB poller: #{e.message}")
    if retries < @config.retries
      retries += 1
      Deimos.config.tracer&.finish(span) if span
      sleep(0.5)
      retry
    else
      Deimos.config.logger.error('Retries exceeded, moving on to next batch')
      if span
        Deimos.config.tracer&.set_error(span, e)
        Deimos.config.tracer&.finish(span)
      end
      touch_info(batch)
      return false
    end
  end
  true
end

#process_updates ⇒ void

This method returns an undefined value.

Send messages for updated data.



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/deimos/utils/db_poller.rb', line 108

# Send messages for all records updated since the last poll.
#
# Does nothing unless #should_run? returns true. Otherwise it publishes
# records in batches of up to BATCH_SIZE. The window starts at the last-sent
# timestamp (or Time.new(0) when +full_table+ is set) and ends at now minus
# +delay_time+. Batches are fetched until one comes back empty.
#
# @return [void]
def process_updates
  return unless should_run?

  time_from = @config.full_table ? Time.new(0) : @info.last_sent.in_time_zone
  time_to = Time.zone.now - @config.delay_time
  Deimos.config.logger.info("Polling #{@producer.topic} from #{time_from} to #{time_to}")
  message_count = 0
  batch_count = 0
  error_count = 0

  # poll_query gets all the relevant data from the database, as defined
  # by the producer itself.
  loop do
    # Number batches by attempts (successful + errored), not successes only.
    Deimos.config.logger.debug("Polling #{@producer.topic}, batch #{batch_count + error_count + 1}")
    batch = fetch_results(time_from, time_to).to_a
    break if batch.empty?

    if process_batch_with_span(batch)
      batch_count += 1
    else
      error_count += 1
    end
    message_count += batch.size
    # Move the window start up to the newest record seen. fetch_results also
    # filters by the last-sent id that touch_info records.
    time_from = last_updated(batch.last)
  end
  Deimos.config.logger.info("Poll #{@producer.topic} complete at #{time_to} (#{message_count} messages, #{batch_count} successful batches, #{error_count} batches errored)")
end

#retrieve_poll_info ⇒ void

This method returns an undefined value.

Grab the PollInfo or create if it doesn't exist.



77
78
79
80
81
82
83
84
# File 'lib/deimos/utils/db_poller.rb', line 77

# Load this producer's PollInfo row, creating it if it does not exist.
#
# First reconnects the ActiveRecord connection, unless a transaction is open.
# NOTE(review): presumably this recovers from a stale connection; confirm.
# A newly created row has +last_sent+ set to Time.new(0) when
# +start_from_beginning+ is set (otherwise the current time), and a
# +last_sent_id+ of 0.
#
# @return [void]
def retrieve_poll_info
  connection = ActiveRecord::Base.connection
  connection.reconnect! unless connection.open_transactions.positive?
  initial_time = if @config.start_from_beginning
                   Time.new(0)
                 else
                   Time.zone.now
                 end
  producer_name = @config.producer_class
  @info = Deimos::PollInfo.find_by_producer(producer_name) ||
          Deimos::PollInfo.create!(producer: producer_name,
                                   last_sent: initial_time,
                                   last_sent_id: 0)
end

#should_run? ⇒ Boolean

Indicates whether the current loop iteration should process updates. Most iterations busy-wait (sleeping 0.1 seconds between checks) until enough time has passed.

Returns:

  • (Boolean)


96
97
98
# File 'lib/deimos/utils/db_poller.rb', line 96

# Whether enough time has passed since the last send to poll again.
#
# True once (now - last_sent - delay_time) is at least +run_every+.
#
# @return [Boolean]
def should_run?
  elapsed = Time.zone.now - @info.last_sent
  (elapsed - @config.delay_time) >= @config.run_every
end

#start ⇒ void

This method returns an undefined value.

Start the poll: 1) grab the current PollInfo from the database, which records the last time we ran; 2) in a loop, process all updates made between that last run and now.



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/deimos/utils/db_poller.rb', line 57

# Run the poller until #stop is called.
#
# Switches an async Kafka producer backend to the synchronous one, loads
# PollInfo, then repeatedly calls #process_updates with a 0.1s pause between
# iterations.
#
# @return [void]
def start
  # Don't send asynchronously
  Deimos.config.producers.backend = :kafka if Deimos.config.producers.backend == :kafka_async
  Deimos.config.logger.info('Starting...')
  @signal_to_stop = false
  retrieve_poll_info
  loop do
    unless @signal_to_stop
      process_updates
      sleep 0.1
      next
    end
    Deimos.config.logger.info('Shutting down')
    break
  end
end

#stop ⇒ void

This method returns an undefined value.

Stop the poll.



88
89
90
91
# File 'lib/deimos/utils/db_poller.rb', line 88

# Ask the polling loop in #start to exit at its next iteration.
#
# @return [void]
def stop
  logger = Deimos.config.logger
  logger.info('Received signal to stop')
  @signal_to_stop = true
end

#touch_info(batch) ⇒ void

This method returns an undefined value.

Parameters:

  • batch (Array<ActiveRecord::Base>)


184
185
186
187
188
189
190
191
# File 'lib/deimos/utils/db_poller.rb', line 184

# Record the newest record of a batch as the last data sent.
#
# Stores that record's timestamp column value and primary key on the
# PollInfo row and saves it.
#
# @param batch [Array<ActiveRecord::Base>]
# @return [void]
def touch_info(batch)
  newest = batch.last
  newest_id = newest.public_send(newest.class.primary_key)
  @info.attributes = { last_sent: last_updated(newest), last_sent_id: newest_id }
  @info.save!
end