Class: Deimos::Utils::DbPoller

Inherits:
Object
  • Object
show all
Defined in:
lib/deimos/utils/db_poller.rb

Overview

Class which continually polls the database and sends Kafka messages.

Constant Summary collapse

BATCH_SIZE =
1000

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ DbPoller

Returns a new instance of DbPoller.

Parameters:

  • config (Deimos::Configuration::ConfigStruct)


33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/deimos/utils/db_poller.rb', line 33

# Builds a poller for a single configured producer.
#
# Calls +constantize+ on +config.producer_class+ to resolve the producer and
# checks that the result descends from Deimos::ActiveRecordProducer.
#
# @param config [Deimos::Configuration::ConfigStruct]
# @raise [RuntimeError] if the producer class cannot be found, or if it is
#   not a subclass of Deimos::ActiveRecordProducer
def initialize(config)
  @config = config
  # Random hex identifier for this poller instance.
  @id = SecureRandom.hex
  begin
    @producer = @config.producer_class.constantize
  rescue NameError
    raise "Class #{@config.producer_class} not found!"
  end
  # @producer is itself a class, so report its own name; asking for
  # `@producer.class.name` would always render as "Class".
  unless @producer < Deimos::ActiveRecordProducer
    raise "Class #{@producer.name} is not an ActiveRecordProducer!"
  end
end

Instance Attribute Details

#id ⇒ Object (readonly)

Needed for Executor so it can identify the worker



14
15
16
# File 'lib/deimos/utils/db_poller.rb', line 14

# Identifier of this poller; needed by the Executor to identify the worker.
#
# @return [Object] the value held in +@id+
def id; @id; end

Class Method Details

.start! ⇒ Object

Begin the DB Poller process.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/deimos/utils/db_poller.rb', line 17

# Begin the DB Poller process.
#
# Builds one poller per entry in +Deimos.config.db_poller_objects+, wraps
# them in a Sigurd::Executor and runs it under a Sigurd::SignalHandler.
#
# @raise [RuntimeError] if no pollers are configured
def self.start!
  raise('No pollers configured!') if Deimos.config.db_poller_objects.empty?

  workers = Deimos.config.db_poller_objects.map { |poller_config| new(poller_config) }
  runner = Sigurd::Executor.new(workers,
                                sleep_seconds: 5,
                                logger: Deimos.config.logger)
  Sigurd::SignalHandler.new(runner).run!
end

Instance Method Details

#fetch_results(time_from, time_to) ⇒ ActiveRecord::Relation

Parameters:

  • time_from (ActiveSupport::TimeWithZone)
  • time_to (ActiveSupport::TimeWithZone)

Returns:

  • (ActiveRecord::Relation)


126
127
128
129
130
131
132
133
134
135
136
# File 'lib/deimos/utils/db_poller.rb', line 126

# Builds the query for the next batch of records to send.
#
# The producer's +poll_query+ is capped at BATCH_SIZE rows and ordered by the
# quoted timestamp column followed by the quoted primary key of the
# producer's record class.
#
# @param time_from [ActiveSupport::TimeWithZone]
# @param time_to [ActiveSupport::TimeWithZone]
# @return [ActiveRecord::Relation]
def fetch_results(time_from, time_to)
  primary_key = @producer.config[:record_class].primary_key
  timestamp_column = @config.timestamp_column
  # Quote both column names before interpolating them into the ORDER BY.
  ordering = [timestamp_column, primary_key].map do |column|
    ActiveRecord::Base.connection.quote_column_name(column)
  end.join(', ')

  relation = @producer.poll_query(time_from: time_from,
                                  time_to: time_to,
                                  column_name: timestamp_column,
                                  min_id: @info.last_sent_id)
  relation.limit(BATCH_SIZE).order(ordering)
end

#last_updated(record) ⇒ ActiveSupport::TimeWithZone

Parameters:

  • record (ActiveRecord::Base)

Returns:

  • (ActiveSupport::TimeWithZone)


94
95
96
# File 'lib/deimos/utils/db_poller.rb', line 94

# Reads the configured timestamp column from the given record.
#
# @param record [ActiveRecord::Base]
# @return [ActiveSupport::TimeWithZone]
def last_updated(record)
  timestamp_reader = @config.timestamp_column
  record.public_send(timestamp_reader)
end

#process_batch(batch) ⇒ Object

Parameters:

  • batch (Array<ActiveRecord::Base>)


139
140
141
142
143
144
145
146
147
# File 'lib/deimos/utils/db_poller.rb', line 139

