Module: Deimos
- Includes:
- Instrumentation, FigTree
- Defined in:
- lib/deimos.rb,
lib/deimos/message.rb,
lib/deimos/version.rb,
lib/deimos/consumer.rb,
lib/deimos/producer.rb,
lib/deimos/poll_info.rb,
lib/deimos/backends/db.rb,
lib/deimos/kafka_source.rb,
lib/deimos/metrics/mock.rb,
lib/deimos/test_helpers.rb,
lib/deimos/tracing/mock.rb,
lib/deimos/backends/base.rb,
lib/deimos/backends/test.rb,
lib/deimos/kafka_message.rb,
lib/deimos/shared_config.rb,
lib/deimos/backends/kafka.rb,
lib/deimos/batch_consumer.rb,
lib/deimos/instrumentation.rb,
lib/deimos/metrics/datadog.rb,
lib/deimos/tracing/datadog.rb,
lib/deimos/utils/db_poller.rb,
lib/deimos/kafka_topic_info.rb,
lib/deimos/metrics/provider.rb,
lib/deimos/tracing/provider.rb,
lib/deimos/schema_class/base.rb,
lib/deimos/schema_class/enum.rb,
lib/deimos/utils/db_producer.rb,
lib/deimos/utils/lag_reporter.rb,
lib/deimos/utils/schema_class.rb,
lib/deimos/schema_class/record.rb,
lib/deimos/backends/kafka_async.rb,
lib/deimos/config/configuration.rb,
lib/deimos/config/phobos_config.rb,
lib/deimos/schema_backends/base.rb,
lib/deimos/schema_backends/mock.rb,
lib/deimos/utils/deadlock_retry.rb,
lib/deimos/utils/inline_consumer.rb,
lib/deimos/active_record_consumer.rb,
lib/deimos/active_record_producer.rb,
lib/deimos/consume/batch_consumption.rb,
lib/deimos/schema_backends/avro_base.rb,
lib/deimos/schema_backends/avro_local.rb,
lib/deimos/consume/message_consumption.rb,
lib/deimos/utils/schema_controller_mixin.rb,
lib/generators/deimos/db_poller_generator.rb,
lib/deimos/schema_backends/avro_validation.rb,
lib/generators/deimos/db_backend_generator.rb,
lib/generators/deimos/schema_class_generator.rb,
lib/deimos/active_record_consume/batch_slicer.rb,
lib/generators/deimos/active_record_generator.rb,
lib/deimos/schema_backends/avro_schema_coercer.rb,
lib/deimos/schema_backends/avro_schema_registry.rb,
lib/deimos/active_record_consume/batch_consumption.rb,
lib/deimos/active_record_consume/message_consumption.rb,
lib/deimos/active_record_consume/schema_model_converter.rb
Overview
Generates a new consumer.
Defined Under Namespace
Modules: ActiveRecordConsume, Backends, Consume, Generators, Instrumentation, KafkaListener, KafkaSource, Metrics, PhobosConfig, SchemaBackends, SchemaClass, SharedConfig, TestHelpers, Tracing, Utils Classes: ActiveRecordConsumer, ActiveRecordProducer, AvroSchemaCoercer, BatchConsumer, Consumer, KafkaMessage, KafkaTopicInfo, Message, PollInfo, Producer, Railtie, SchemaField
Constant Summary collapse
- VERSION =
'1.16.1'
Constants included from Instrumentation
Class Method Summary collapse
-
._disable_producer_classes(producer_classes) ⇒ Object
:nodoc:.
- .configure_producer_or_consumer(kafka_config) ⇒ Object
- .decode(schema:, namespace:, payload:) ⇒ Hash?
-
.disable_producers(*producer_classes, &block) ⇒ Object
Run a block without allowing any messages to be produced to Kafka.
- .encode(schema:, namespace:, payload:, subject: nil) ⇒ String
-
.load_generated_schema_classes ⇒ Object
Loads generated classes.
-
.producers_disabled?(producer_class = nil) ⇒ Boolean
Are producers disabled? If a class is passed in, check only that class.
- .schema_backend(schema:, namespace:) ⇒ Deimos::SchemaBackends::Base
-
.schema_backend_class ⇒ Class < Deimos::SchemaBackends::Base]
Class < Deimos::SchemaBackends::Base].
-
.start_db_backend!(thread_count: 1) ⇒ Object
Start the DB producers to send Kafka messages.
-
.validate_consumers ⇒ Object
Validate that consumers are configured correctly, including their delivery mode.
-
.validate_db_backend ⇒ Object
Ensure everything is set up correctly for the DB backend.
Class Method Details
._disable_producer_classes(producer_classes) ⇒ Object
:nodoc:
35 36 37 38 39 40 41 42 |
# File 'lib/deimos/producer.rb', line 35 def _disable_producer_classes(producer_classes) Thread.current[:frk_disabled_producers] ||= Set.new producers_to_disable = producer_classes - Thread.current[:frk_disabled_producers].to_a Thread.current[:frk_disabled_producers] += producers_to_disable yield Thread.current[:frk_disabled_producers] -= producers_to_disable end |
.configure_producer_or_consumer(kafka_config) ⇒ Object
78 79 80 81 82 83 84 85 86 87 |
# File 'lib/deimos/config/configuration.rb', line 78 def self.configure_producer_or_consumer(kafka_config) klass = kafka_config.class_name.constantize klass.class_eval do topic(kafka_config.topic) if kafka_config.topic.present? && klass.respond_to?(:topic) schema(kafka_config.schema) if kafka_config.schema.present? namespace(kafka_config.namespace) if kafka_config.namespace.present? key_config(**kafka_config.key_config) if kafka_config.key_config.present? schema_class_config(kafka_config.use_schema_classes) if kafka_config.use_schema_classes.present? end end |
.decode(schema:, namespace:, payload:) ⇒ Hash?
78 79 80 |
# File 'lib/deimos.rb', line 78 def decode(schema:, namespace:, payload:) self.schema_backend(schema: schema, namespace: namespace).decode(payload) end |
.disable_producers(*producer_classes, &block) ⇒ Object
Run a block without allowing any messages to be produced to Kafka. Optionally add a list of producer classes to limit the disabling to those classes.
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/deimos/producer.rb', line 15 def disable_producers(*producer_classes, &block) if producer_classes.any? _disable_producer_classes(producer_classes, &block) return end if Thread.current[:frk_disable_all_producers] # nested disable block yield return end begin Thread.current[:frk_disable_all_producers] = true yield ensure Thread.current[:frk_disable_all_producers] = false end end |
.encode(schema:, namespace:, payload:, subject: nil) ⇒ String
69 70 71 72 |
# File 'lib/deimos.rb', line 69 def encode(schema:, namespace:, payload:, subject: nil) self.schema_backend(schema: schema, namespace: namespace). encode(payload, topic: subject || "#{namespace}.#{schema}" ) end |
.load_generated_schema_classes ⇒ Object
Loads generated classes
35 36 37 38 39 40 41 42 43 |
# File 'lib/deimos/config/configuration.rb', line 35 def self.load_generated_schema_classes if Deimos.config.schema.generated_class_path.nil? raise 'Cannot use schema classes without schema.generated_class_path. Please provide a directory.' end Dir["./#{Deimos.config.schema.generated_class_path}/**/*.rb"].sort.each { |f| require f } rescue LoadError raise 'Cannot load schema classes. Please regenerate classes with rake deimos:generate_schema_models.' end |
.producers_disabled?(producer_class = nil) ⇒ Boolean
Are producers disabled? If a class is passed in, check only that class. Otherwise check if the global disable flag is set.
47 48 49 50 |
# File 'lib/deimos/producer.rb', line 47 def producers_disabled?(producer_class=nil) Thread.current[:frk_disable_all_producers] || Thread.current[:frk_disabled_producers]&.include?(producer_class) end |
.schema_backend(schema:, namespace:) ⇒ Deimos::SchemaBackends::Base
60 61 62 |
# File 'lib/deimos.rb', line 60 def schema_backend(schema:, namespace:) schema_backend_class.new(schema: schema, namespace: namespace) end |
.schema_backend_class ⇒ Class < Deimos::SchemaBackends::Base]
Returns Class < Deimos::SchemaBackends::Base].
49 50 51 52 53 54 55 |
# File 'lib/deimos.rb', line 49 def schema_backend_class backend = Deimos.config.schema.backend.to_s require "deimos/schema_backends/#{backend}" "Deimos::SchemaBackends::#{backend.classify}".constantize end |
.start_db_backend!(thread_count: 1) ⇒ Object
Start the DB producers to send Kafka messages.
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/deimos.rb', line 84 def start_db_backend!(thread_count: 1) Sigurd.exit_on_signal = true if self.config.producers.backend != :db raise('Publish backend is not set to :db, exiting') end if thread_count.nil? || thread_count.zero? raise('Thread count is not given or set to zero, exiting') end producers = (1..thread_count).map do Deimos::Utils::DbProducer. new(self.config.db_producer.logger || self.config.logger) end executor = Sigurd::Executor.new(producers, sleep_seconds: 5, logger: self.config.logger) signal_handler = Sigurd::SignalHandler.new(executor) signal_handler.run! end |
.validate_consumers ⇒ Object
Validate that consumers are configured correctly, including their delivery mode.
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/deimos/config/configuration.rb', line 59 def self.validate_consumers Phobos.config.listeners.each do |listener| handler_class = listener.handler.constantize delivery = listener.delivery next unless handler_class < Deimos::Consumer # Validate that each consumer implements the correct method for its type if delivery == 'inline_batch' if handler_class.instance_method(:consume_batch).owner == Deimos::Consume::BatchConsumption raise "BatchConsumer #{listener.handler} does not implement `consume_batch`" end elsif handler_class.instance_method(:consume).owner == Deimos::Consume::MessageConsumption raise "Non-batch Consumer #{listener.handler} does not implement `consume`" end end end |
.validate_db_backend ⇒ Object
Ensure everything is set up correctly for the DB backend.
46 47 48 49 50 51 52 53 54 55 |
# File 'lib/deimos/config/configuration.rb', line 46 def self.validate_db_backend begin require 'activerecord-import' rescue LoadError raise 'Cannot set producers.backend to :db without activerecord-import! Please add it to your Gemfile.' end if Deimos.config.producers.required_acks != :all raise 'Cannot set producers.backend to :db unless producers.required_acks is set to ":all"!' end end |