Class: Deimos::Utils::DbPoller::Base
- Inherits:
-
Object
- Object
- Deimos::Utils::DbPoller::Base
- Defined in:
- lib/deimos/utils/db_poller/base.rb
Overview
Base poller class for retrieving and publishing messages.
Direct Known Subclasses
Constant Summary collapse
- BATCH_SIZE =
1000
Instance Attribute Summary collapse
- #config ⇒ Hash readonly
-
#id ⇒ String
readonly
Needed for Executor so it can identify the worker.
Class Method Summary collapse
-
.producers ⇒ Array<Producer>
Method to define producers if a single poller needs to publish to multiple topics.
Instance Method Summary collapse
- #create_poll_info ⇒ Deimos::PollInfo
-
#initialize(config) ⇒ Base
constructor
A new instance of Base.
-
#log_identifier ⇒ String
Configure log identifier and messages to be used in subclasses.
-
#process_batch(batch) ⇒ void
Publish batch using the configured producers.
- #process_batch_with_span(batch, status) ⇒ Boolean
-
#process_updates ⇒ void
Send messages for updated data.
-
#producer_classes ⇒ Array<ActiveRecordProducer>
Return array of configured producers depending on poller class.
-
#retrieve_poll_info ⇒ void
Grab the PollInfo or create if it doesn’t exist.
-
#should_run? ⇒ Boolean
Indicate whether this current loop should process updates.
-
#start ⇒ void
Start the poll: 1) Grab the current PollInfo from the database indicating the last time we ran 2) On a loop, process all the recent updates between the last time we ran and now.
-
#stop ⇒ void
Stop the poll.
-
#validate_producer_class(producer_class) ⇒ void
Validate if a producer class is an ActiveRecordProducer or not.
Constructor Details
#initialize(config) ⇒ Base
Returns a new instance of Base.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/deimos/utils/db_poller/base.rb', line 32
# Build a poller from its configuration and validate the producers it will use.
#
# @param config [Object] poller configuration; must respond to #poller_class
#   and #producer_class
# @raise [RuntimeError] when neither poller_class nor producer_class is set,
#   when a NameError is raised while resolving/validating producers, or when
#   a producer fails #validate_producer_class
def initialize(config)
  @config = config
  @id = SecureRandom.hex

  begin
    nothing_configured = @config.poller_class.nil? && @config.producer_class.nil?
    raise 'No producers have been set for this DB poller!' if nothing_configured

    # A poller class that declares its own producers is its own resource;
    # otherwise the single configured producer class is.
    @resource_class =
      if self.class.producers.any?
        self.class
      else
        @config.producer_class.constantize
      end

    producer_classes.each { |klass| validate_producer_class(klass) }
  rescue NameError
    raise "Class #{@config.producer_class} not found!"
  end
end
Instance Attribute Details
#config ⇒ Hash (readonly)
22 23 24 |
# File 'lib/deimos/utils/db_poller/base.rb', line 22
# Read-only accessor for the poller configuration.
#
# @return [Object] the value stored in @config
def config
  @config
end
#id ⇒ String (readonly)
Needed for Executor so it can identify the worker
19 20 21 |
# File 'lib/deimos/utils/db_poller/base.rb', line 19
# Read-only accessor for the worker identifier.
#
# @return [Object] the value stored in @id
def id
  @id
end
Class Method Details
.producers ⇒ Array<Producer>
Method to define producers if a single poller needs to publish to multiple topics. Producer classes should be constantized
27 28 29 |
# File 'lib/deimos/utils/db_poller/base.rb', line 27
# Producers this poller publishes through. The base implementation declares
# none.
#
# @return [Array] empty array
def self.producers
  []
end
Instance Method Details
#create_poll_info ⇒ Deimos::PollInfo
84 85 86 |
# File 'lib/deimos/utils/db_poller/base.rb', line 84
# Create the PollInfo row for this poller's resource class, with last_sent
# initialised to Time.new(0).
#
# @return [Deimos::PollInfo] whatever Deimos::PollInfo.create! returns
def create_poll_info
  producer_name = @resource_class.to_s
  Deimos::PollInfo.create!(producer: producer_name, last_sent: Time.new(0))
end
#log_identifier ⇒ String
Configure log identifier and messages to be used in subclasses
154 155 156 |
# File 'lib/deimos/utils/db_poller/base.rb', line 154
# Label for log lines: the resource class name followed by the array of
# producer topics (interpolated with Array#to_s).
#
# @return [String]
def log_identifier
  resource_name = @resource_class.name
  topics = producer_classes.map(&:topic)
  "#{resource_name}: #{topics}"
end
#process_batch(batch) ⇒ void
This method returns an undefined value.
Publish batch using the configured producers
146 147 148 149 150 |
# File 'lib/deimos/utils/db_poller/base.rb', line 146
# Hand the batch to every configured producer, in order, via send_events.
#
# @param batch [Object] passed unchanged to each producer's send_events
# @return [void]
def process_batch(batch)
  producer_classes.each { |producer_class| producer_class.send_events(batch) }
