Class: Deimos::Utils::DbPoller

Inherits:
Object
  • Object
show all
Defined in:
lib/deimos/utils/db_poller.rb

Overview

Class which continually polls the database and sends Kafka messages.

Constant Summary

BATCH_SIZE = 1000

Returns:

  • (Integer)

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(config) ⇒ DbPoller

Returns a new instance of DbPoller.

Parameters:



35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/deimos/utils/db_poller.rb', line 35

# Build a poller for a single configured producer.
#
# @param config [Object] poller configuration; must respond to
#   #producer_class, a class-name string that is passed to #constantize.
#   NOTE(review): presumably a Deimos config struct — confirm the exact type.
# @raise [RuntimeError] if the producer class cannot be found, or if it is not
#   a subclass of Deimos::ActiveRecordProducer.
def initialize(config)
  @config = config
  # Random identifier exposed through #id so the executor can tell workers apart.
  @id = SecureRandom.hex
  begin
    @producer = @config.producer_class.constantize
  rescue NameError
    raise "Class #{@config.producer_class} not found!"
  end
  # Check is_a?(Class) first so that a constant which is not a class (e.g. a
  # number) reaches this error instead of failing inside `<`.
  unless @producer.is_a?(Class) && @producer < Deimos::ActiveRecordProducer
    # Interpolate the constant itself; `@producer.class.name` would always be "Class".
    raise "Class #{@producer} is not an ActiveRecordProducer!"
  end
end

Instance Attribute Details

#id ⇒ String (readonly)

Needed by the Executor so it can identify the worker.

Returns:

  • (String) — a random hex string generated in the constructor


15
16
17
# File 'lib/deimos/utils/db_poller.rb', line 15

# Identifier that lets the Sigurd executor tell pollers apart.
#
# @return [String] the random hex string assigned by SecureRandom.hex in the
#   constructor
def id
  @id
end

Class Method Details

.start! ⇒ void

This method returns an undefined value.

Begin the DB Poller process.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/deimos/utils/db_poller.rb', line 19

# Build one poller per configured DB poller and run them all under a
# Sigurd executor, wrapped in a Sigurd::SignalHandler.
#
# @raise [RuntimeError] when no DB pollers are configured
# @return [void]
def self.start!
  poller_configs = Deimos.config.db_poller_objects
  raise('No pollers configured!') if poller_configs.empty?

  workers = poller_configs.map { |poller_config| new(poller_config) }
  executor = Sigurd::Executor.new(
    workers,
    sleep_seconds: 5,
    logger: Deimos.config.logger
  )
  Sigurd::SignalHandler.new(executor).run!
end

Instance Method Details

#fetch_results(time_from, time_to) ⇒ ActiveRecord::Relation

Parameters:

  • time_from (ActiveSupport::TimeWithZone)
  • time_to (ActiveSupport::TimeWithZone)

Returns:

  • (ActiveRecord::Relation)


132
133
134
135
136
137
138
139
140
141
142
# File 'lib/deimos/utils/db_poller.rb', line 132

# Build the relation for the next batch of records to publish.
#
# Filtering is delegated to the producer's +poll_query+ (time window plus the
# last sent id). The result is capped at BATCH_SIZE rows and ordered by the
# timestamp column, then by the record class's primary key.
#
# @param time_from [ActiveSupport::TimeWithZone] start of the polling window
# @param time_to [ActiveSupport::TimeWithZone] end of the polling window
# @return [ActiveRecord::Relation]
def fetch_results(time_from, time_to)
  primary_key = @producer.config[:record_class].primary_key
  connection = ActiveRecord::Base.connection
  order_clause = [@config.timestamp_column, primary_key].
    map { |column| connection.quote_column_name(column) }.
    join(', ')

  relation = @producer.poll_query(time_from: time_from,
                                  time_to: time_to,
                                  column_name: @config.timestamp_column,
                                  min_id: @info.last_sent_id)
  relation.limit(BATCH_SIZE).order(order_clause)
end

#last_updated(record) ⇒ ActiveSupport::TimeWithZone

Parameters:

  • record (ActiveRecord::Base)

Returns:

  • (ActiveSupport::TimeWithZone)


99
100
101
# File 'lib/deimos/utils/db_poller.rb', line 99

# Read the polling-cursor timestamp from a record.
#
# @param record [ActiveRecord::Base]
# @return [ActiveSupport::TimeWithZone] value of the configured timestamp column
def last_updated(record)
  column = @config.timestamp_column
  record.public_send(column)
end

#process_batch(batch) ⇒ void

This method returns an undefined value.

Parameters:

  • batch (Array<ActiveRecord::Base>)


146
147
148
149
150
151
152
153
154
# File 'lib/deimos/utils/db_poller.rb', line 146

