Class: Deimos::Utils::DbPoller::Base
- Inherits:
-
Object
- Object
- Deimos::Utils::DbPoller::Base
- Defined in:
- lib/deimos/utils/db_poller/base.rb
Overview
Base poller class for retrieving and publishing messages.
Direct Known Subclasses
Constant Summary collapse
- BATCH_SIZE =
1000
Instance Attribute Summary collapse
- #config ⇒ Hash readonly
-
#id ⇒ Integer
readonly
Needed for Executor so it can identify the worker.
Instance Method Summary collapse
- #create_poll_info ⇒ Deimos::PollInfo
-
#initialize(config) ⇒ Base
constructor
A new instance of Base.
- #process_batch(batch) ⇒ void
- #process_batch_with_span(batch, status) ⇒ Boolean
-
#process_updates ⇒ void
Send messages for updated data.
-
#retrieve_poll_info ⇒ void
Grab the PollInfo or create if it doesn't exist.
-
#should_run? ⇒ Boolean
Indicate whether this current loop should process updates.
-
#start ⇒ void
Start the poll: 1) Grab the current PollInfo from the database indicating the last time we ran 2) On a loop, process all the recent updates between the last time we ran and now.
-
#stop ⇒ void
Stop the poll.
Constructor Details
#initialize(config) ⇒ Base
Returns a new instance of Base.
25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/deimos/utils/db_poller/base.rb', line 25 def initialize(config) @config = config @id = SecureRandom.hex begin @producer = @config.producer_class.constantize rescue NameError raise "Class #{@config.producer_class} not found!" end unless @producer < Deimos::ActiveRecordProducer raise "Class #{@producer.class.name} is not an ActiveRecordProducer!" end end |
Instance Attribute Details
#config ⇒ Hash (readonly)
22 23 24 |
# File 'lib/deimos/utils/db_poller/base.rb', line 22 def config @config end |
#id ⇒ Integer (readonly)
Needed for Executor so it can identify the worker
19 20 21 |
# File 'lib/deimos/utils/db_poller/base.rb', line 19 def id @id end |
Instance Method Details
#create_poll_info ⇒ Deimos::PollInfo
72 73 74 |
# File 'lib/deimos/utils/db_poller/base.rb', line 72 def create_poll_info Deimos::PollInfo.create!(producer: @config.producer_class, last_sent: Time.new(0)) end |
#process_batch(batch) ⇒ void
This method returns an undefined value.
133 134 135 |
# File 'lib/deimos/utils/db_poller/base.rb', line 133 def process_batch(batch) @producer.send_events(batch) end |
#process_batch_with_span(batch, status) ⇒ Boolean
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/deimos/utils/db_poller/base.rb', line 99 def process_batch_with_span(batch, status) retries = 0 begin span = Deimos.config.tracer&.start( 'deimos-db-poller', resource: @producer.class.name.gsub('::', '-') ) process_batch(batch) Deimos.config.tracer&.finish(span) status.batches_processed += 1 rescue Kafka::Error => e # keep trying till it fixes itself Deimos.config.logger.error("Error publishing through DB Poller: #{e.}") sleep(0.5) retry rescue StandardError => e Deimos.config.logger.error("Error publishing through DB poller: #{e.}}") if retries < @config.retries retries += 1 sleep(0.5) retry else Deimos.config.logger.error('Retries exceeded, moving on to next batch') Deimos.config.tracer&.set_error(span, e) status.batches_errored += 1 return false end ensure status. += batch.size end true end |
#process_updates ⇒ void
This method returns an undefined value.
Send messages for updated data.
92 93 94 |
# File 'lib/deimos/utils/db_poller/base.rb', line 92 def process_updates raise Deimos::MissingImplementationError end |
#retrieve_poll_info ⇒ void
Grab the PollInfo or create if it doesn't exist.
67 68 69 |
# File 'lib/deimos/utils/db_poller/base.rb', line 67 def retrieve_poll_info @info = Deimos::PollInfo.find_by_producer(@config.producer_class) || create_poll_info end |
#should_run? ⇒ Boolean
Indicate whether this current loop should process updates. Most loops will busy-wait (sleeping 0.1 seconds) until it's ready.
79 80 81 |
# File 'lib/deimos/utils/db_poller/base.rb', line 79 def should_run? Time.zone.now - @info.last_sent - @config.delay_time >= @config.run_every end |
#start ⇒ void
This method returns an undefined value.
Start the poll: 1) Grab the current PollInfo from the database indicating the last time we ran 2) On a loop, process all the recent updates between the last time we ran and now.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/deimos/utils/db_poller/base.rb', line 44 def start # Don't send asynchronously if Deimos.config.producers.backend == :kafka_async Deimos.config.producers.backend = :kafka end Deimos.config.logger.info('Starting...') @signal_to_stop = false ActiveRecord::Base.connection.reconnect! unless ActiveRecord::Base.connection.open_transactions.positive? retrieve_poll_info loop do if @signal_to_stop Deimos.config.logger.info('Shutting down') break end process_updates if should_run? sleep(0.1) end end |
#stop ⇒ void
This method returns an undefined value.
Stop the poll.
85 86 87 88 |
# File 'lib/deimos/utils/db_poller/base.rb', line 85 def stop Deimos.config.logger.info('Received signal to stop') @signal_to_stop = true end |