Class: Deimos::Utils::DbPoller::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/deimos/utils/db_poller/base.rb

Overview

Base poller class for retrieving and publishing messages.

Direct Known Subclasses

StateBased, TimeBased

Constant Summary collapse

FATAL_CODES =
%i(invalid_msg_size msg_size_too_large)
BATCH_SIZE =

Returns:

  • (Integer)
1000

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Base

Returns a new instance of Base.

Parameters:

  • config (FigTree::ConfigStruct)


33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/deimos/utils/db_poller/base.rb', line 33

# Build a poller from its configuration and resolve which producers it uses.
#
# A random hex id is generated for the instance. The resource class is the
# poller class itself when the class declares producers, otherwise the class
# named by `config.producer_class`. Every resolved producer is validated.
#
# @param config [FigTree::ConfigStruct]
# @raise [RuntimeError] when neither `poller_class` nor `producer_class` is
#   configured, when a NameError occurs while resolving/validating producers
#   (reported as "Class ... not found!"), or when a producer fails validation
def initialize(config)
  @config = config
  @id = SecureRandom.hex
  begin
    nothing_configured = @config.poller_class.nil? && @config.producer_class.nil?
    raise 'No producers have been set for this DB poller!' if nothing_configured

    @resource_class = if self.class.producers.any?
                        self.class
                      else
                        @config.producer_class.constantize
                      end

    producer_classes.each { |klass| validate_producer_class(klass) }
  rescue NameError
    # Any NameError raised above is reported as a missing producer class.
    raise "Class #{@config.producer_class} not found!"
  end
end

Instance Attribute Details

#config ⇒ Hash (readonly)

Returns:

  • (Hash)


23
24
25
# File 'lib/deimos/utils/db_poller/base.rb', line 23

# The configuration object handed to the constructor, returned unchanged.
#
# @return [FigTree::ConfigStruct] presumably — that is the constructor's
#   documented parameter type; confirm against callers
def config
  @config
end

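#id ⇒ String (readonly)

Needed for Executor so it can identify the worker. The value is generated with SecureRandom.hex in the constructor, so it is a hexadecimal string rather than an integer.

Returns:

  • (String)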


20
21
22
# File 'lib/deimos/utils/db_poller/base.rb', line 20

# Identifier of this poller instance. Needed for Executor so it can identify
# the worker. The constructor assigns it from SecureRandom.hex, so it is a
# hexadecimal String.
#
# @return [String]
def id
  @id
end

Class Method Details

.producers ⇒ Array<Producer>

Method to define producers if a single poller needs to publish to multiple topics. Producer classes should be constantized

Returns:



28
29
30
# File 'lib/deimos/utils/db_poller/base.rb', line 28

# Producers this poller publishes through. The base implementation declares
# none (empty array), in which case the poller uses the single class named by
# `config.producer_class`. Override in a subclass when one poller needs to
# publish to multiple topics; entries should be classes, not class names.
#
# @return [Array<Class>]
def self.producers
  []
end

Instance Method Details

#create_poll_info ⇒ Deimos::PollInfo

Returns:



85
86
87
# File 'lib/deimos/utils/db_poller/base.rb', line 85

# Create the PollInfo row for this poller's resource class, with `last_sent`
# initialised to Time.new(0).
#
# @return [Deimos::PollInfo]
def create_poll_info
  attributes = { producer: @resource_class.to_s, last_sent: Time.new(0) }
  Deimos::PollInfo.create!(**attributes)
end

#handle_message_too_large(exception, batch, status, span) ⇒ Boolean

Parameters:

  • exception (Exception)
  • batch (Array<ActiveRecord::Base>)
  • status (PollStatus)
  • span (Object)

Returns:

  • (Boolean)


114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/deimos/utils/db_poller/base.rb', line 114

# React to a publish failure caused by a message that is too large.
#
# The error is always logged. When `skip_too_large_messages` is enabled the
# batch is logged as skipped, the error is recorded on the span and the batch
# is counted as errored. Otherwise the method pauses briefly so the caller
# can retry.
#
# @param exception [Exception]
# @param batch [Array<ActiveRecord::Base>]
# @param status [PollStatus]
# @param span [Object]
# @return [Boolean] true when the batch was skipped, false when the caller
#   should retry
def handle_message_too_large(exception, batch, status, span)
  Deimos::Logging.log_error("Error publishing through DB Poller: #{exception.message}")

  unless @config.skip_too_large_messages
    # do the same thing as regular Kafka::Error
    sleep(0.5)
    return false
  end

  skipped_ids = batch.map(&:id).join(', ')
  Deimos::Logging.log_error("Skipping messages #{skipped_ids} since they are too large")
  Deimos.config.tracer&.set_error(span, exception)
  status.batches_errored += 1
  true
end

#log_identifier ⇒ String

Configure log identifier and messages to be used in subclasses

Returns:

  • (String)


179
180
181
# File 'lib/deimos/utils/db_poller/base.rb', line 179

# Identifier used in log messages: the resource class name followed by the
# list of topics of the configured producers (rendered as an array).
#
# @return [String]
def log_identifier
  resource_name = @resource_class.name
  topics = producer_classes.map { |producer| producer.topic }
  "#{resource_name}: #{topics}"
end

#process_batch(batch) ⇒ void

This method returns an undefined value.

Publish batch using the configured producers

Parameters:

  • batch (Array<ActiveRecord::Base>)


171
172
173
174
175
# File 'lib/deimos/utils/db_poller/base.rb', line 171

# Publish the batch through every configured producer, in order.
#
# @param batch [Array<ActiveRecord::Base>]
# @return [void]
def process_batch(batch)
  producer_classes.each { |producer_class| producer_class.send_events(batch) }
end

#process_batch_with_span(batch, status) ⇒ Boolean

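Publish a batch inside a tracing span, retrying on failure. Returns false only when the configured retry limit is exceeded; otherwise returns true.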

Parameters:

  • batch (Array<ActiveRecord::Base>)
  • status (PollStatus)

Returns:

  • (Boolean)


131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/deimos/utils/db_poller/base.rb', line 131

# Publish a batch inside a tracing span, retrying on failure.
#
# Error handling:
# * WaterDrop::Errors::ProduceManyError whose cause has a code listed in
#   FATAL_CODES is delegated to #handle_message_too_large; the batch is
#   retried unless that method reports it was skipped.
# * Any other ProduceManyError is logged and retried after a short pause,
#   with no upper bound on attempts.
# * Any other StandardError is retried after a short pause until
#   `@config.retries` attempts have been used (no limit when it is nil);
#   after that the error is recorded on the span and the batch is abandoned.
#
# Each attempt starts a new span; `finish` is only called after a successful
# publish.
# NOTE(review): spans of failed attempts are never finished here — confirm
# the tracer tolerates that.
#
# @param batch [Array<ActiveRecord::Base>]
# @param status [PollStatus]
# @return [Boolean] false when the retry limit was exceeded, true otherwise
#   (including when a too-large batch was skipped)
def process_batch_with_span(batch, status)
  retries = 0
  begin
    span = Deimos.config.tracer&.start(
      'deimos-db-poller',
      resource: @resource_class.name.gsub('::', '-')
    )
    process_batch(batch)
    Deimos.config.tracer&.finish(span)
    status.batches_processed += 1
  rescue WaterDrop::Errors::ProduceManyError => e
    if FATAL_CODES.include?(e.cause.try(:code))
      retry unless handle_message_too_large(e, batch, status, span)
    else
      Deimos::Logging.log_error("Error publishing through DB Poller: #{e.message}")
      sleep(0.5)
      retry
    end
  rescue StandardError => e
    # The original interpolation had a stray closing brace, which appended a
    # literal "}" to every logged message.
    Deimos::Logging.log_error("Error publishing through DB poller: #{e.message}")
    if @config.retries.nil? || retries < @config.retries
      retries += 1
      sleep(0.5)
      retry
    else
      Deimos::Logging.log_error('Retries exceeded, moving on to next batch')
      Deimos.config.tracer&.set_error(span, e)
      status.batches_errored += 1
      return false
    end
  ensure
    # Count the batch's messages whether it was published, skipped or abandoned.
    status.messages_processed += batch.size
  end
  true
end

#process_updates ⇒ void

This method returns an undefined value.

Send messages for updated data.



105
106
107
# File 'lib/deimos/utils/db_poller/base.rb', line 105

# Send messages for updated data. The base class provides no implementation
# and always raises; presumably the subclasses (StateBased, TimeBased)
# override this — confirm in those files.
#
# @raise [Deimos::MissingImplementationError] always, in this base version
# @return [void]
def process_updates
  raise Deimos::MissingImplementationError
end

#producer_classes ⇒ Array<ActiveRecordProducer>

Return array of configured producers depending on poller class

Returns:



185
186
187
188
189
# File 'lib/deimos/utils/db_poller/base.rb', line 185

# Producers this poller publishes through: the producers declared on the
# poller class when there are any, otherwise a one-element array holding the
# class named by `config.producer_class`.
#
# @return [Array<Class>]
def producer_classes
  if self.class.producers.any?
    self.class.producers
  else
    [@config.producer_class.constantize]
  end
end

#retrieve_poll_info ⇒ void

Grab the PollInfo, or create it if it doesn’t exist.

Returns:

  • (void)


80
81
82
# File 'lib/deimos/utils/db_poller/base.rb', line 80

# Load the PollInfo row for this poller's resource class into @info,
# creating it when none exists yet.
#
# @return [void]
def retrieve_poll_info
  producer_key = @resource_class.to_s
  existing = Deimos::PollInfo.find_by_producer(producer_key)
  @info = existing || create_poll_info
end

#should_run? ⇒ Boolean

Indicate whether this current loop should process updates. Most loops will busy-wait (sleeping 0.1 seconds) until it’s ready.

Returns:

  • (Boolean)


92
93
94
# File 'lib/deimos/utils/db_poller/base.rb', line 92

# Whether the current loop iteration should process updates: true once the
# time since `last_sent`, less the configured `delay_time`, has reached
# `run_every`.
#
# @return [Boolean]
def should_run?
  since_last_send = Time.zone.now - @info.last_sent
  since_last_send - @config.delay_time >= @config.run_every
end

#start ⇒ void

This method returns an undefined value.

Start the poll: 1) Grab the current PollInfo from the database indicating the last time we ran 2) On a loop, process all the recent updates between the last time we ran and now.



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/deimos/utils/db_poller/base.rb', line 57

# Start the poll: load the PollInfo recording the last run, then loop,
# processing updates whenever #should_run? allows and sleeping 0.1 seconds
# between iterations, until a stop has been signalled.
#
# Before looping, an async Kafka producer backend is switched to the
# synchronous one, and the database connection is re-established unless a
# transaction is currently open.
#
# @return [void]
def start
  producers_config = Deimos.config.producers
  # Don't send asynchronously
  producers_config.backend = :kafka if producers_config.backend == :kafka_async

  Deimos::Logging.log_info('Starting...')
  @signal_to_stop = false

  connection = ActiveRecord::Base.connection
  connection.reconnect! unless connection.open_transactions.positive?

  retrieve_poll_info
  loop do
    if @signal_to_stop
      Deimos::Logging.log_info('Shutting down')
      break
    end

    process_updates if should_run?
    sleep(0.1)
  end
end

#stop ⇒ void

This method returns an undefined value.

Stop the poll.



98
99
100
101
# File 'lib/deimos/utils/db_poller/base.rb', line 98

# Stop the poll. This only logs and raises a flag; the loop in #start checks
# the flag at the top of each iteration and exits there, so an iteration
# already in progress runs to completion first.
#
# @return [void]
def stop
  Deimos::Logging.log_info('Received signal to stop')
  @signal_to_stop = true
end

#validate_producer_class(producer_class) ⇒ void

This method returns an undefined value.

Validate if a producer class is an ActiveRecordProducer or not



193
194
195
196
197
# File 'lib/deimos/utils/db_poller/base.rb', line 193

# Validate that a producer class is an ActiveRecordProducer.
#
# @param producer_class [Class] class to check; it must be a strict subclass
#   of Deimos::ActiveRecordProducer
# @raise [RuntimeError] when it is not such a subclass
# @return [void]
def validate_producer_class(producer_class)
  return if producer_class < Deimos::ActiveRecordProducer

  # Interpolate the class itself. `producer_class.class` is always `Class`,
  # which made the message read "Class Class is not an ActiveRecordProducer!".
  raise "Class #{producer_class} is not an ActiveRecordProducer!"
end