Class: Deimos::Utils::DbPoller
- Inherits: Object
  - Object
    - Deimos::Utils::DbPoller
- Defined in: lib/deimos/utils/db_poller.rb
Overview
Class which continually polls the database and sends Kafka messages.
Constant Summary
- BATCH_SIZE = 1000
Instance Attribute Summary
- #config ⇒ Hash (readonly)
- #id ⇒ String (readonly)
  Needed for Executor so it can identify the worker.
Class Method Summary
- .start! ⇒ void
  Begin the DB Poller process.
Instance Method Summary
- #fetch_results(time_from, time_to) ⇒ ActiveRecord::Relation
- #initialize(config) ⇒ DbPoller (constructor)
  Returns a new instance of DbPoller.
- #last_updated(record) ⇒ ActiveSupport::TimeWithZone
- #process_batch(batch) ⇒ void
- #process_batch_with_span(batch) ⇒ Boolean
-
#process_updates ⇒ void
Send messages for updated data.
-
#retrieve_poll_info ⇒ void
Grab the PollInfo, or create it if it doesn't exist.
-
#should_run? ⇒ Boolean
Indicate whether this current loop should process updates.
-
#start ⇒ void
Start the poll: (1) grab the current PollInfo from the database, which records the last time we ran; (2) in a loop, process all recent updates between the last run and now.
-
#stop ⇒ void
Stop the poll.
- #touch_info(batch) ⇒ void
Constructor Details
#initialize(config) ⇒ DbPoller
Returns a new instance of DbPoller.
38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/deimos/utils/db_poller.rb', line 38
# Build a poller for one configured producer.
#
# @param config [Object] poller configuration; must respond to #producer_class
#   (the name of the producer class, resolved with #constantize).
#   NOTE(review): the summary types this as Hash, but it is used via method
#   calls — confirm the actual config type.
# @raise [RuntimeError] if the producer class cannot be found, or if it is not
#   a subclass of Deimos::ActiveRecordProducer.
def initialize(config)
  @config = config
  @id = SecureRandom.hex
  begin
    @producer = @config.producer_class.constantize
  rescue NameError
    raise "Class #{@config.producer_class} not found!"
  end
  # @producer is the class itself, so `@producer.class.name` would always be
  # "Class". Report the configured name instead. The is_a? guard keeps a
  # non-class constant from blowing up inside `<`.
  is_ar_producer = @producer.is_a?(Class) && @producer < Deimos::ActiveRecordProducer
  raise "Class #{@config.producer_class} is not an ActiveRecordProducer!" unless is_ar_producer
end
Instance Attribute Details
#config ⇒ Hash (readonly)
18 19 20 |
# File 'lib/deimos/utils/db_poller.rb', line 18
# @return [Object] the poller configuration passed to #initialize.
def config
  @config
end
#id ⇒ String (readonly)
Needed for Executor so it can identify the worker.
15 16 17 |
# File 'lib/deimos/utils/db_poller.rb', line 15
# Needed for Executor so it can identify the worker.
# @return [String] random hex identifier generated with SecureRandom.hex.
def id
  @id
end
Class Method Details
.start! ⇒ void
This method returns an undefined value.
Begin the DB Poller process.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/deimos/utils/db_poller.rb', line 22
# Begin the DB Poller process.
#
# Builds one poller per entry in Deimos.config.db_poller_objects, then runs
# them all under a Sigurd executor (sleep_seconds: 5) wrapped in a Sigurd
# signal handler.
# @raise [RuntimeError] if no pollers are configured.
# @return [void]
def self.start!
  raise('No pollers configured!') if Deimos.config.db_poller_objects.empty?

  pollers = Deimos.config.db_poller_objects.map { |poller_config| new(poller_config) }
  executor = Sigurd::Executor.new(pollers,
                                  sleep_seconds: 5,
                                  logger: Deimos.config.logger)
  signal_handler = Sigurd::SignalHandler.new(executor)
  signal_handler.run!
