Module: Deimos

Includes:
Instrumentation, FigTree
Defined in:
lib/deimos.rb,
lib/deimos/message.rb,
lib/deimos/version.rb,
lib/deimos/consumer.rb,
lib/deimos/producer.rb,
lib/deimos/poll_info.rb,
lib/deimos/exceptions.rb,
lib/deimos/backends/db.rb,
lib/deimos/kafka_source.rb,
lib/deimos/metrics/mock.rb,
lib/deimos/test_helpers.rb,
lib/deimos/tracing/mock.rb,
lib/deimos/backends/base.rb,
lib/deimos/backends/test.rb,
lib/deimos/kafka_message.rb,
lib/deimos/shared_config.rb,
lib/deimos/backends/kafka.rb,
lib/deimos/batch_consumer.rb,
lib/deimos/instrumentation.rb,
lib/deimos/metrics/datadog.rb,
lib/deimos/tracing/datadog.rb,
lib/deimos/utils/db_poller.rb,
lib/deimos/kafka_topic_info.rb,
lib/deimos/metrics/provider.rb,
lib/deimos/tracing/provider.rb,
lib/deimos/schema_class/base.rb,
lib/deimos/schema_class/enum.rb,
lib/deimos/utils/db_producer.rb,
lib/deimos/utils/lag_reporter.rb,
lib/deimos/utils/schema_class.rb,
lib/deimos/schema_class/record.rb,
lib/deimos/backends/kafka_async.rb,
lib/deimos/config/configuration.rb,
lib/deimos/config/phobos_config.rb,
lib/deimos/schema_backends/base.rb,
lib/deimos/schema_backends/mock.rb,
lib/deimos/utils/db_poller/base.rb,
lib/deimos/utils/deadlock_retry.rb,
lib/deimos/utils/inline_consumer.rb,
lib/deimos/active_record_consumer.rb,
lib/deimos/active_record_producer.rb,
lib/deimos/consume/batch_consumption.rb,
lib/deimos/schema_backends/avro_base.rb,
lib/deimos/schema_backends/avro_local.rb,
lib/deimos/utils/db_poller/time_based.rb,
lib/deimos/consume/message_consumption.rb,
lib/deimos/utils/db_poller/state_based.rb,
lib/deimos/utils/schema_controller_mixin.rb,
lib/generators/deimos/db_poller_generator.rb,
lib/deimos/schema_backends/avro_validation.rb,
lib/generators/deimos/db_backend_generator.rb,
lib/generators/deimos/schema_class_generator.rb,
lib/deimos/active_record_consume/batch_record.rb,
lib/deimos/active_record_consume/batch_slicer.rb,
lib/deimos/active_record_consume/mass_updater.rb,
lib/generators/deimos/active_record_generator.rb,
lib/deimos/schema_backends/avro_schema_coercer.rb,
lib/generators/deimos/bulk_import_id_generator.rb,
lib/deimos/schema_backends/avro_schema_registry.rb,
lib/deimos/active_record_consume/batch_consumption.rb,
lib/deimos/active_record_consume/batch_record_list.rb,
lib/deimos/active_record_consume/message_consumption.rb,
lib/deimos/active_record_consume/schema_model_converter.rb

Overview

Generates a migration for bulk import ID in consumer.

Defined Under Namespace

Modules: ActiveRecordConsume, Backends, Consume, Generators, Instrumentation, KafkaListener, KafkaSource, Metrics, PhobosConfig, SchemaBackends, SchemaClass, SharedConfig, TestHelpers, Tracing, Utils Classes: ActiveRecordConsumer, ActiveRecordProducer, AvroSchemaCoercer, BatchConsumer, Consumer, KafkaMessage, KafkaTopicInfo, Message, MissingImplementationError, PollInfo, Producer, Railtie, SchemaField

Constant Summary collapse

VERSION =
'1.24.3'

Constants included from Instrumentation

Instrumentation::NAMESPACE

Class Method Summary collapse

Class Method Details

.decode(schema:, namespace:, payload:) ⇒ Hash?

Parameters:

  • schema (String)
  • namespace (String)
  • payload (String)

Returns:

  • (Hash, nil)


90
91
92
# File 'lib/deimos.rb', line 90

def decode(schema:, namespace:, payload:)
  self.schema_backend(schema: schema, namespace: namespace).decode(payload)
end

.disable_producers(*producer_classes, &block) ⇒ void

