Class: Deimos::Utils::DbPoller::TimeBased

Inherits:
Base
  • Object
show all
Defined in:
lib/deimos/utils/db_poller/time_based.rb

Overview

Poller that uses ID and updated_at to determine the records to publish.

Constant Summary

Constants inherited from Base

Base::BATCH_SIZE

Instance Attribute Summary

Attributes inherited from Base

#config, #id

Instance Method Summary collapse

Methods inherited from Base

#initialize, #process_batch, #process_batch_with_span, #retrieve_poll_info, #should_run?, #start, #stop

Constructor Details

This class inherits a constructor from Deimos::Utils::DbPoller::Base

Instance Method Details

#create_poll_info ⇒ Object

:nodoc:



12
13
14
15
16
17
# File 'lib/deimos/utils/db_poller/time_based.rb', line 12

# Creates the PollInfo record for this poller's producer class.
#
# The initial `last_sent` is `Time.new(0)` when the config has
# `start_from_beginning` set, and the current zone time otherwise.
# `last_sent_id` always starts at 0.
#
# @return [Object] whatever `Deimos::PollInfo.create!` returns
def create_poll_info
  starting_point = if @config.start_from_beginning
                     Time.new(0)
                   else
                     Time.zone.now
                   end

  Deimos::PollInfo.create!(
    producer: @config.producer_class,
    last_sent: starting_point,
    last_sent_id: 0
  )
end

#fetch_results(time_from, time_to) ⇒ ActiveRecord::Relation

Parameters:

  • time_from (ActiveSupport::TimeWithZone)
  • time_to (ActiveSupport::TimeWithZone)

Returns:

  • (ActiveRecord::Relation)


50
51
52
53
54
55
56
57
58
59
60
# File 'lib/deimos/utils/db_poller/time_based.rb', line 50

# Builds the relation for the next batch of records to send.
#
# Calls the producer's `poll_query` with the time window, the configured
# timestamp column and the last sent ID, then caps the result at
# BATCH_SIZE and orders it by the quoted timestamp column followed by the
# quoted primary key of the producer's record class.
#
# @param time_from [ActiveSupport::TimeWithZone]
# @param time_to [ActiveSupport::TimeWithZone]
# @return [ActiveRecord::Relation]
def fetch_results(time_from, time_to)
  pk_column = @producer.config[:record_class].primary_key
  timestamp_column = @config.timestamp_column
  connection = ActiveRecord::Base.connection
  # Quote both column names so the raw ORDER BY fragment is safe to interpolate.
  ordering = [timestamp_column, pk_column].
    map { |column| connection.quote_column_name(column) }.
    join(', ')

  candidates = @producer.poll_query(time_from: time_from,
                                    time_to: time_to,
                                    column_name: timestamp_column,
                                    min_id: @info.last_sent_id)
  candidates.limit(BATCH_SIZE).order(ordering)
end

#last_updated(record) ⇒ ActiveSupport::TimeWithZone

Parameters:

  • record (ActiveRecord::Base)

Returns:

  • (ActiveSupport::TimeWithZone)


64
65
66
# File 'lib/deimos/utils/db_poller/time_based.rb', line 64

# Reads the configured timestamp column from a record.
#
# The column name comes from `@config.timestamp_column` and is invoked as a
# public method on the record.
#
# @param record [ActiveRecord::Base]
# @return [ActiveSupport::TimeWithZone]
def last_updated(record)
  column = @config.timestamp_column
  record.public_send(column)
end

#process_and_touch_info(batch, status) ⇒ Object

Parameters:

  • batch (Array<ActiveRecord::Base>)
  • status (PollStatus)

21
22
23
24
# File 'lib/deimos/utils/db_poller/time_based.rb', line 21

# Hands the batch to `process_batch_with_span` and then calls `touch_info`
# with the same batch, so the poll info is only touched after processing
# returns.
#
# @param batch [Array<ActiveRecord::Base>]
# @param status [PollStatus]
# @return [Object] the result of `touch_info`
def process_and_touch_info(batch, status)
  process_batch_with_span(batch, status)
  touch_info(batch)
end

#process_updates ⇒ void

This method returns an undefined value.

Send messages for updated data.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/deimos/utils/db_poller/time_based.rb', line 28

# Send messages for updated data.
#
# Polls in batches between a lower bound (`Time.new(0)` for full-table
# configs, otherwise the stored `last_sent`) and an upper bound of the
# current zone time minus the configured delay. Each non-empty batch is
# passed to `process_and_touch_info`, and the lower bound then advances to
# the timestamp of that batch's last record. Polling stops at the first
# empty batch.
#
# @return [void]
def process_updates
  window_start = if @config.full_table
                   Time.new(0)
                 else
                   @info.last_sent.in_time_zone
                 end
  window_end = Time.zone.now - @config.delay_time
  Deimos.config.logger.info("Polling #{@producer.topic} from #{window_start} to #{window_end}")
  status = PollStatus.new(0, 0, 0)

  # Keep fetching until a fetch comes back empty; the upper bound stays
  # fixed for the whole run while the lower bound moves forward.
  loop do
    Deimos.config.logger.debug("Polling #{@producer.topic}, batch #{status.current_batch}")
    records = fetch_results(window_start, window_end).to_a
    break if records.empty?

    process_and_touch_info(records, status)
    window_start = last_updated(records.last)
  end

  Deimos.config.logger.info("Poll #{@producer.topic} complete at #{window_end} (#{status.report})")
end

#touch_info(batch) ⇒ void

This method returns an undefined value.

Parameters:

  • batch (Array<ActiveRecord::Base>)


70
71
72
73
74
75
76
77
# File 'lib/deimos/utils/db_poller/time_based.rb', line 70

# Records how far polling has progressed, based on the last record of the
# batch: `last_sent` becomes that record's timestamp (via `last_updated`)
# and `last_sent_id` becomes its primary key value. The poll info is then
# saved with `save!`.
#
# @param batch [Array<ActiveRecord::Base>]
# @return [void]
def touch_info(batch)
  newest = batch.last
  # The primary key name is looked up on the record's class, so it need not be `id`.
  newest_id = newest.public_send(newest.class.primary_key)
  @info.attributes = { last_sent: last_updated(newest), last_sent_id: newest_id }
  @info.save!
end