Class: Deimos::Utils::DbPoller::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/deimos/utils/db_poller/base.rb

Overview

Base poller class for retrieving and publishing messages.

Direct Known Subclasses

StateBased, TimeBased

Constant Summary collapse

BATCH_SIZE =

Returns:

  • (Integer)
1000

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Base

Returns a new instance of Base.

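Parameters:

  • config — poller configuration; the constructor reads its producer_class (the producer class name)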

25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/deimos/utils/db_poller/base.rb', line 25

# Builds a poller for the producer class named in the configuration.
#
# @param config [#producer_class] poller configuration; +producer_class+ is
#   the name of the producer class, resolved with String#constantize
# @raise [RuntimeError] if the named class cannot be resolved, or if it is not
#   a subclass of Deimos::ActiveRecordProducer
def initialize(config)
  @config = config
  # Random hex string identifying this poller instance.
  @id = SecureRandom.hex
  begin
    @producer = @config.producer_class.constantize
  rescue NameError
    raise "Class #{@config.producer_class} not found!"
  end
  # The is_a? guard keeps `<` from blowing up when the constant is not a class.
  return if @producer.is_a?(Class) && @producer < Deimos::ActiveRecordProducer

  # @producer is itself a class, so `@producer.class.name` would always be
  # "Class"; report the configured name instead.
  raise "Class #{@config.producer_class} is not an ActiveRecordProducer!"
end

Instance Attribute Details

#config ⇒ Hash (readonly)

Returns:

  • (Hash)


22
23
24
# File 'lib/deimos/utils/db_poller/base.rb', line 22

# Returns the configuration object this poller was constructed with.
def config
  @config
end

#id ⇒ String (readonly)

Needed for Executor so it can identify the worker.

Returns:

  • (String) — random hex string generated with SecureRandom.hex in the constructor


19
20
21
# File 'lib/deimos/utils/db_poller/base.rb', line 19

# Returns this poller's identifier, the value assigned from SecureRandom.hex
# in the constructor.
def id
  @id
end

Instance Method Details

#create_poll_info ⇒ Deimos::PollInfo

Returns:

  • (Deimos::PollInfo)

72
73
74
# File 'lib/deimos/utils/db_poller/base.rb', line 72

# Creates the PollInfo record for the configured producer, with last_sent
# initialised to Time.new(0).
#
# @return [Deimos::PollInfo] whatever Deimos::PollInfo.create! returns
def create_poll_info
  attributes = {
    producer: @config.producer_class,
    last_sent: Time.new(0)
  }
  Deimos::PollInfo.create!(**attributes)
end

#process_batch(batch) ⇒ void

This method returns an undefined value.

Parameters:

  • batch (Array<ActiveRecord::Base>)


133
134
135
# File 'lib/deimos/utils/db_poller/base.rb', line 133

# Publishes a batch by passing it to the producer class's send_events.
#
# @param batch [Array<ActiveRecord::Base>]
# @return [void]
def process_batch(batch)
  @producer.send_events(batch)
end

#process_batch_with_span(batch, status) ⇒ Boolean

Parameters:

  • batch (Array<ActiveRecord::Base>)
  • status (PollStatus)

Returns:

  • (Boolean)


99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/deimos/utils/db_poller/base.rb', line 99

# Publishes one batch inside a tracing span, retrying when publishing fails.
#
# Kafka::Error is retried without limit (0.5s between attempts). Any other
# StandardError is retried up to @config.retries times, after which the error
# is recorded on the span and the batch is given up on.
#
# @param batch [Array<ActiveRecord::Base>]
# @param status [PollStatus] counters (batches_processed, batches_errored,
#   messages_processed) updated in place
# @return [Boolean] true if the batch was published, false if retries ran out
def process_batch_with_span(batch, status)
  retries = 0
  begin
    span = Deimos.config.tracer&.start(
      'deimos-db-poller',
      # @producer is a class: use its own name (`.class.name` is always "Class").
      resource: @producer.name.gsub('::', '-')
    )
    process_batch(batch)
    Deimos.config.tracer&.finish(span)
    status.batches_processed += 1
  rescue Kafka::Error => e # keep trying till it fixes itself
    Deimos.config.logger.error("Error publishing through DB Poller: #{e.message}")
    sleep(0.5)
    retry
  rescue StandardError => e
    Deimos.config.logger.error("Error publishing through DB poller: #{e.message}")
    if retries < @config.retries
      retries += 1
      sleep(0.5)
      retry
    else
      Deimos.config.logger.error('Retries exceeded, moving on to next batch')
      Deimos.config.tracer&.set_error(span, e)
      status.batches_errored += 1
      return false
    end
  ensure
    # Counted on the way out, whether the batch was published or abandoned.
    status.messages_processed += batch.size
  end
  true
end

#process_updates ⇒ void

This method returns an undefined value.

Send messages for updated data.

Raises:

  • (Deimos::MissingImplementationError)


92
93
94
# File 'lib/deimos/utils/db_poller/base.rb', line 92

# Send messages for updated data. This base implementation always raises;
# presumably the subclasses provide the real behaviour (not visible here).
#
# @raise [Deimos::MissingImplementationError] always
def process_updates
  raise Deimos::MissingImplementationError
end

#retrieve_poll_info ⇒ void

Grab the PollInfo, or create it if it doesn't exist.

Returns:

  • (void)


67
68
69
# File 'lib/deimos/utils/db_poller/base.rb', line 67

# Loads the PollInfo for the configured producer into @info, creating it via
# #create_poll_info when the lookup returns nothing.
def retrieve_poll_info
  existing = Deimos::PollInfo.find_by_producer(@config.producer_class)
  @info = existing || create_poll_info
end

#should_run? ⇒ Boolean

Indicate whether this current loop should process updates. Most loops will busy-wait (sleeping 0.1 seconds) until it's ready.

Returns:

  • (Boolean)


79
80
81
# File 'lib/deimos/utils/db_poller/base.rb', line 79

# Tells the polling loop whether it is time to process updates again.
#
# @return [Boolean] true once the time since @info.last_sent, less
#   @config.delay_time, is at least @config.run_every
def should_run?
  since_last_send = Time.zone.now - @info.last_sent
  since_last_send - @config.delay_time >= @config.run_every
end

#start ⇒ void

This method returns an undefined value.

Start the poll: (1) grab the current PollInfo from the database, indicating the last time we ran; (2) in a loop, process all the updates made between the last time we ran and now.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/deimos/utils/db_poller/base.rb', line 44

# Runs the polling loop until the stop flag is set.
#
# Switches a :kafka_async backend to :kafka, reconnects the ActiveRecord
# connection when no transaction is open, loads the PollInfo, and then calls
# #process_updates whenever #should_run? is true, sleeping 0.1s per iteration.
#
# @return [void]
def start
  # Don't send asynchronously
  producers = Deimos.config.producers
  producers.backend = :kafka if producers.backend == :kafka_async
  Deimos.config.logger.info('Starting...')
  @signal_to_stop = false
  connection = ActiveRecord::Base.connection
  connection.reconnect! unless connection.open_transactions.positive?

  retrieve_poll_info
  loop do
    unless @signal_to_stop
      process_updates if should_run?
      sleep(0.1)
      next
    end
    Deimos.config.logger.info('Shutting down')
    break
  end
end

#stop ⇒ void

This method returns an undefined value.

Stop the poll.



85
86
87
88
# File 'lib/deimos/utils/db_poller/base.rb', line 85

# Requests that the polling loop stop. Only sets a flag; the loop in #start
# checks it at the top of each iteration, so the poller exits on its next pass.
#
# @return [void]
def stop
  Deimos.config.logger.info('Received signal to stop')
  @signal_to_stop = true
end