Class: Deimos::Utils::DbPoller::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/deimos/utils/db_poller/base.rb

Overview

Base poller class for retrieving and publishing messages.

Direct Known Subclasses

StateBased, TimeBased

Constant Summary collapse

BATCH_SIZE =

Returns:

  • (Integer)
1000

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Base

Returns a new instance of Base.

Parameters:



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/deimos/utils/db_poller/base.rb', line 32

def initialize(config)
  @config = config
  @id = SecureRandom.hex
  begin
    if @config.poller_class.nil? && @config.producer_class.nil?
      raise 'No producers have been set for this DB poller!'
    end

    @resource_class = self.class.producers.any? ? self.class : @config.producer_class.constantize

    producer_classes.each do |producer_class|
      validate_producer_class(producer_class)
    end
  rescue NameError
    raise "Class #{@config.producer_class} not found!"
  end
end

Instance Attribute Details

#configHash (readonly)

Returns:

  • (Hash)


22
23
24
# File 'lib/deimos/utils/db_poller/base.rb', line 22

def config
  @config
end

#idInteger (readonly)

Needed for Executor so it can identify the worker

Returns:

  • (Integer)


19
20
21
# File 'lib/deimos/utils/db_poller/base.rb', line 19

def id
  @id
end

Class Method Details

.producersArray<Producer>

Method to define producers if a single poller needs to publish to multiple topics. Producer classes should be constantized

Returns:



27
28
29
# File 'lib/deimos/utils/db_poller/base.rb', line 27

def self.producers
  []
end

Instance Method Details

#create_poll_infoDeimos::PollInfo

Returns:



84
85
86
# File 'lib/deimos/utils/db_poller/base.rb', line 84

def create_poll_info
  Deimos::PollInfo.create!(producer: @resource_class.to_s, last_sent: Time.new(0))
end

#log_identifierString

Configure log identifier and messages to be used in subclasses

Returns:

  • (String)


154
155
156
# File 'lib/deimos/utils/db_poller/base.rb', line 154

def log_identifier
  "#{@resource_class.name}: #{producer_classes.map(&:topic)}"
end

#process_batch(batch) ⇒ void

This method returns an undefined value.

Publish batch using the configured producers

Parameters:

  • batch (Array<ActiveRecord::Base>)


146
147
148
149
150
# File 'lib/deimos/utils/db_poller/base.rb', line 146

def process_batch(batch)
  producer_classes.each do |producer|
    producer.send_events(batch)
  end
end

#process_batch_with_span(batch, status) ⇒ Boolean

Parameters:

  • batch (Array<ActiveRecord::Base>)
  • status (PollStatus)

Returns:

  • (Boolean)


111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/deimos/utils/db_poller/base.rb', line 111

def process_batch_with_span(batch, status)
  retries = 0
  begin
    span = Deimos.config.tracer&.start(
      'deimos-db-poller',
      resource: @resource_class.name.gsub('::', '-')
    )
    process_batch(batch)
    Deimos.config.tracer&.finish(span)
    status.batches_processed += 1
  rescue Kafka::Error => e # keep trying till it fixes itself
    Deimos.config.logger.error("Error publishing through DB Poller: #{e.message}")
    sleep(0.5)
    retry
  rescue StandardError => e
    Deimos.config.logger.error("Error publishing through DB poller: #{e.message}}")
    if retries < @config.retries
      retries += 1
      sleep(0.5)
      retry
    else
      Deimos.config.logger.error('Retries exceeded, moving on to next batch')
      Deimos.config.tracer&.set_error(span, e)
      status.batches_errored += 1
      return false
    end
  ensure
    status.messages_processed += batch.size
  end
  true
end

#process_updatesvoid

This method returns an undefined value.

Send messages for updated data.



104
105
106
# File 'lib/deimos/utils/db_poller/base.rb', line 104

def process_updates
  raise Deimos::MissingImplementationError
end

#producer_classesArray<ActiveRecordProducer>

Return array of configured producers depending on poller class

Returns:



160
161
162
163
164
# File 'lib/deimos/utils/db_poller/base.rb', line 160

def producer_classes
  return self.class.producers if self.class.producers.any?

  [@config.producer_class.constantize]
end

#retrieve_poll_infovoid

Grab the PollInfo or create if it doesn’t exist.

Returns:

  • (void)
  • (void)


79
80
81
# File 'lib/deimos/utils/db_poller/base.rb', line 79

def retrieve_poll_info
  @info = Deimos::PollInfo.find_by_producer(@resource_class.to_s) || create_poll_info
end

#should_run?Boolean

Indicate whether this current loop should process updates. Most loops will busy-wait (sleeping 0.1 seconds) until it’s ready.

Returns:

  • (Boolean)


91
92
93
# File 'lib/deimos/utils/db_poller/base.rb', line 91

def should_run?
  Time.zone.now - @info.last_sent - @config.delay_time >= @config.run_every
end

#startvoid

This method returns an undefined value.

Start the poll: 1) Grab the current PollInfo from the database indicating the last time we ran 2) On a loop, process all the recent updates between the last time we ran and now.



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/deimos/utils/db_poller/base.rb', line 56

def start
  # Don't send asynchronously
  if Deimos.config.producers.backend == :kafka_async
    Deimos.config.producers.backend = :kafka
  end
  Deimos.config.logger.info('Starting...')
  @signal_to_stop = false
  ActiveRecord::Base.connection.reconnect! unless ActiveRecord::Base.connection.open_transactions.positive?

  retrieve_poll_info
  loop do
    if @signal_to_stop
      Deimos.config.logger.info('Shutting down')
      break
    end
    process_updates if should_run?
    sleep(0.1)
  end
end

#stopvoid

This method returns an undefined value.

Stop the poll.



97
98
99
100
# File 'lib/deimos/utils/db_poller/base.rb', line 97

def stop
  Deimos.config.logger.info('Received signal to stop')
  @signal_to_stop = true
end

#validate_producer_class(producer_class) ⇒ void

This method returns an undefined value.

Validate if a producer class is an ActiveRecordProducer or not



168
169
170
171
172
# File 'lib/deimos/utils/db_poller/base.rb', line 168

def validate_producer_class(producer_class)
  unless producer_class < Deimos::ActiveRecordProducer
    raise "Class #{producer_class.class.name} is not an ActiveRecordProducer!"
  end
end