# Sends one batch of records through the producer, then stores the last
# record's timestamp and primary key on the poll info so the next fetch can
# resume after it.
#
# An empty batch is a no-op: nothing is sent and the poll info is untouched.
#
# @param batch [Array<ActiveRecord::Base>]
def process_batch(batch)
  # Without this guard `batch.last` is nil and the primary-key lookup raises.
  return if batch.empty?

  record = batch.last
  id_method = record.class.primary_key
  # Capture the checkpoint values before sending.
  last_id = record.public_send(id_method)
  last_updated_at = last_updated(record)
  @producer.send_events(batch)
  @info.attributes = { last_sent: last_updated_at, last_sent_id: last_id }
  @info.save!
end

#process_updates ⇒ Object

Send messages for updated data.



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/deimos/utils/db_poller.rb', line 99

# Send messages for updated data.
#
# Returns immediately unless {#should_run?} is true. Otherwise fetches and
# processes batches in a window that starts at the last-sent time (or
# Time.new(0) when +full_table+ is set) and ends at the current time minus
# +delay_time+, stopping when a fetch returns no rows.
def process_updates
  return unless should_run?

  time_from = @config.full_table ? Time.new(0) : @info.last_sent.in_time_zone
  time_to = Time.zone.now - @config.delay_time
  Deimos.config.logger.info("Polling #{@producer.topic} from #{time_from} to #{time_to}")
  message_count = 0
  batch_count = 0

  # poll_query gets all the relevant data from the database, as defined
  # by the producer itself.
  loop do
    Deimos.config.logger.debug("Polling #{@producer.topic}, batch #{batch_count + 1}")
    batch = fetch_results(time_from, time_to).to_a
    break if batch.empty?

    batch_count += 1
    process_batch(batch)
    message_count += batch.size
    # Move the lower bound up to the timestamp of the last record sent.
    time_from = last_updated(batch.last)
  end
  Deimos.config.logger.info("Poll #{@producer.topic} complete at #{time_to} (#{message_count} messages, #{batch_count} batches)")
end

#retrieve_poll_info ⇒ Object

Grab the PollInfo or create if it doesn't exist.



70
71
72
73
74
75
76
77
# File 'lib/deimos/utils/db_poller.rb', line 70

# Grab the PollInfo for this producer, creating it if it doesn't exist, and
# store it in +@info+.
#
# The database connection is reconnected first unless a transaction is open.
# A newly created PollInfo starts at Time.new(0) when +start_from_beginning+
# is set, otherwise at the current time, with +last_sent_id+ 0.
def retrieve_poll_info
  unless ActiveRecord::Base.connection.open_transactions.positive?
    ActiveRecord::Base.connection.reconnect!
  end

  initial_timestamp = if @config.start_from_beginning
                        Time.new(0)
                      else
                        Time.zone.now
                      end
  existing = Deimos::PollInfo.find_by_producer(@config.producer_class)
  @info = existing || Deimos::PollInfo.create!(producer: @config.producer_class,
                                               last_sent: initial_timestamp,
                                               last_sent_id: 0)
end

#should_run? ⇒ Boolean

Indicate whether this current loop should process updates. Most loops will busy-wait (sleeping 0.1 seconds) until it's ready.

Returns:

  • (Boolean)


88
89
90
# File 'lib/deimos/utils/db_poller.rb', line 88

# Indicate whether the current loop iteration should process updates.
#
# True once the time since +@info.last_sent+, less the configured
# +delay_time+, has reached +run_every+.
#
# @return [Boolean]
def should_run?
  elapsed = Time.zone.now - @info.last_sent - @config.delay_time
  elapsed >= @config.run_every
end

#start ⇒ Object

Start the poll: 1) Grab the current PollInfo from the database indicating the last time we ran 2) In a loop, process all the recent updates between the last time we ran and now.



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/deimos/utils/db_poller.rb', line 51

# Start the poll: load the PollInfo, then keep processing updates, sleeping
# 0.1 seconds between passes, until a stop has been signalled.
def start
  # Don't send asynchronously
  producers = Deimos.config.producers
  producers.backend = :kafka if producers.backend == :kafka_async

  log = Deimos.config.logger
  log.info('Starting...')
  @signal_to_stop = false
  retrieve_poll_info
  loop do
    # The flag is checked at the top of every pass.
    if @signal_to_stop
      log.info('Shutting down')
      break
    end
    process_updates
    sleep 0.1
  end
end

#stop ⇒ Object

Stop the poll.



80
81
82
83
# File 'lib/deimos/utils/db_poller.rb', line 80

# Stop the poll: log the request and raise the flag that the loop in
# {#start} checks.
def stop
  log = Deimos.config.logger
  log.info('Received signal to stop')
  @signal_to_stop = true
end