Class: Deimos::Utils::DbPoller::Base
- Inherits:
-
Object
- Object
- Deimos::Utils::DbPoller::Base
- Defined in:
- lib/deimos/utils/db_poller/base.rb
Overview
Base poller class for retrieving and publishing messages.
Direct Known Subclasses
Constant Summary collapse
- FATAL_CODES =
%i(invalid_msg_size msg_size_too_large)
- BATCH_SIZE =
1000
Instance Attribute Summary collapse
- #config ⇒ Hash readonly
-
#id ⇒ String
readonly
Needed for Executor so it can identify the worker.
Class Method Summary collapse
-
.producers ⇒ Array<Producer>
Method to define producers if a single poller needs to publish to multiple topics.
Instance Method Summary collapse
- #create_poll_info ⇒ Deimos::PollInfo
- #handle_message_too_large(exception, batch, status, span) ⇒ Boolean
-
#initialize(config) ⇒ Base
constructor
A new instance of Base.
-
#log_identifier ⇒ String
Configure log identifier and messages to be used in subclasses.
-
#process_batch(batch) ⇒ void
Publish batch using the configured producers.
-
#process_batch_with_span(batch, status) ⇒ Boolean
Publish a batch inside a tracing span, retrying on publish errors.
-
#process_updates ⇒ void
Send messages for updated data.
-
#producer_classes ⇒ Array<ActiveRecordProducer>
Return array of configured producers depending on poller class.
-
#retrieve_poll_info ⇒ void
Grab the PollInfo or create if it doesn’t exist.
-
#should_run? ⇒ Boolean
Indicate whether this current loop should process updates.
-
#start ⇒ void
Start the poll: 1) Grab the current PollInfo from the database indicating the last time we ran 2) On a loop, process all the recent updates between the last time we ran and now.
-
#stop ⇒ void
Stop the poll.
-
#validate_producer_class(producer_class) ⇒ void
Validate if a producer class is an ActiveRecordProducer or not.
Constructor Details
#initialize(config) ⇒ Base
Returns a new instance of Base.
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/deimos/utils/db_poller/base.rb', line 33

# Builds a poller from its configuration and validates every producer it
# will publish through.
#
# @param config [Object] poller settings; this method reads `poller_class`
#   and `producer_class` from it
# @raise [RuntimeError] if neither a poller class nor a producer class is
#   configured, if resolving a class raises NameError, or if a producer
#   fails validation
def initialize(config)
  @config = config
  # Random hex string used to identify this worker.
  @id = SecureRandom.hex

  begin
    raise 'No producers have been set for this DB poller!' if @config.poller_class.nil? && @config.producer_class.nil?

    # A poller that declares its own producers is keyed by its own class;
    # otherwise the single configured producer class is used.
    @resource_class =
      if self.class.producers.any?
        self.class
      else
        @config.producer_class.constantize
      end
    producer_classes.each { |klass| validate_producer_class(klass) }
  rescue NameError
    # NameError also covers NoMethodError raised anywhere in the block above.
    raise "Class #{@config.producer_class} not found!"
  end
end
Instance Attribute Details
#config ⇒ Hash (readonly)
23 24 25 |
# File 'lib/deimos/utils/db_poller/base.rb', line 23

# Read-only accessor for the configuration this poller was built with.
#
# @return [Object] the value stored in @config
def config
  @config
end
#id ⇒ String (readonly)
Needed for Executor so it can identify the worker
20 21 22 |
# File 'lib/deimos/utils/db_poller/base.rb', line 20

# Read-only accessor for this worker's identifier (needed by the Executor
# to tell workers apart).
#
# @return [Object] the value stored in @id
def id
  @id
end
Class Method Details
.producers ⇒ Array<Producer>
Method to define producers if a single poller needs to publish to multiple topics. Producer classes should be constantized, i.e. returned as class objects rather than class-name strings.
28 29 30 |
# File 'lib/deimos/utils/db_poller/base.rb', line 28

# Hook for pollers that publish to several topics: override to return the
# producer classes to use. The base implementation declares none.
#
# @return [Array] an empty array
def self.producers
  []
end
Instance Method Details
#create_poll_info ⇒ Deimos::PollInfo
85 86 87 |
# File 'lib/deimos/utils/db_poller/base.rb', line 85

# Creates the PollInfo row for this poller's resource class.
#
# @return [Deimos::PollInfo] whatever `Deimos::PollInfo.create!` returns
def create_poll_info
  producer_key = @resource_class.to_s
  # last_sent starts at year 0 -- presumably so that every existing record
  # counts as newer than the last run; confirm against the subclasses.
  Deimos::PollInfo.create!(producer: producer_key, last_sent: Time.new(0))
end
#handle_message_too_large(exception, batch, status, span) ⇒ Boolean
114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/deimos/utils/db_poller/base.rb', line 114

# Handles a publish failure caused by messages that are too large.
#
# The error is always logged. When the poller config has
# `skip_too_large_messages` set, the batch is given up on: the record ids
# are logged, the error is recorded on the span (if a tracer is configured)
# and `status.batches_errored` is incremented. Otherwise the method pauses
# for half a second and reports that the batch was not skipped.
#
# @param exception [Exception] the publish error
# @param batch [Array<#id>] records that were being published
# @param status [#batches_errored] mutable counters for the current run
# @param span [Object, nil] tracing span for this batch
# @return [Boolean] true if the batch was skipped, false if the caller
#   should try again
def handle_message_too_large(exception, batch, status, span)
  Deimos::Logging.log_error("Error publishing through DB Poller: #{exception.message}")
  if @config.skip_too_large_messages
    Deimos::Logging.log_error("Skipping messages #{batch.map(&:id).join(', ')} since they are too large")
    Deimos.config.tracer&.set_error(span, exception)
    status.batches_errored += 1
    true
  else # do the same thing as regular Kafka::Error
    sleep(0.5)
    false
  end
end
#log_identifier ⇒ String
Configure log identifier and messages to be used in subclasses
179 180 181 |
# File 'lib/deimos/utils/db_poller/base.rb', line 179

# Label used in log messages: the resource class name followed by the
# topics of all configured producers (rendered as an inspected array).
#
# @return [String]
def log_identifier
  resource_name = @resource_class.name
  topics = producer_classes.map { |producer| producer.topic }
  "#{resource_name}: #{topics}"
end
#process_batch(batch) ⇒ void
This method returns an undefined value.
Publish batch using the configured producers
171 172 173 174 175 |
# File 'lib/deimos/utils/db_poller/base.rb', line 171

# Publishes the batch through every configured producer, in order.
#
# @param batch [Array] records handed to each producer's `send_events`
# @return [void]
def process_batch(batch)
  producer_classes.each { |producer_class| producer_class.send_events(batch) }
end
#process_batch_with_span(batch, status) ⇒ Boolean
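Publish a batch inside a tracing span, retrying on publish errors. Returns false once the configured number of retries has been exceeded, true otherwise.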
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/deimos/utils/db_poller/base.rb', line 131

# Publishes one batch inside a tracing span and keeps the run counters in
# `status` up to date.
#
# Error handling:
# * WaterDrop::Errors::ProduceManyError whose cause has a code listed in
#   FATAL_CODES is delegated to #handle_message_too_large; the batch is
#   retried unless that method reports it as skipped.
# * Any other ProduceManyError is logged and retried after half a second.
#   NOTE(review): this retry is not bounded by `config.retries`.
# * Any other StandardError is logged and retried after half a second, up
#   to `config.retries` times (without limit when that setting is nil).
#   After that the error is recorded on the span, `batches_errored` is
#   incremented and false is returned.
#
# `status.messages_processed` grows by the batch size when the method
# finishes, whether or not publishing succeeded.
#
# @param batch [Array] records to publish
# @param status [#batches_processed, #batches_errored, #messages_processed]
#   mutable counters for the current run
# @return [Boolean] false if retries were exhausted, true otherwise
#   (including when a too-large batch was skipped)
def process_batch_with_span(batch, status)
  retries = 0
  begin
    span = Deimos.config.tracer&.start(
      'deimos-db-poller',
      resource: @resource_class.name.gsub('::', '-')
    )
    process_batch(batch)
    Deimos.config.tracer&.finish(span)
    status.batches_processed += 1
  rescue WaterDrop::Errors::ProduceManyError => e
    if FATAL_CODES.include?(e.cause.try(:code))
      retry unless handle_message_too_large(e, batch, status, span)
    else
      Deimos::Logging.log_error("Error publishing through DB Poller: #{e.message}")
      sleep(0.5)
      retry
    end
  rescue StandardError => e
    Deimos::Logging.log_error("Error publishing through DB poller: #{e.message}")
    if @config.retries.nil? || retries < @config.retries
      retries += 1
      sleep(0.5)
      retry
    else
      Deimos::Logging.log_error('Retries exceeded, moving on to next batch')
      Deimos.config.tracer&.set_error(span, e)
      status.batches_errored += 1
      return false
    end
  ensure
    status.messages_processed += batch.size
  end
  true
end
#process_updates ⇒ void
This method returns an undefined value.
Send messages for updated data.
105 106 107 |
# File 'lib/deimos/utils/db_poller/base.rb', line 105

# Sends messages for updated data. The base class has no implementation;
# subclasses are expected to override this.
#
# @raise [Deimos::MissingImplementationError] always
# @return [void]
def process_updates
  raise(Deimos::MissingImplementationError)
end
#producer_classes ⇒ Array<ActiveRecordProducer>
Return array of configured producers depending on poller class
185 186 187 188 189 |
# File 'lib/deimos/utils/db_poller/base.rb', line 185

# Producers this poller publishes through: the ones declared by the poller
# class via `.producers` when there are any, otherwise the single producer
# class named in the config.
#
# @return [Array] producer classes
def producer_classes
  if self.class.producers.any?
    self.class.producers
  else
    [@config.producer_class.constantize]
  end
end
#retrieve_poll_info ⇒ void
Grab the PollInfo or create if it doesn’t exist.
80 81 82 |
# File 'lib/deimos/utils/db_poller/base.rb', line 80

# Loads the PollInfo row for this poller's resource class into @info,
# creating it when no such row exists.
#
# @return [void]
def retrieve_poll_info
  existing = Deimos::PollInfo.find_by_producer(@resource_class.to_s)
  @info = existing || create_poll_info
end
#should_run? ⇒ Boolean
Indicate whether this current loop should process updates. Most loops will busy-wait (sleeping 0.1 seconds) until it’s ready.
92 93 94 |
# File 'lib/deimos/utils/db_poller/base.rb', line 92

# Whether the current loop iteration should process updates: true once the
# time since `@info.last_sent`, less `config.delay_time`, is at least
# `config.run_every`.
#
# @return [Boolean]
def should_run?
  since_last_sent = Time.zone.now - @info.last_sent
  since_last_sent - @config.delay_time >= @config.run_every
end
#start ⇒ void
This method returns an undefined value.
Start the poll: 1) Grab the current PollInfo from the database indicating the last time we ran 2) On a loop, process all the recent updates between the last time we ran and now.
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/deimos/utils/db_poller/base.rb', line 57

# Runs the polling loop until #stop is called.
#
# Before looping it switches the global producer backend from
# :kafka_async to :kafka, reconnects the ActiveRecord connection unless a
# transaction is open, and loads the PollInfo. Each iteration processes
# updates when #should_run? says so, then sleeps for 0.1 seconds.
#
# @return [void]
def start
  # Don't send asynchronously
  Deimos.config.producers.backend = :kafka if Deimos.config.producers.backend == :kafka_async

  Deimos::Logging.log_info('Starting...')
  @signal_to_stop = false

  inside_transaction = ActiveRecord::Base.connection.open_transactions.positive?
  ActiveRecord::Base.connection.reconnect! unless inside_transaction

  retrieve_poll_info
  loop do
    if @signal_to_stop
      Deimos::Logging.log_info('Shutting down')
      break
    end

    should_run? && process_updates
    sleep(0.1)
  end
end
#stop ⇒ void
This method returns an undefined value.
Stop the poll.
98 99 100 101 |
# File 'lib/deimos/utils/db_poller/base.rb', line 98

# Asks the polling loop to stop: logs the request and raises the flag that
# #start checks at the top of each iteration.
#
# @return [void]
def stop
  Deimos::Logging.log_info('Received signal to stop')
  @signal_to_stop = true
end
#validate_producer_class(producer_class) ⇒ void
This method returns an undefined value.
Validate if a producer class is an ActiveRecordProducer or not
193 194 195 196 197 |
# File 'lib/deimos/utils/db_poller/base.rb', line 193

# Checks that a producer class is a subclass of Deimos::ActiveRecordProducer.
#
# @param producer_class [Class] the class to check
# @raise [RuntimeError] if the class does not inherit from
#   Deimos::ActiveRecordProducer
# @return [void]
def validate_producer_class(producer_class)
  return if producer_class < Deimos::ActiveRecordProducer

  # Interpolate the class itself: `producer_class.class.name` is always
  # "Class" here and would hide which producer is misconfigured.
  raise "Class #{producer_class} is not an ActiveRecordProducer!"
end