Class: Deimos::Utils::DbPoller
- Inherits:
-
Object
- Object
- Deimos::Utils::DbPoller
- Defined in:
- lib/deimos/utils/db_poller.rb
Overview
Class which continually polls the database and sends Kafka messages.
Constant Summary collapse
- BATCH_SIZE =
1000
Instance Attribute Summary collapse
-
#id ⇒ Integer
readonly
Needed for Executor so it can identify the worker.
Class Method Summary collapse
-
.start! ⇒ void
Begin the DB Poller process.
Instance Method Summary collapse
- #fetch_results(time_from, time_to) ⇒ ActiveRecord::Relation
-
#initialize(config) ⇒ DbPoller
constructor
A new instance of DbPoller.
- #last_updated(record) ⇒ ActiveSupport::TimeWithZone
- #process_batch(batch) ⇒ void
-
#process_updates ⇒ void
Send messages for updated data.
-
#retrieve_poll_info ⇒ void
Grab the PollInfo or create if it doesn't exist.
-
#should_run? ⇒ Boolean
Indicate whether this current loop should process updates.
-
#start ⇒ void
Start the poll: 1) Grab the current PollInfo from the database indicating the last time we ran 2) On a loop, process all the recent updates between the last time we ran and now.
-
#stop ⇒ void
Stop the poll.
Constructor Details
#initialize(config) ⇒ DbPoller
Returns a new instance of DbPoller.
35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/deimos/utils/db_poller.rb', line 35 def initialize(config) @config = config @id = SecureRandom.hex begin @producer = @config.producer_class.constantize rescue NameError raise "Class #{@config.producer_class} not found!" end unless @producer < Deimos::ActiveRecordProducer raise "Class #{@producer.class.name} is not an ActiveRecordProducer!" end end |
Instance Attribute Details
#id ⇒ Integer (readonly)
Needed for Executor so it can identify the worker
15 16 17 |
# File 'lib/deimos/utils/db_poller.rb', line 15 def id @id end |
Class Method Details
.start! ⇒ void
This method returns an undefined value.
Begin the DB Poller process.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/deimos/utils/db_poller.rb', line 19 def self.start! if Deimos.config.db_poller_objects.empty? raise('No pollers configured!') end pollers = Deimos.config.db_poller_objects.map do |poller_config| self.new(poller_config) end executor = Sigurd::Executor.new(pollers, sleep_seconds: 5, logger: Deimos.config.logger) signal_handler = Sigurd::SignalHandler.new(executor) signal_handler.run! end |
Instance Method Details
#fetch_results(time_from, time_to) ⇒ ActiveRecord::Relation
132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/deimos/utils/db_poller.rb', line 132 def fetch_results(time_from, time_to) id = @producer.config[:record_class].primary_key = ActiveRecord::Base.connection.quote_column_name(@config.) quoted_id = ActiveRecord::Base.connection.quote_column_name(id) @producer.poll_query(time_from: time_from, time_to: time_to, column_name: @config., min_id: @info.last_sent_id). limit(BATCH_SIZE). order("#{}, #{quoted_id}") end |
#last_updated(record) ⇒ ActiveSupport::TimeWithZone
99 100 101 |
# File 'lib/deimos/utils/db_poller.rb', line 99 def last_updated(record) record.public_send(@config.) end |
#process_batch(batch) ⇒ void
This method returns an undefined value.
146 147 148 149 150 151 152 153 154 |
# File 'lib/deimos/utils/db_poller.rb', line 146 def process_batch(batch) record = batch.last id_method = record.class.primary_key last_id = record.public_send(id_method) last_updated_at = last_updated(record) @producer.send_events(batch) @info.attributes = { last_sent: last_updated_at, last_sent_id: last_id } @info.save! end |
#process_updates ⇒ void
This method returns an undefined value.
Send messages for updated data.
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/deimos/utils/db_poller.rb', line 105 def process_updates return unless should_run? time_from = @config.full_table ? Time.new(0) : @info.last_sent.in_time_zone time_to = Time.zone.now - @config.delay_time Deimos.config.logger.info("Polling #{@producer.topic} from #{time_from} to #{time_to}") = 0 batch_count = 0 # poll_query gets all the relevant data from the database, as defined # by the producer itself. loop do Deimos.config.logger.debug("Polling #{@producer.topic}, batch #{batch_count + 1}") batch = fetch_results(time_from, time_to).to_a break if batch.empty? batch_count += 1 process_batch(batch) += batch.size time_from = last_updated(batch.last) end Deimos.config.logger.info("Poll #{@producer.topic} complete at #{time_to} (#{} messages, #{batch_count} batches}") end |
#retrieve_poll_info ⇒ void
This method returns an undefined value.
Grab the PollInfo or create if it doesn't exist.
74 75 76 77 78 79 80 81 |
# File 'lib/deimos/utils/db_poller.rb', line 74 def retrieve_poll_info ActiveRecord::Base.connection.reconnect! unless ActiveRecord::Base.connection.open_transactions.positive? new_time = @config.start_from_beginning ? Time.new(0) : Time.zone.now @info = Deimos::PollInfo.find_by_producer(@config.producer_class) || Deimos::PollInfo.create!(producer: @config.producer_class, last_sent: new_time, last_sent_id: 0) end |
#should_run? ⇒ Boolean
Indicate whether this current loop should process updates. Most loops will busy-wait (sleeping 0.1 seconds) until it's ready.
93 94 95 |
# File 'lib/deimos/utils/db_poller.rb', line 93 def should_run? Time.zone.now - @info.last_sent - @config.delay_time >= @config.run_every end |
#start ⇒ void
This method returns an undefined value.
Start the poll: 1) Grab the current PollInfo from the database indicating the last time we ran 2) On a loop, process all the recent updates between the last time we ran and now.
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/deimos/utils/db_poller.rb', line 54 def start # Don't send asynchronously if Deimos.config.producers.backend == :kafka_async Deimos.config.producers.backend = :kafka end Deimos.config.logger.info('Starting...') @signal_to_stop = false retrieve_poll_info loop do if @signal_to_stop Deimos.config.logger.info('Shutting down') break end process_updates sleep 0.1 end end |
#stop ⇒ void
This method returns an undefined value.
Stop the poll.
85 86 87 88 |
# File 'lib/deimos/utils/db_poller.rb', line 85 def stop Deimos.config.logger.info('Received signal to stop') @signal_to_stop = true end |