Class: Deimos::Utils::DbPoller
- Inherits:
-
Object
- Object
- Deimos::Utils::DbPoller
- Defined in:
- lib/deimos/utils/db_poller.rb
Overview
Class which continually polls the database and sends Kafka messages.
Constant Summary collapse
- BATCH_SIZE =
1000
Instance Attribute Summary collapse
-
#id ⇒ Object
readonly
Needed for Executor so it can identify the worker.
Class Method Summary collapse
-
.start! ⇒ Object
Begin the DB Poller process.
Instance Method Summary collapse
- #fetch_results(time_from, time_to) ⇒ ActiveRecord::Relation
-
#initialize(config) ⇒ DbPoller
constructor
A new instance of DbPoller.
- #last_updated(record) ⇒ ActiveSupport::TimeWithZone
- #process_batch(batch) ⇒ Object
-
#process_updates ⇒ Object
Send messages for updated data.
-
#retrieve_poll_info ⇒ Object
Grab the PollInfo or create if it doesn't exist.
-
#should_run? ⇒ Boolean
Indicate whether this current loop should process updates.
-
#start ⇒ Object
Start the poll: 1) grab the current PollInfo from the database, which indicates the last time we ran; 2) in a loop, process all the recent updates between the last time we ran and now.
-
#stop ⇒ Object
Stop the poll.
Constructor Details
#initialize(config) ⇒ DbPoller
Returns a new instance of DbPoller.
32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/deimos/utils/db_poller.rb', line 32
# Builds a poller for a single configuration entry.
#
# Resolves `config.producer_class` (a class name held as a string) to a
# constant and checks that it descends from Deimos::ActiveRecordProducer.
# Also assigns a random hex string to @id.
#
# @param config [Object] poller configuration; must respond to `producer_class`
# @raise [RuntimeError] if the producer class cannot be resolved, or if it is
#   not a subclass of Deimos::ActiveRecordProducer
def initialize(config)
  @config = config
  @id = SecureRandom.hex
  begin
    @producer = @config.producer_class.constantize
  rescue NameError
    raise "Class #{@config.producer_class} not found!"
  end
  # @producer is itself a class, so report its own name. Asking for
  # `@producer.class.name` would always print "Class".
  unless @producer < Deimos::ActiveRecordProducer
    raise "Class #{@producer.name} is not an ActiveRecordProducer!"
  end
end
Instance Attribute Details
#id ⇒ Object (readonly)
Needed for Executor so it can identify the worker.
13 14 15 |
# File 'lib/deimos/utils/db_poller.rb', line 13
# Identifier for this poller; needed for Executor so it can identify
# the worker.
#
# @return [String] the value held in @id (a random hex string assigned by
#   the constructor)
def id
  @id
end
Class Method Details
.start! ⇒ Object
Begin the DB Poller process.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/deimos/utils/db_poller.rb', line 16
# Begin the DB Poller process.
#
# Builds one poller per entry in `Deimos.config.db_poller_objects`, wraps
# them in a Sigurd::Executor and runs it under a Sigurd::SignalHandler.
#
# @raise [RuntimeError] if no pollers are configured
def self.start!
  raise('No pollers configured!') if Deimos.config.db_poller_objects.empty?

  pollers = Deimos.config.db_poller_objects.map { |poller_config| self.new(poller_config) }
  executor = Sigurd::Executor.new(pollers,
                                  sleep_seconds: 5,
                                  logger: Deimos.config.logger)
  Sigurd::SignalHandler.new(executor).run!
end
Instance Method Details
#fetch_results(time_from, time_to) ⇒ ActiveRecord::Relation
125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/deimos/utils/db_poller.rb', line 125
# Builds the query for one batch of records in the given time window.
#
# Delegates to the producer's `poll_query`, then caps the result at
# BATCH_SIZE rows ordered by the timestamp column and the primary key.
#
# NOTE(review): the config accessor name was missing from the extracted
# source; it is restored here as `timestamp_column` - confirm against
# the poller configuration.
#
# @param time_from [Object] lower time bound, passed through to `poll_query`
# @param time_to [Object] upper time bound, passed through to `poll_query`
# @return [ActiveRecord::Relation]
def fetch_results(time_from, time_to)
  id = @producer.config[:record_class].primary_key
  # Quote both column names before interpolating them into the ORDER BY.
  quoted_timestamp = ActiveRecord::Base.connection.quote_column_name(@config.timestamp_column)
  quoted_id = ActiveRecord::Base.connection.quote_column_name(id)
  @producer.poll_query(time_from: time_from,
                       time_to: time_to,
                       column_name: @config.timestamp_column,
                       min_id: @info.last_sent_id).
    limit(BATCH_SIZE).
    order("#{quoted_timestamp}, #{quoted_id}")
end
#last_updated(record) ⇒ ActiveSupport::TimeWithZone
93 94 95 |
# File 'lib/deimos/utils/db_poller.rb', line 93
# Reads the record's value for the configured timestamp column.
#
# NOTE(review): the config accessor name was missing from the extracted
# source; it is restored here as `timestamp_column` - confirm against
# the poller configuration.
#
# @param record [Object] a record responding to the configured column name
# @return [ActiveSupport::TimeWithZone]
def last_updated(record)
  record.public_send(@config.timestamp_column)