end
#process_batch_with_span(batch, status) ⇒ Boolean
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/deimos/utils/db_poller/base.rb', line 111
# Publish one batch inside a tracer span, retrying on failure.
#
# Kafka::Error is retried without limit, sleeping 0.5s between attempts.
# Any other StandardError is retried up to @config.retries times; after that
# the error is recorded on the span, the batch is counted as errored and the
# method gives up on it.
#
# @param batch [#size] the records to publish; passed to #process_batch
# @param status [Object] run counters, mutated in place (batches_processed,
#   batches_errored, messages_processed)
# @return [Boolean] true when the batch was published, false when retries
#   were exhausted
def process_batch_with_span(batch, status)
  retries = 0
  begin
    span = Deimos.config.tracer&.start(
      'deimos-db-poller',
      resource: @resource_class.name.gsub('::', '-')
    )
    process_batch(batch)
    Deimos.config.tracer&.finish(span)
    status.batches_processed += 1
  rescue Kafka::Error => e # keep trying till it fixes itself
    Deimos.config.logger.error("Error publishing through DB Poller: #{e.message}")
    sleep(0.5)
    retry
  rescue StandardError => e
    Deimos.config.logger.error("Error publishing through DB poller: #{e.message}")
    if retries < @config.retries
      retries += 1
      sleep(0.5)
      retry
    else
      Deimos.config.logger.error('Retries exceeded, moving on to next batch')
      Deimos.config.tracer&.set_error(span, e)
      status.batches_errored += 1
      return false
    end
  ensure
    # NOTE(review): the attribute name was missing from the rendered listing;
    # restored as messages_processed to match the other counters — confirm
    # against the status object's definition.
    status.messages_processed += batch.size
  end
  true
end
#process_updates ⇒ void
This method returns an undefined value.
Send messages for updated data.
104 105 106 |
# File 'lib/deimos/utils/db_poller/base.rb', line 104
# Hook for publishing updated data. The base class provides no implementation
# and always raises.
#
# @raise [Deimos::MissingImplementationError] always
# @return [void]
def process_updates
  raise Deimos::MissingImplementationError
end
#producer_classes ⇒ Array<ActiveRecordProducer>
Return array of configured producers depending on poller class
160 161 162 163 164 |
# File 'lib/deimos/utils/db_poller/base.rb', line 160
# Producers to publish through: the poller class's own .producers when it
# declares any, otherwise the single class named by @config.producer_class.
#
# @return [Array] producer classes
def producer_classes
  declared = self.class.producers
  if declared.any?
    declared
  else
    [@config.producer_class.constantize]
  end
end
#retrieve_poll_info ⇒ void
Grab the PollInfo or create if it doesn’t exist.
79 80 81 |
# File 'lib/deimos/utils/db_poller/base.rb', line 79
# Load the PollInfo record for this poller's resource class into @info,
# creating it via #create_poll_info when the lookup returns nothing.
#
# @return [void]
def retrieve_poll_info
  existing = Deimos::PollInfo.find_by_producer(@resource_class.to_s)
  @info = existing || create_poll_info
end
#should_run? ⇒ Boolean
Indicate whether this current loop should process updates. Most loops will busy-wait (sleeping 0.1 seconds) until it’s ready.
91 92 93 |
# File 'lib/deimos/utils/db_poller/base.rb', line 91
# Whether enough time has passed since @info.last_sent: the elapsed time,
# less @config.delay_time, must be at least @config.run_every.
#
# @return [Boolean]
def should_run?
  since_last_sent = Time.zone.now - @info.last_sent
  since_last_sent - @config.delay_time >= @config.run_every
end
#start ⇒ void
This method returns an undefined value.
Start the poll: 1) Grab the current PollInfo from the database indicating the last time we ran 2) On a loop, process all the recent updates between the last time we ran and now.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/deimos/utils/db_poller/base.rb', line 56
# Run the polling loop until #stop is called.
#
# Loads the PollInfo once, then on every iteration processes updates when
# #should_run? says so and sleeps 0.1s.
#
# @return [void]
def start
  # Don't send asynchronously
  Deimos.config.producers.backend = :kafka if Deimos.config.producers.backend == :kafka_async

  Deimos.config.logger.info('Starting...')
  @signal_to_stop = false

  # Reconnect only when no transaction is currently open on the connection.
  unless ActiveRecord::Base.connection.open_transactions.positive?
    ActiveRecord::Base.connection.reconnect!
  end

  retrieve_poll_info

  loop do
    if @signal_to_stop
      Deimos.config.logger.info('Shutting down')
      break
    end

    process_updates if should_run?
    sleep(0.1)
  end
end
#stop ⇒ void
This method returns an undefined value.
Stop the poll.
97 98 99 100 |
# File 'lib/deimos/utils/db_poller/base.rb', line 97
# Ask the polling loop to exit: logs the request and sets the flag that
# #start checks at the top of each iteration.
#
# @return [void]
def stop
  Deimos.config.logger.info('Received signal to stop')
  @signal_to_stop = true
end
#validate_producer_class(producer_class) ⇒ void
This method returns an undefined value.
Validate if a producer class is an ActiveRecordProducer or not
168 169 170 171 172 |
# File 'lib/deimos/utils/db_poller/base.rb', line 168
# Ensure a producer class is a subclass of Deimos::ActiveRecordProducer.
#
# @param producer_class [Class] the producer to check
# @raise [RuntimeError] when producer_class is not a subclass of
#   Deimos::ActiveRecordProducer
# @return [void]
def validate_producer_class(producer_class)
  return if producer_class < Deimos::ActiveRecordProducer

  # Report the offending class by its own name; `producer_class.class.name`
  # is always "Class" when the argument is a class.
  raise "Class #{producer_class.name} is not an ActiveRecordProducer!"
end