This method returns an undefined value.

Run a block without allowing any messages to be produced to Kafka. Optionally add a list of producer classes to limit the disabling to those classes.

Parameters:

  • producer_classes (Array<Class>, Class)


16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/deimos/producer.rb', line 16

def disable_producers(*producer_classes, &block)
  if producer_classes.any?
    _disable_producer_classes(producer_classes, &block)
    return
  end

  if Thread.current[:frk_disable_all_producers] # nested disable block
    yield
    return
  end

  begin
    Thread.current[:frk_disable_all_producers] = true
    yield
  ensure
    Thread.current[:frk_disable_all_producers] = false
  end
end

.encode(schema:, namespace:, payload:, subject: nil) ⇒ String

Parameters:

  • schema (String)
  • namespace (String)
  • payload (Hash)
  • subject (String) (defaults to: nil)

Returns:

  • (String)


81
82
83
84
# File 'lib/deimos.rb', line 81

def encode(schema:, namespace:, payload:, subject: nil)
  self.schema_backend(schema: schema, namespace: namespace).
    encode(payload, topic: subject || "#{namespace}.#{schema}" )
end

.load_generated_schema_classesvoid

This method returns an undefined value.

Loads generated classes



36
37
38
39
40
41
42
43
44
# File 'lib/deimos/config/configuration.rb', line 36

def self.load_generated_schema_classes
  if Deimos.config.schema.generated_class_path.nil?
    raise 'Cannot use schema classes without schema.generated_class_path. Please provide a directory.'
  end

  Dir["./#{Deimos.config.schema.generated_class_path}/**/*.rb"].sort.each { |f| require f }
rescue LoadError
  raise 'Cannot load schema classes. Please regenerate classes with rake deimos:generate_schema_models.'
end

.producers_disabled?(producer_class = nil) ⇒ Boolean

Are producers disabled? If a class is passed in, check only that class. Otherwise check if the global disable flag is set.

Parameters:

  • producer_class (Class) (defaults to: nil)

Returns:

  • (Boolean)


52
53
54
55
# File 'lib/deimos/producer.rb', line 52

def producers_disabled?(producer_class=nil)
  Thread.current[:frk_disable_all_producers] ||
    Thread.current[:frk_disabled_producers]&.include?(producer_class)
end

.schema_backend(schema:, namespace:) ⇒ Deimos::SchemaBackends::Base

Parameters:

  • schema (String, Symbol)
  • namespace (String)

Returns:



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/deimos.rb', line 59

def schema_backend(schema:, namespace:)
  if Utils::SchemaClass.use?(config.to_h)
    # Initialize an instance of the provided schema
    # in the event the schema class is an override, the inherited
    # schema and namespace will be applied
    schema_class = Utils::SchemaClass.klass(schema, namespace)
    if schema_class.nil?
      schema_backend_class.new(schema: schema, namespace: namespace)
    else
      schema_instance = schema_class.new
      schema_backend_class.new(schema: schema_instance.schema, namespace: schema_instance.namespace)
    end
  else
    schema_backend_class.new(schema: schema, namespace: namespace)
  end
end

.schema_backend_classClass<Deimos::SchemaBackends::Base>

Returns:



48
49
50
51
52
53
54
# File 'lib/deimos.rb', line 48

def schema_backend_class
  backend = Deimos.config.schema.backend.to_s

  require "deimos/schema_backends/#{backend}"

  "Deimos::SchemaBackends::#{backend.classify}".constantize
end

.start_db_backend!(thread_count: 1) ⇒ void

This method returns an undefined value.

Start the DB producers to send Kafka messages.

Parameters:

  • thread_count (Integer) (defaults to: 1)

    the number of threads to start.



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/deimos.rb', line 97

def start_db_backend!(thread_count: 1)
  Sigurd.exit_on_signal = true
  if self.config.producers.backend != :db
    raise('Publish backend is not set to :db, exiting')
  end

  if thread_count.nil? || thread_count.zero?
    raise('Thread count is not given or set to zero, exiting')
  end

  producers = (1..thread_count).map do
    Deimos::Utils::DbProducer.
      new(self.config.db_producer.logger || self.config.logger)
  end
  executor = Sigurd::Executor.new(producers,
                                  sleep_seconds: 5,
                                  logger: self.config.logger)
  signal_handler = Sigurd::SignalHandler.new(executor)
  signal_handler.run!
end