end
Instance Method Details
#fetch_results(time_from, time_to) ⇒ ActiveRecord::Relation
139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/deimos/utils/db_poller.rb', line 139
# Fetch the next batch of records to publish.
#
# The query comes from the producer's poll_query. It is limited to BATCH_SIZE
# rows and ordered by the timestamp column, then by primary key, so batches
# with equal timestamps page deterministically (min_id is the last sent id).
# @param time_from [ActiveSupport::TimeWithZone] lower bound of the window.
# @param time_to [ActiveSupport::TimeWithZone] upper bound of the window.
# @return [ActiveRecord::Relation]
def fetch_results(time_from, time_to)
  id = @producer.config[:record_class].primary_key
  quoted_timestamp = ActiveRecord::Base.connection.quote_column_name(@config.timestamp_column)
  quoted_id = ActiveRecord::Base.connection.quote_column_name(id)
  @producer.poll_query(time_from: time_from,
                       time_to: time_to,
                       column_name: @config.timestamp_column,
                       min_id: @info.last_sent_id).
    limit(BATCH_SIZE).
    order("#{quoted_timestamp}, #{quoted_id}")
end
#last_updated(record) ⇒ ActiveSupport::TimeWithZone
102 103 104 |
# File 'lib/deimos/utils/db_poller.rb', line 102
# Read the record's value in the configured timestamp column.
# @param record [ActiveRecord::Base]
# @return [ActiveSupport::TimeWithZone]
def last_updated(record)
  record.public_send(@config.timestamp_column)
end
#process_batch(batch) ⇒ void
This method returns an undefined value.
195 196 197 198 |
# File 'lib/deimos/utils/db_poller.rb', line 195
# Publish a batch through the producer, then record progress via #touch_info.
# @param batch [Array<ActiveRecord::Base>]
# @return [void]
def process_batch(batch)
  @producer.send_events(batch)
  touch_info(batch)
end
#process_batch_with_span(batch) ⇒ Boolean
Returns true if the batch was published, or false if it was skipped after its retries were exhausted.
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/deimos/utils/db_poller.rb', line 153
# Publish a batch inside a tracing span, retrying on failure.
#
# Kafka::Error is retried indefinitely, sleeping 0.5s between attempts. Any
# other StandardError is retried up to @config.retries times. After that, the
# error is recorded on the span, the span is finished, the PollInfo is
# advanced past the batch and false is returned.
# @param batch [Array<ActiveRecord::Base>]
# @return [Boolean] true if the batch was published, false if it was skipped.
def process_batch_with_span(batch)
  retries = 0
  begin
    # @producer is a Class; `@producer.class.name` would always be "Class".
    span = Deimos.config.tracer&.start(
      'deimos-db-poller',
      resource: @producer.name.gsub('::', '-')
    )
    process_batch(batch)
    Deimos.config.tracer&.finish(span)
  rescue Kafka::Error => e # keep trying till it fixes itself
    Deimos.config.logger.error("Error publishing through DB Poller: #{e.message}")
    sleep(0.5)
    retry
  rescue StandardError => e
    Deimos.config.logger.error("Error publishing through DB poller: #{e.message}")
    if retries < @config.retries
      retries += 1
      sleep(0.5)
      retry
    end

    Deimos.config.logger.error('Retries exceeded, moving on to next batch')
    Deimos.config.tracer&.set_error(span, e)
    # An unfinished span is typically never flushed, so the error would not be reported.
    Deimos.config.tracer&.finish(span)
    touch_info(batch)
    return false
  end
  true
end
#process_updates ⇒ void
This method returns an undefined value.
Send messages for updated data.
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/deimos/utils/db_poller.rb', line 108
# Send messages for updated data.
#
# Does nothing unless #should_run? returns true. Otherwise polls the window
# from the last sent timestamp (or Time.new(0) when the config's full_table is
# set) up to now minus the configured delay_time, one batch at a time, until
# an empty batch comes back. Then logs a summary.
# @return [void]
def process_updates
  return unless should_run?

  time_from = @config.full_table ? Time.new(0) : @info.last_sent.in_time_zone
  time_to = Time.zone.now - @config.delay_time
  Deimos.config.logger.info("Polling #{@producer.topic} from #{time_from} to #{time_to}")
  message_count = 0
  batch_count = 0
  error_count = 0

  # poll_query gets all the relevant data from the database, as defined
  # by the producer itself.
  loop do
    Deimos.config.logger.debug("Polling #{@producer.topic}, batch #{batch_count + 1}")
    batch = fetch_results(time_from, time_to).to_a
    break if batch.empty?

    if process_batch_with_span(batch)
      batch_count += 1
    else
      error_count += 1
    end
    message_count += batch.size
    # Advance the window so the next query starts at this batch's last timestamp.
    time_from = last_updated(batch.last)
  end
  Deimos.config.logger.info("Poll #{@producer.topic} complete at #{time_to} (#{message_count} messages, #{batch_count} successful batches, #{error_count} batches errored)")