end
#process_batch(batch) ⇒ Object
138 139 140 141 142 143 144 145 146 |
# File 'lib/deimos/utils/db_poller.rb', line 138
# Sends one batch through the producer and records how far we got.
#
# The primary key and timestamp of the batch's last record are read first;
# after the producer has sent the batch they are written to @info
# (`last_sent_id` / `last_sent`) and saved.
#
# @param batch [Array] records to send; the last element is used as the
#   progress marker
# @return [Object] whatever `@info.save!` returns
def process_batch(batch)
  tail = batch.last
  tail_id = tail.public_send(tail.class.primary_key)
  tail_timestamp = last_updated(tail)

  @producer.send_events(batch)

  @info.attributes = {
    last_sent: tail_timestamp,
    last_sent_id: tail_id
  }
  @info.save!
end
#process_updates ⇒ Object
Send messages for updated data.
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/deimos/utils/db_poller.rb', line 98
# Send messages for updated data.
#
# Returns immediately unless #should_run? is true. Otherwise repeatedly
# fetches a batch with #fetch_results and hands it to #process_batch,
# stopping at the first empty batch, then logs a summary.
#
# @return [void]
def process_updates
  return unless should_run?

  time_from = @config.full_table ? Time.new(0) : @info.last_sent.in_time_zone
  time_to = Time.zone.now - @config.delay_time
  Deimos.config.logger.info("Polling #{@producer.topic} from #{time_from} to #{time_to}")
  message_count = 0
  batch_count = 0

  # poll_query gets all the relevant data from the database, as defined
  # by the producer itself.
  loop do
    Deimos.config.logger.debug("Polling #{@producer.topic}, batch #{batch_count + 1}")
    batch = fetch_results(time_from, time_to).to_a
    break if batch.empty?

    batch_count += 1
    process_batch(batch)
    message_count += batch.size
    # The next fetch starts from the timestamp of the last record handled.
    time_from = last_updated(batch.last)
  end
  Deimos.config.logger.info("Poll #{@producer.topic} complete at #{time_to} (#{message_count} messages, #{batch_count} batches)")
end
#retrieve_poll_info ⇒ Object
Grab the PollInfo or create if it doesn't exist.
69 70 71 72 73 74 75 76 |
# File 'lib/deimos/utils/db_poller.rb', line 69
# Grab the PollInfo or create if it doesn't exist.
#
# Reconnects the ActiveRecord connection first, unless a transaction is
# currently open. A newly created PollInfo starts at Time.new(0) when
# `start_from_beginning` is set, otherwise at the current time, with
# `last_sent_id` 0.
#
# @return [Deimos::PollInfo] the record now held in @info
def retrieve_poll_info
  unless ActiveRecord::Base.connection.open_transactions.positive?
    ActiveRecord::Base.connection.reconnect!
  end

  initial_time = if @config.start_from_beginning
                   Time.new(0)
                 else
                   Time.zone.now
                 end

  @info = Deimos::PollInfo.find_by_producer(@config.producer_class) ||
          Deimos::PollInfo.create!(producer: @config.producer_class,
                                   last_sent: initial_time,
                                   last_sent_id: 0)
end
#should_run? ⇒ Boolean
Indicate whether this current loop should process updates. Most loops will busy-wait (sleeping 0.1 seconds) until it's ready.
87 88 89 |
# File 'lib/deimos/utils/db_poller.rb', line 87
# Indicate whether this current loop should process updates.
#
# True once the time since `@info.last_sent`, less the configured
# `delay_time`, has reached the configured `run_every` interval.
#
# @return [Boolean]
def should_run?
  elapsed = Time.zone.now - @info.last_sent - @config.delay_time
  elapsed >= @config.run_every
end
#start ⇒ Object
Start the poll: 1) grab the current PollInfo from the database, which indicates the last time we ran; 2) in a loop, process all the recent updates between the last time we ran and now.
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/deimos/utils/db_poller.rb', line 50
# Start the poll.
#
# Switches the producer backend from :kafka_async to :kafka, loads the
# PollInfo via #retrieve_poll_info, then calls #process_updates every
# 0.1 seconds until @signal_to_stop is set.
#
# @return [void]
def start
  # Don't send asynchronously
  Deimos.config.producers.backend = :kafka if Deimos.config.producers.backend == :kafka_async

  Deimos.config.logger.info('Starting...')
  @signal_to_stop = false
  retrieve_poll_info

  loop do
    unless @signal_to_stop
      process_updates
      sleep 0.1
      next
    end

    Deimos.config.logger.info('Shutting down')
    break
  end
end
#stop ⇒ Object
Stop the poll.
79 80 81 82 |
# File 'lib/deimos/utils/db_poller.rb', line 79
# Stop the poll.
#
# Logs the request and raises the @signal_to_stop flag; the loop in #start
# checks that flag at the top of each iteration.
#
# @return [Boolean] true
def stop
  Deimos.config.logger.info('Received signal to stop')

  @signal_to_stop = true
end