Class: Deimos::Utils::DbPoller::TimeBased
- Defined in:
- lib/deimos/utils/db_poller/time_based.rb
Overview
Poller that uses ID and updated_at to determine the records to publish.
Constant Summary
Constants inherited from Base
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary collapse
-
#create_poll_info ⇒ Object
:nodoc:.
- #fetch_results(time_from, time_to) ⇒ ActiveRecord::Relation
- #last_updated(record) ⇒ ActiveSupport::TimeWithZone
- #process_and_touch_info(batch, status) ⇒ Object
-
#process_updates ⇒ void
Send messages for updated data.
- #touch_info(batch) ⇒ void
Methods inherited from Base
#initialize, #process_batch, #process_batch_with_span, #retrieve_poll_info, #should_run?, #start, #stop
Constructor Details
This class inherits a constructor from Deimos::Utils::DbPoller::Base
Instance Method Details
#create_poll_info ⇒ Object
:nodoc:
12 13 14 15 16 17 |
# File 'lib/deimos/utils/db_poller/time_based.rb', line 12 def create_poll_info new_time = @config.start_from_beginning ? Time.new(0) : Time.zone.now Deimos::PollInfo.create!(producer: @config.producer_class, last_sent: new_time, last_sent_id: 0) end |
#fetch_results(time_from, time_to) ⇒ ActiveRecord::Relation
50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/deimos/utils/db_poller/time_based.rb', line 50 def fetch_results(time_from, time_to) id = @producer.config[:record_class].primary_key = ActiveRecord::Base.connection.quote_column_name(@config.) quoted_id = ActiveRecord::Base.connection.quote_column_name(id) @producer.poll_query(time_from: time_from, time_to: time_to, column_name: @config., min_id: @info.last_sent_id). limit(BATCH_SIZE). order("#{}, #{quoted_id}") end |
#last_updated(record) ⇒ ActiveSupport::TimeWithZone
64 65 66 |
# File 'lib/deimos/utils/db_poller/time_based.rb', line 64 def last_updated(record) record.public_send(@config.) end |
#process_and_touch_info(batch, status) ⇒ Object
21 22 23 24 |
# File 'lib/deimos/utils/db_poller/time_based.rb', line 21 def process_and_touch_info(batch, status) process_batch_with_span(batch, status) self.touch_info(batch) end |
#process_updates ⇒ void
This method returns an undefined value.
Send messages for updated data.
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/deimos/utils/db_poller/time_based.rb', line 28 def process_updates time_from = @config.full_table ? Time.new(0) : @info.last_sent.in_time_zone time_to = Time.zone.now - @config.delay_time Deimos.config.logger.info("Polling #{@producer.topic} from #{time_from} to #{time_to}") status = PollStatus.new(0, 0, 0) # poll_query gets all the relevant data from the database, as defined # by the producer itself. loop do Deimos.config.logger.debug("Polling #{@producer.topic}, batch #{status.current_batch}") batch = fetch_results(time_from, time_to).to_a break if batch.empty? process_and_touch_info(batch, status) time_from = last_updated(batch.last) end Deimos.config.logger.info("Poll #{@producer.topic} complete at #{time_to} (#{status.report})") end |
#touch_info(batch) ⇒ void
This method returns an undefined value.
70 71 72 73 74 75 76 77 |
# File 'lib/deimos/utils/db_poller/time_based.rb', line 70 def touch_info(batch) record = batch.last id_method = record.class.primary_key last_id = record.public_send(id_method) last_updated_at = last_updated(record) @info.attributes = { last_sent: last_updated_at, last_sent_id: last_id } @info.save! end |