Class: Deimos::Utils::DbPoller::StateBased

Inherits:
Base
  • Object
show all
Defined in:
lib/deimos/utils/db_poller/state_based.rb

Overview

Poller that uses state columns to determine the records to publish.

Constant Summary

Constants inherited from Base

Base::BATCH_SIZE

Instance Attribute Summary

Attributes inherited from Base

#config, #id

Instance Method Summary collapse

Methods inherited from Base

#create_poll_info, #initialize, #process_batch, #process_batch_with_span, #retrieve_poll_info, #should_run?, #start, #stop

Constructor Details

This class inherits a constructor from Deimos::Utils::DbPoller::Base

Instance Method Details

#fetch_results ⇒ ActiveRecord::Relation

Returns:

  • (ActiveRecord::Relation)


30
31
32
# File 'lib/deimos/utils/db_poller/state_based.rb', line 30

# Builds the query for the next batch of records to publish: the producer's
# poll query, capped at BATCH_SIZE rows and ordered by the configured
# timestamp column.
#
# @return [ActiveRecord::Relation]
def fetch_results
  capped_query = @producer.poll_query.limit(BATCH_SIZE)
  capped_query.order(@config.timestamp_column)
end

#finalize_batch(batch, success) ⇒ void

This method returns an undefined value.

Parameters:

  • batch (Array<ActiveRecord::Base>)
  • success (Boolean)


37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/deimos/utils/db_poller/state_based.rb', line 37

# Records the outcome of a processed batch: touches the poll info, then
# bulk-updates the timestamp column (plus the state and publish-timestamp
# columns, when configured) on every record in the batch.
#
# @param batch [Array<ActiveRecord::Base>] the records that were just processed
# @param success [Boolean] whether the batch was processed successfully;
#   selects between the configured published and failed states
# @return [void]
def finalize_batch(batch, success)
  @info.touch
  # Nothing to update, and the model class cannot be derived from an empty batch.
  return if batch.empty?

  state = success ? @config.published_state : @config.failed_state
  klass = batch.first.class
  # Take the key from the batch's own model class.
  id_col = klass.primary_key
  # Read the clock once so every column written below gets the same value.
  now = Time.zone.now

  attrs = { @config.timestamp_column => now }
  attrs[@config.state_column] = state if state
  attrs[@config.publish_timestamp_column] = now if @config.publish_timestamp_column

  # The primary key name may be a String, which has no #to_proc; convert it
  # to a Symbol before using it with `&`.
  ids = batch.map(&id_col.to_sym)
  klass.where(id_col => ids).update_all(attrs)
end

#process_updates ⇒ void

This method returns an undefined value.

Send messages for updated data.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/deimos/utils/db_poller/state_based.rb', line 12

# Send messages for updated data.
#
# Repeatedly fetches a batch via #fetch_results, processes it with
# #process_batch_with_span and passes the outcome to #finalize_batch,
# stopping as soon as a fetch returns no records.
#
# @return [void]
def process_updates
  Deimos.config.logger.info("Polling #{@producer.topic}")
  status = PollStatus.new(0, 0, 0)

  # poll_query gets all the relevant data from the database, as defined
  # by the producer itself.
  loop do
    Deimos.config.logger.debug("Polling #{@producer.topic}, batch #{status.current_batch}")
    batch = fetch_results.to_a
    break if batch.empty?

    success = process_batch_with_span(batch, status)
    # Finalize after every batch, whether or not it succeeded.
    finalize_batch(batch, success)
  end
  # The report is wrapped in a balanced pair of parentheses.
  Deimos.config.logger.info("Poll #{@producer.topic} complete (#{status.report})")
end