Class: Deimos::Utils::DbPoller::TimeBased

Inherits:
Base
  • Object
show all
Defined in:
lib/deimos/utils/db_poller/time_based.rb

Overview

Poller that uses ID and updated_at to determine the records to publish.

Constant Summary

Constants inherited from Base

Base::BATCH_SIZE

Instance Attribute Summary

Attributes inherited from Base

#config, #id

Instance Method Summary collapse

Methods inherited from Base

#initialize, #process_batch, #process_batch_with_span, #retrieve_poll_info, #should_run?, #start, #stop

Constructor Details

This class inherits a constructor from Deimos::Utils::DbPoller::Base

Instance Method Details

#create_poll_infoObject

:nodoc:



12
13
14
15
16
17
# File 'lib/deimos/utils/db_poller/time_based.rb', line 12

def create_poll_info
  new_time = @config.start_from_beginning ? Time.new(0) : Time.zone.now
  Deimos::PollInfo.create!(producer: @config.producer_class,
                           last_sent: new_time,
                           last_sent_id: 0)
end

#fetch_results(time_from, time_to) ⇒ ActiveRecord::Relation

Parameters:

  • time_from (ActiveSupport::TimeWithZone)
  • time_to (ActiveSupport::TimeWithZone)

Returns:

  • (ActiveRecord::Relation)


53
54
55
56
57
58
59
60
61
62
63
# File 'lib/deimos/utils/db_poller/time_based.rb', line 53

def fetch_results(time_from, time_to)
  id = @producer.config[:record_class].primary_key
  quoted_timestamp = ActiveRecord::Base.connection.quote_column_name(@config.timestamp_column)
  quoted_id = ActiveRecord::Base.connection.quote_column_name(id)
  @producer.poll_query(time_from: time_from,
                       time_to: time_to,
                       column_name: @config.timestamp_column,
                       min_id: @info.last_sent_id).
    limit(BATCH_SIZE).
    order("#{quoted_timestamp}, #{quoted_id}")
end

#last_updated(record) ⇒ ActiveSupport::TimeWithZone

Parameters:

  • record (ActiveRecord::Base)

Returns:

  • (ActiveSupport::TimeWithZone)


67
68
69
# File 'lib/deimos/utils/db_poller/time_based.rb', line 67

def last_updated(record)
  record.public_send(@config.timestamp_column)
end

#process_and_touch_info(batch, status) ⇒ Object

Parameters:



21
22
23
24
# File 'lib/deimos/utils/db_poller/time_based.rb', line 21

def process_and_touch_info(batch, status)
  process_batch_with_span(batch, status)
  self.touch_info(batch)
end

#process_updatesvoid

This method returns an undefined value.

Send messages for updated data.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/deimos/utils/db_poller/time_based.rb', line 28

def process_updates
  time_from = @config.full_table ? Time.new(0) : @info.last_sent.in_time_zone
  time_to = Time.zone.now - @config.delay_time
  Deimos.config.logger.info("Polling #{@producer.topic} from #{time_from} to #{time_to}")
  status = PollStatus.new(0, 0, 0)

  # poll_query gets all the relevant data from the database, as defined
  # by the producer itself.
  loop do
    Deimos.config.logger.debug("Polling #{@producer.topic}, batch #{status.current_batch}")
    batch = fetch_results(time_from, time_to).to_a
    if batch.empty?
      @info.touch(:last_sent)
      break
    end

    process_and_touch_info(batch, status)
    time_from = last_updated(batch.last)
  end
  Deimos.config.logger.info("Poll #{@producer.topic} complete at #{time_to} (#{status.report})")
end

#touch_info(batch) ⇒ void

This method returns an undefined value.

Parameters:

  • batch (Array<ActiveRecord::Base>)


73
74
75
76
77
78
79
80
# File 'lib/deimos/utils/db_poller/time_based.rb', line 73

def touch_info(batch)
  record = batch.last
  id_method = record.class.primary_key
  last_id = record.public_send(id_method)
  last_updated_at = last_updated(record)
  @info.attributes = { last_sent: last_updated_at, last_sent_id: last_id }
  @info.save!
end