Module: Deimos

Includes:
Instrumentation, FigTree
Defined in:
lib/deimos.rb,
lib/deimos/message.rb,
lib/deimos/version.rb,
lib/deimos/consumer.rb,
lib/deimos/producer.rb,
lib/deimos/poll_info.rb,
lib/deimos/backends/db.rb,
lib/deimos/kafka_source.rb,
lib/deimos/metrics/mock.rb,
lib/deimos/test_helpers.rb,
lib/deimos/tracing/mock.rb,
lib/deimos/backends/base.rb,
lib/deimos/backends/test.rb,
lib/deimos/kafka_message.rb,
lib/deimos/shared_config.rb,
lib/deimos/backends/kafka.rb,
lib/deimos/batch_consumer.rb,
lib/deimos/instrumentation.rb,
lib/deimos/metrics/datadog.rb,
lib/deimos/tracing/datadog.rb,
lib/deimos/utils/db_poller.rb,
lib/deimos/kafka_topic_info.rb,
lib/deimos/metrics/provider.rb,
lib/deimos/tracing/provider.rb,
lib/deimos/schema_class/base.rb,
lib/deimos/schema_class/enum.rb,
lib/deimos/utils/db_producer.rb,
lib/deimos/utils/lag_reporter.rb,
lib/deimos/utils/schema_class.rb,
lib/deimos/schema_class/record.rb,
lib/deimos/backends/kafka_async.rb,
lib/deimos/config/configuration.rb,
lib/deimos/config/phobos_config.rb,
lib/deimos/schema_backends/base.rb,
lib/deimos/schema_backends/mock.rb,
lib/deimos/utils/deadlock_retry.rb,
lib/deimos/utils/inline_consumer.rb,
lib/deimos/active_record_consumer.rb,
lib/deimos/active_record_producer.rb,
lib/deimos/consume/batch_consumption.rb,
lib/deimos/schema_backends/avro_base.rb,
lib/deimos/schema_backends/avro_local.rb,
lib/deimos/consume/message_consumption.rb,
lib/deimos/utils/schema_controller_mixin.rb,
lib/generators/deimos/db_poller_generator.rb,
lib/deimos/schema_backends/avro_validation.rb,
lib/generators/deimos/db_backend_generator.rb,
lib/generators/deimos/schema_class_generator.rb,
lib/deimos/active_record_consume/batch_slicer.rb,
lib/generators/deimos/active_record_generator.rb,
lib/deimos/schema_backends/avro_schema_coercer.rb,
lib/deimos/schema_backends/avro_schema_registry.rb,
lib/deimos/active_record_consume/batch_consumption.rb,
lib/deimos/active_record_consume/message_consumption.rb,
lib/deimos/active_record_consume/schema_model_converter.rb

Overview

Generates a new consumer.

Defined Under Namespace

Modules: ActiveRecordConsume, Backends, Consume, Generators, Instrumentation, KafkaListener, KafkaSource, Metrics, PhobosConfig, SchemaBackends, SchemaClass, SharedConfig, TestHelpers, Tracing, Utils Classes: ActiveRecordConsumer, ActiveRecordProducer, AvroSchemaCoercer, BatchConsumer, Consumer, KafkaMessage, KafkaTopicInfo, Message, PollInfo, Producer, Railtie, SchemaField

Constant Summary collapse

VERSION =
'1.14.5'

Constants included from Instrumentation

Instrumentation::NAMESPACE

Class Method Summary collapse

Class Method Details

._disable_producer_classes(producer_classes) ⇒ Object

:nodoc:



35
36
37
38
39
40
41
42
# File 'lib/deimos/producer.rb', line 35

def _disable_producer_classes(producer_classes)
  Thread.current[:frk_disabled_producers] ||= Set.new
  producers_to_disable = producer_classes -
                         Thread.current[:frk_disabled_producers].to_a
  Thread.current[:frk_disabled_producers] += producers_to_disable
  yield
  Thread.current[:frk_disabled_producers] -= producers_to_disable
end

.configure_producer_or_consumer(kafka_config) ⇒ Object

Parameters:



78
79
80
81
82
83
84
85
86
87
# File 'lib/deimos/config/configuration.rb', line 78

def self.configure_producer_or_consumer(kafka_config)
  klass = kafka_config.class_name.constantize
  klass.class_eval do
    topic(kafka_config.topic) if kafka_config.topic.present? && klass.respond_to?(:topic)
    schema(kafka_config.schema) if kafka_config.schema.present?
    namespace(kafka_config.namespace) if kafka_config.namespace.present?
    key_config(**kafka_config.key_config) if kafka_config.key_config.present?
    schema_class_config(kafka_config.use_schema_classes) if kafka_config.use_schema_classes.present?
  end
end

.decode(schema:, namespace:, payload:) ⇒ Hash?

Parameters:

  • schema (String)
  • namespace (String)
  • payload (String)

Returns:

  • (Hash, nil)


79
80
81
# File 'lib/deimos.rb', line 79

def decode(schema:, namespace:, payload:)
  self.schema_backend(schema: schema, namespace: namespace).decode(payload)
end