end
#retrieve_poll_info ⇒ void
This method returns an undefined value.
Grab the PollInfo, or create it if it doesn't exist.
77 78 79 80 81 82 83 84 |
# File 'lib/deimos/utils/db_poller.rb', line 77
# Grab the PollInfo for this producer, or create it if it doesn't exist.
#
# A newly created record starts at Time.new(0) when the config's
# start_from_beginning is set (otherwise at the current time), with
# last_sent_id 0.
# @return [void]
def retrieve_poll_info
  # Reconnect only when no transaction is open on the connection.
  # NOTE(review): presumably refreshes a stale connection before polling — confirm intent.
  ActiveRecord::Base.connection.reconnect! unless ActiveRecord::Base.connection.open_transactions.positive?
  new_time = @config.start_from_beginning ? Time.new(0) : Time.zone.now
  @info = Deimos::PollInfo.find_by_producer(@config.producer_class) ||
          Deimos::PollInfo.create!(producer: @config.producer_class,
                                   last_sent: new_time,
                                   last_sent_id: 0)
end
#should_run? ⇒ Boolean
Indicate whether this current loop should process updates. Most loops will busy-wait (sleeping 0.1 seconds between checks) until it is time to run.
96 97 98 |
# File 'lib/deimos/utils/db_poller.rb', line 96
# Indicate whether this current loop should process updates.
#
# True once the time elapsed since the last sent timestamp, minus the
# configured delay_time, reaches run_every.
# NOTE(review): assumes delay_time and run_every are in seconds (or Durations).
# @return [Boolean]
def should_run?
  Time.zone.now - @info.last_sent - @config.delay_time >= @config.run_every
end
#start ⇒ void
This method returns an undefined value.
Start the poll: (1) grab the current PollInfo from the database, which records the last time we ran; (2) in a loop, process all recent updates between the last run and now.
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/deimos/utils/db_poller.rb', line 57
# Start the poll:
# 1) grab the current PollInfo from the database, which records the last time we ran;
# 2) in a loop, process all recent updates between the last run and now,
#    sleeping 0.1 seconds between iterations until #stop sets the stop flag.
# @return [void]
def start
  # Don't send asynchronously.
  # NOTE(review): presumably so progress is only recorded after messages are sent — confirm.
  if Deimos.config.producers.backend == :kafka_async
    Deimos.config.producers.backend = :kafka
  end
  Deimos.config.logger.info('Starting...')
  @signal_to_stop = false
  retrieve_poll_info
  loop do
    if @signal_to_stop
      Deimos.config.logger.info('Shutting down')
      break
    end
    process_updates
    sleep 0.1
  end
end
#stop ⇒ void
This method returns an undefined value.
Stop the poll.
88 89 90 91 |
# File 'lib/deimos/utils/db_poller.rb', line 88
# Stop the poll: log the request and set the flag that #start checks on each
# loop iteration.
# @return [void]
def stop
  Deimos.config.logger.info('Received signal to stop')
  @signal_to_stop = true
end
#touch_info(batch) ⇒ void
This method returns an undefined value.
184 185 186 187 188 189 190 191 |
# File 'lib/deimos/utils/db_poller.rb', line 184
# Record progress: store the last record's timestamp and primary key in the
# PollInfo and save it.
# @param batch [Array<ActiveRecord::Base>] a non-empty batch (its last element is read).
# @return [void]
def touch_info(batch)
  record = batch.last
  id_method = record.class.primary_key
  last_id = record.public_send(id_method)
  last_updated_at = last_updated(record)
  @info.attributes = { last_sent: last_updated_at, last_sent_id: last_id }
  @info.save!
end