# Publish one batch and checkpoint progress in PollInfo.
#
# Expects a non-empty batch. The last record's timestamp (via #last_updated)
# and primary key become the new +last_sent+ / +last_sent_id+. Events are sent
# before PollInfo is saved, so if sending raises, the checkpoint is not moved.
#
# @param batch [Array<ActiveRecord::Base>]
# @return [void]
def process_batch(batch)
  newest = batch.last
  newest_id = newest.public_send(newest.class.primary_key)
  newest_time = last_updated(newest)

  @producer.send_events(batch)

  checkpoint = { last_sent: newest_time, last_sent_id: newest_id }
  @info.attributes = checkpoint
  @info.save!
end

#process_updates ⇒ void

This method returns an undefined value.

Send messages for updated data.



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/deimos/utils/db_poller.rb', line 105

# Send messages for updated data.
#
# Does nothing unless #should_run? is true. Otherwise it polls in batches
# (each capped by #fetch_results) from the last sent timestamp, or from
# Time.new(0) when +full_table+ is set, up to +delay_time+ before now. Each
# non-empty batch is published through #process_batch. A summary is logged
# at the end.
#
# @return [void]
def process_updates
  return unless should_run?

  time_from = @config.full_table ? Time.new(0) : @info.last_sent.in_time_zone
  time_to = Time.zone.now - @config.delay_time
  Deimos.config.logger.info("Polling #{@producer.topic} from #{time_from} to #{time_to}")
  message_count = 0
  batch_count = 0

  # poll_query gets all the relevant data from the database, as defined
  # by the producer itself.
  loop do
    Deimos.config.logger.debug("Polling #{@producer.topic}, batch #{batch_count + 1}")
    batch = fetch_results(time_from, time_to).to_a
    break if batch.empty?

    batch_count += 1
    process_batch(batch)
    message_count += batch.size
    # Move the window start to the newest row just sent. NOTE(review):
    # presumably poll_query uses min_id (the last_sent_id saved by
    # process_batch) to skip rows already sent at this same timestamp — confirm.
    time_from = last_updated(batch.last)
  end
  Deimos.config.logger.info("Poll #{@producer.topic} complete at #{time_to} (#{message_count} messages, #{batch_count} batches)")
end

#retrieve_poll_info ⇒ void

This method returns an undefined value.

Fetch the PollInfo, or create it if it doesn't exist.



74
75
76
77
78
79
80
81
# File 'lib/deimos/utils/db_poller.rb', line 74

# Load this producer's PollInfo row, creating one if it does not exist yet.
#
# Reconnects the ActiveRecord connection first, unless a transaction is open on it.
# A newly created row starts at Time.new(0) when +start_from_beginning+ is
# set (otherwise at the current time), with +last_sent_id+ 0.
#
# @return [void]
def retrieve_poll_info
  connection = ActiveRecord::Base.connection
  connection.reconnect! unless connection.open_transactions.positive?

  initial_time = if @config.start_from_beginning
                   Time.new(0)
                 else
                   Time.zone.now
                 end
  producer_name = @config.producer_class
  @info = Deimos::PollInfo.find_by_producer(producer_name)
  @info ||= Deimos::PollInfo.create!(producer: producer_name,
                                     last_sent: initial_time,
                                     last_sent_id: 0)
end

#should_run? ⇒ Boolean

Indicates whether the current loop iteration should process updates. Most iterations busy-wait (sleeping 0.1 seconds) until it is time to poll again.

Returns:

  • (Boolean)


93
94
95
# File 'lib/deimos/utils/db_poller.rb', line 93

# Whether enough time has passed to poll again: true once
# (now - last_sent) - delay_time >= run_every.
#
# NOTE(review): @info.last_sent only advances in #process_batch, i.e. when rows
# were actually sent. If a poll finds nothing, this keeps returning true, and
# #start polls again on every 0.1s iteration. Confirm this is intended.
#
# @return [Boolean]
def should_run?
  elapsed = Time.zone.now - @info.last_sent
  elapsed - @config.delay_time >= @config.run_every
end

#start ⇒ void

This method returns an undefined value.

Start the poll:
  1. Grab the current PollInfo from the database, which records the last time we ran.
  2. On a loop, process all the recent updates between the last time we ran and now.



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/deimos/utils/db_poller.rb', line 54

# Run the poll loop until #stop is called.
#
# Switches the global producer backend from :kafka_async to :kafka so sends
# are synchronous, then loads the PollInfo. It then calls #process_updates
# repeatedly, pausing 0.1 seconds between iterations. The stop flag is
# checked before each iteration.
#
# @return [void]
def start
  # Don't send asynchronously
  producers = Deimos.config.producers
  producers.backend = :kafka if producers.backend == :kafka_async

  Deimos.config.logger.info('Starting...')
  @signal_to_stop = false
  retrieve_poll_info

  until @signal_to_stop
    process_updates
    sleep 0.1
  end
  Deimos.config.logger.info('Shutting down')
  nil
end

#stop ⇒ void

This method returns an undefined value.

Stop the poll.



85
86
87
88
# File 'lib/deimos/utils/db_poller.rb', line 85

# Ask the loop in #start to exit before its next iteration.
#
# @return [void]
def stop
  logger = Deimos.config.logger
  logger.info('Received signal to stop')
  @signal_to_stop = true
end