.disable_producers(*producer_classes, &block) ⇒ Object

Run a block without allowing any messages to be produced to Kafka. Optionally add a list of producer classes to limit the disabling to those classes.

Parameters:

  • producer_classes (Array<Class>|Class)


15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/deimos/producer.rb', line 15

def disable_producers(*producer_classes, &block)
  if producer_classes.any?
    _disable_producer_classes(producer_classes, &block)
    return
  end

  if Thread.current[:frk_disable_all_producers] # nested disable block
    yield
    return
  end

  begin
    Thread.current[:frk_disable_all_producers] = true
    yield
  ensure
    Thread.current[:frk_disable_all_producers] = false
  end
end

.encode(schema:, namespace:, payload:, subject: nil) ⇒ String

Parameters:

  • schema (String)
  • namespace (String)
  • payload (Hash)
  • subject (String) (defaults to: nil)

Returns:

  • (String)


70
71
72
73
# File 'lib/deimos.rb', line 70

def encode(schema:, namespace:, payload:, subject: nil)
  self.schema_backend(schema: schema, namespace: namespace).
    encode(payload, topic: subject || "#{namespace}.#{schema}" )
end

.load_generated_schema_classesObject

Loads generated classes



35
36
37
38
39
40
41
42
43
# File 'lib/deimos/config/configuration.rb', line 35

def self.load_generated_schema_classes
  if Deimos.config.schema.generated_class_path.nil?
    raise 'Cannot use schema classes without schema.generated_class_path. Please provide a directory.'
  end

  Dir["./#{Deimos.config.schema.generated_class_path}/**/*.rb"].sort.each { |f| require f }
rescue LoadError
  raise 'Cannot load schema classes. Please regenerate classes with rake deimos:generate_schema_models.'
end

.producers_disabled?(producer_class = nil) ⇒ Boolean

Are producers disabled? If a class is passed in, check only that class. Otherwise check if the global disable flag is set.

Returns:

  • (Boolean)


47
48
49
50
# File 'lib/deimos/producer.rb', line 47

def producers_disabled?(producer_class=nil)
  Thread.current[:frk_disable_all_producers] ||
    Thread.current[:frk_disabled_producers]&.include?(producer_class)
end

.schema_backend(schema:, namespace:) ⇒ Deimos::SchemaBackends::Base

Parameters:

  • schema (String|Symbol)
  • namespace (String)

Returns:



61
62
63
# File 'lib/deimos.rb', line 61

def schema_backend(schema:, namespace:)
  schema_backend_class.new(schema: schema, namespace: namespace)
end

.schema_backend_classClass < Deimos::SchemaBackends::Base]

Returns Class < Deimos::SchemaBackends::Base].

Returns:



50
51
52
53
54
55
56
# File 'lib/deimos.rb', line 50

def schema_backend_class
  backend = Deimos.config.schema.backend.to_s

  require "deimos/schema_backends/#{backend}"

  "Deimos::SchemaBackends::#{backend.classify}".constantize
end

.start_db_backend!(thread_count: 1) ⇒ Object

Start the DB producers to send Kafka messages.

Parameters:

  • thread_count (Integer) (defaults to: 1)

    the number of threads to start.



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/deimos.rb', line 85

def start_db_backend!(thread_count: 1)
  if self.config.producers.backend != :db
    raise('Publish backend is not set to :db, exiting')
  end

  if thread_count.nil? || thread_count.zero?
    raise('Thread count is not given or set to zero, exiting')
  end

  producers = (1..thread_count).map do
    Deimos::Utils::DbProducer.
      new(self.config.db_producer.logger || self.config.logger)
  end
  executor = Sigurd::Executor.new(producers,
                                  sleep_seconds: 5,
                                  logger: self.config.logger)
  signal_handler = Sigurd::SignalHandler.new(executor)
  signal_handler.run!
end

.validate_consumersObject

Validate that consumers are configured correctly, including their delivery mode.



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/deimos/config/configuration.rb', line 59

def self.validate_consumers
  Phobos.config.listeners.each do |listener|
    handler_class = listener.handler.constantize
    delivery = listener.delivery

    next unless handler_class < Deimos::Consumer

    # Validate that each consumer implements the correct method for its type
    if delivery == 'inline_batch'
      if handler_class.instance_method(:consume_batch).owner == Deimos::Consume::BatchConsumption
        raise "BatchConsumer #{listener.handler} does not implement `consume_batch`"
      end
    elsif handler_class.instance_method(:consume).owner == Deimos::Consume::MessageConsumption
      raise "Non-batch Consumer #{listener.handler} does not implement `consume`"
    end
  end
end

.validate_db_backendObject

Ensure everything is set up correctly for the DB backend.



46
47
48
49
50
51
52
53
54
55
# File 'lib/deimos/config/configuration.rb', line 46

def self.validate_db_backend
  begin
    require 'activerecord-import'
  rescue LoadError
    raise 'Cannot set producers.backend to :db without activerecord-import! Please add it to your Gemfile.'
  end
  if Deimos.config.producers.required_acks != :all
    raise 'Cannot set producers.backend to :db unless producers.required_acks is set to ":all"!'
  end
end