Class: Deimos::Utils::DbPoller::TimeBased
- Defined in:
- lib/deimos/utils/db_poller/time_based.rb
Overview
Poller that uses ID and updated_at to determine the records to publish.
Constant Summary
Constants inherited from Base
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary collapse
- #create_poll_info ⇒ Object
  :nodoc:
- #fetch_results(time_from, time_to) ⇒ ActiveRecord::Relation
- #last_updated(record) ⇒ ActiveSupport::TimeWithZone
- #process_and_touch_info(batch, status) ⇒ Object
- #process_updates ⇒ void
  Send messages for updated data.
- #touch_info(batch) ⇒ void
Methods inherited from Base
#initialize, #process_batch, #process_batch_with_span, #retrieve_poll_info, #should_run?, #start, #stop
Constructor Details
This class inherits a constructor from Deimos::Utils::DbPoller::Base
Instance Method Details
#create_poll_info ⇒ Object
:nodoc:
12 13 14 15 16 17 |
# File 'lib/deimos/utils/db_poller/time_based.rb', line 12
# Creates a Deimos::PollInfo record for the configured producer class.
# The initial `last_sent` is Time.new(0) when the config sets
# `start_from_beginning`, otherwise the current time in Time.zone;
# `last_sent_id` always starts at 0.
# @return [Object] the value returned by Deimos::PollInfo.create!
def create_poll_info
  starting_point =
    if @config.start_from_beginning
      Time.new(0)
    else
      Time.zone.now
    end

  Deimos::PollInfo.create!(
    producer: @config.producer_class,
    last_sent: starting_point,
    last_sent_id: 0
  )
end
#fetch_results(time_from, time_to) ⇒ ActiveRecord::Relation
53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/deimos/utils/db_poller/time_based.rb', line 53
# Builds the query for a single batch of records inside the given time
# window, limited to BATCH_SIZE rows and ordered by timestamp column and
# then by primary key.
#
# NOTE(review): the listing this method was recovered from had lost the
# timestamp identifiers (it read `@config.` and `"#{}, ..."`). The names
# `quoted_timestamp` and `@config.timestamp_column` are restored here —
# confirm against lib/deimos/utils/db_poller/time_based.rb.
#
# @param time_from [Object] lower bound handed to the producer's poll_query
# @param time_to [Object] upper bound handed to the producer's poll_query
# @return [ActiveRecord::Relation]
def fetch_results(time_from, time_to)
  id = @producer.config[:record_class].primary_key
  # Quote both column names so they are safe to interpolate into ORDER BY.
  quoted_timestamp = ActiveRecord::Base.connection.quote_column_name(@config.timestamp_column)
  quoted_id = ActiveRecord::Base.connection.quote_column_name(id)
  @producer.poll_query(time_from: time_from,
                       time_to: time_to,
                       column_name: @config.timestamp_column,
                       min_id: @info.last_sent_id).
    limit(BATCH_SIZE).
    order("#{quoted_timestamp}, #{quoted_id}")
end
#last_updated(record) ⇒ ActiveSupport::TimeWithZone
67 68 69 |
# File 'lib/deimos/utils/db_poller/time_based.rb', line 67
# Reads the record's value for the configured timestamp column.
#
# NOTE(review): the listing this method was recovered from read
# `record.public_send(@config.)`; the attribute name `timestamp_column`
# is restored here — confirm against the original file.
#
# @param record [Object] a record returned by the poll query
# @return [ActiveSupport::TimeWithZone]
def last_updated(record)
  record.public_send(@config.timestamp_column)
end
#process_and_touch_info(batch, status) ⇒ Object
21 22 23 24 |
# File 'lib/deimos/utils/db_poller/time_based.rb', line 21
# Processes one batch via process_batch_with_span and then records the
# batch's last row in the poll info via touch_info.
# @param batch [Array] the records to process
# @param status [Object] the poll status passed through to process_batch_with_span
# @return [Object] the value returned by touch_info
def process_and_touch_info(batch, status)
  process_batch_with_span(batch, status)
  touch_info(batch)
end
#process_updates ⇒ void
This method returns an undefined value.
Send messages for updated data.
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/deimos/utils/db_poller/time_based.rb', line 28
# Send messages for updated data.
#
# Polls in batches from the last sent time (or from Time.new(0) when the
# config sets `full_table`) up to the current time minus the configured
# delay. After each non-empty batch the lower bound moves to the timestamp
# of the batch's last record; an empty batch ends the poll.
# @return [void]
def process_updates
  time_from =
    if @config.full_table
      Time.new(0)
    else
      @info.last_sent.in_time_zone
    end
  time_to = Time.zone.now - @config.delay_time
  Deimos.config.logger.info("Polling #{@producer.topic} from #{time_from} to #{time_to}")
  status = PollStatus.new(0, 0, 0)

  # poll_query (called through fetch_results) decides which rows are
  # relevant, as defined by the producer itself.
  loop do
    Deimos.config.logger.debug("Polling #{@producer.topic}, batch #{status.current_batch}")
    records = fetch_results(time_from, time_to).to_a

    unless records.empty?
      process_and_touch_info(records, status)
      time_from = last_updated(records.last)
      next
    end

    # NOTE(review): touch presumably stamps last_sent with the current time,
    # which is later than time_to when delay_time is positive — confirm rows
    # updated inside that window are not skipped by the next poll.
    @info.touch(:last_sent)
    break
  end

  Deimos.config.logger.info("Poll #{@producer.topic} complete at #{time_to} (#{status.report})")
end
#touch_info(batch) ⇒ void
This method returns an undefined value.
73 74 75 76 77 78 79 80 |
# File 'lib/deimos/utils/db_poller/time_based.rb', line 73
# Stores the primary key and timestamp of the batch's last record on the
# poll info and saves it.
# @param batch [Array] the batch that was just processed
# @return [void]
def touch_info(batch)
  newest = batch.last
  # Look the key up by name because the record class chooses its own primary key.
  newest_id = newest.public_send(newest.class.primary_key)
  checkpoint = {
    last_sent: last_updated(newest),
    last_sent_id: newest_id
  }
  @info.attributes = checkpoint
  @info.save!
end