Module: Deimos
- Includes:
- Instrumentation, FigTree
- Defined in:
- lib/deimos.rb,
lib/deimos/logging.rb,
lib/deimos/message.rb,
lib/deimos/version.rb,
lib/deimos/consumer.rb,
lib/deimos/producer.rb,
lib/deimos/poll_info.rb,
lib/deimos/exceptions.rb,
lib/deimos/transcoder.rb,
lib/deimos/kafka_source.rb,
lib/deimos/metrics/mock.rb,
lib/deimos/schema_class.rb,
lib/deimos/test_helpers.rb,
lib/deimos/tracing/mock.rb,
lib/deimos/backends/base.rb,
lib/deimos/backends/test.rb,
lib/deimos/kafka_message.rb,
lib/deimos/shared_config.rb,
lib/deimos/backends/kafka.rb,
lib/deimos/backends/outbox.rb,
lib/deimos/metrics/datadog.rb,
lib/deimos/tracing/datadog.rb,
lib/deimos/utils/db_poller.rb,
lib/deimos/ext/schema_route.rb,
lib/deimos/kafka_topic_info.rb,
lib/deimos/metrics/provider.rb,
lib/deimos/tracing/provider.rb,
lib/deimos/ext/consumer_route.rb,
lib/deimos/ext/producer_route.rb,
lib/deimos/utils/schema_class.rb,
lib/deimos/backends/kafka_async.rb,
lib/deimos/config/configuration.rb,
lib/deimos/schema_backends/base.rb,
lib/deimos/schema_backends/mock.rb,
lib/deimos/utils/db_poller/base.rb,
lib/deimos/utils/deadlock_retry.rb,
lib/deimos/schema_backends/plain.rb,
lib/deimos/utils/outbox_producer.rb,
lib/deimos/active_record_consumer.rb,
lib/deimos/active_record_producer.rb,
lib/deimos/ext/producer_middleware.rb,
lib/deimos/metrics/minimal_datadog.rb,
lib/generators/deimos/v2_generator.rb,
lib/deimos/consume/batch_consumption.rb,
lib/deimos/schema_backends/avro_base.rb,
lib/deimos/schema_backends/avro_local.rb,
lib/deimos/schema_backends/proto_base.rb,
lib/deimos/utils/db_poller/time_based.rb,
lib/deimos/consume/message_consumption.rb,
lib/deimos/schema_backends/proto_local.rb,
lib/deimos/utils/db_poller/state_based.rb,
lib/deimos/ext/producer_metrics_listener.rb,
lib/generators/deimos/db_poller_generator.rb,
lib/deimos/schema_backends/avro_validation.rb,
lib/deimos/metrics/minimal_datadog_listener.rb,
lib/deimos/active_record_consume/batch_record.rb,
lib/deimos/active_record_consume/batch_slicer.rb,
lib/deimos/active_record_consume/mass_updater.rb,
lib/generators/deimos/active_record_generator.rb,
lib/deimos/schema_backends/avro_schema_coercer.rb,
lib/generators/deimos/bulk_import_id_generator.rb,
lib/generators/deimos/outbox_backend_generator.rb,
lib/deimos/schema_backends/avro_schema_registry.rb,
lib/deimos/schema_backends/proto_schema_registry.rb,
lib/deimos/active_record_consume/batch_consumption.rb,
lib/deimos/active_record_consume/batch_record_list.rb,
lib/deimos/active_record_consume/message_consumption.rb,
lib/deimos/active_record_consume/schema_model_converter.rb,
sig/defs.rbs
Overview
Generates a new consumer.
Defined Under Namespace
Modules: ActiveRecordConsume, Backends, Consume, Generators, Instrumentation, KafkaListener, KafkaSource, Logging, Metrics, PhobosConfig, ProducerMiddleware, SchemaBackends, SchemaClass, SharedConfig, TestHelpers, Tracing, Utils Classes: ActiveRecordConsumer, ActiveRecordProducer, AvroSchemaCoercer, BatchConsumer, Consumer, ConsumerRoute, KafkaMessage, KafkaTopicInfo, Message, MissingImplementationError, PollInfo, Producer, ProducerMetricsListener, ProducerRoute, Railtie, RegistryInfo, SchemaField, SchemaRoute, Transcoder
Constant Summary collapse
- EVENT_TYPES =
%w( deimos.ar_consumer.consume_batch deimos.encode_message deimos.batch_consumption.invalid_records deimos.batch_consumption.valid_records deimos.outbox.produce ).freeze
- VERSION =
'2.6.0-beta1'
Class Attribute Summary collapse
-
.mock_backends ⇒ Boolean
For use in unit tests.
Class Method Summary collapse
-
.decode(schema:, namespace:, payload:) ⇒ ::Hash[untyped, untyped]?
@param
schema. - .decode_message(message) ⇒ Object
-
.disable_producers(*producer_classes, &block) ⇒ void
Run a block without allowing any messages to be produced to Kafka.
-
.encode(schema:, namespace:, payload:, subject: nil) ⇒ Object
@param
schema. - .generate_key_schemas ⇒ Object
- .karafka_config_for(topic: nil, producer: nil) ⇒ Karafka::Routing::Topic?
-
.karafka_configs ⇒ Array<Karafka::Routing::Topic]
Array<Karafka::Routing::Topic].
-
.load_generated_schema_classes ⇒ void
Loads generated classes.
- .producer_for(topic) ⇒ Object
-
.producers_disabled?(producer_class = nil) ⇒ Boolean
Are producers disabled? If a class is passed in, check only that class.
-
.schema_backend(schema:, namespace:, registry_info: nil, backend: Deimos.config.schema.backend) ⇒ Deimos::SchemaBackends::Base
@param
schema. - .schema_backend_class(backend: nil) ⇒ Class<Deimos::SchemaBackends::Base>
- .schema_backend_for(topic_name) ⇒ SchemaBackends::Base
- .setup_karafka ⇒ Object
- .setup_producers ⇒ Object
-
.start_db_backend! ⇒ void
Start the DB producers to send Kafka messages.
-
.start_outbox_backend!(thread_count: 1) ⇒ void
Start the DB producers to send Kafka messages.
- .topic_for_consumer(handler_class) ⇒ String?
- .waterdrop_producers ⇒ Array<::WaterDrop::Producer>
Class Attribute Details
.mock_backends ⇒ Boolean
Returns for use in unit tests.
58 59 60 |
# File 'lib/deimos.rb', line 58 def mock_backends @mock_backends end |
Class Method Details
.decode(schema:, namespace:, payload:) ⇒ ::Hash[untyped, untyped]?
@param schema
@param namespace
@param payload
140 141 142 |
# File 'lib/deimos.rb', line 140 def decode(schema:, namespace:, payload:) self.schema_backend(schema: schema, namespace: namespace).decode(payload) end |
.decode_message(message) ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 157 |
# File 'lib/deimos.rb', line 145 def () topic = [:topic] if Deimos.config.producers.topic_prefix topic = topic.sub(Deimos.config.producers.topic_prefix, '') end config = karafka_config_for(topic: topic) return unless config [:payload] = config.deserializers[:payload].([:payload]) if [:key] && config.deserializers[:key].respond_to?(:decode_message_hash) [:key] = config.deserializers[:key].([:key]) end end |
.disable_producers(*producer_classes, &block) ⇒ void
This method returns an undefined value.
Run a block without allowing any messages to be produced to Kafka. Optionally add a list of producer classes to limit the disabling to those classes.
@param producer_classes
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/deimos/producer.rb', line 16 def disable_producers(*producer_classes, &block) if producer_classes.any? _disable_producer_classes(producer_classes, &block) return end if Thread.current[:frk_disable_all_producers] # nested disable block yield return end begin Thread.current[:frk_disable_all_producers] = true yield ensure Thread.current[:frk_disable_all_producers] = false end end |
.encode(schema:, namespace:, payload:, subject: nil) ⇒ Object
@param schema
@param namespace
@param payload
@param subject
131 132 133 134 |
# File 'lib/deimos.rb', line 131 def encode(schema:, namespace:, payload:, subject: nil) self.schema_backend(schema: schema, namespace: namespace). encode(payload, topic: subject || "#{namespace}.#{schema}" ) end |
.generate_key_schemas ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/deimos/config/configuration.rb', line 28 def generate_key_schemas Deimos.karafka_configs.each do |config| transcoder = config.deserializers[:key] next unless transcoder.respond_to?(:key_field) && transcoder.key_field && transcoder.backend.supports_key_schemas? transcoder.backend = Deimos.schema_backend(schema: config.schema, namespace: config.namespace, backend: transcoder.backend_type) transcoder.backend.generate_key_schema(transcoder.key_field) end end |
.karafka_config_for(topic: nil, producer: nil) ⇒ Karafka::Routing::Topic?
229 230 231 232 233 234 235 |
# File 'lib/deimos.rb', line 229 def karafka_config_for(topic: nil, producer: nil) if topic karafka_configs.find { |t| t.name == topic } elsif producer karafka_configs.find { |t| t.producer_classes&.include?(producer) } end end |
.karafka_configs ⇒ Array<Karafka::Routing::Topic]
Returns Array<Karafka::Routing::Topic].
223 224 225 |
# File 'lib/deimos.rb', line 223 def karafka_configs Karafka::App.routes.flat_map(&:topics).flat_map(&:to_a) end |
.load_generated_schema_classes ⇒ void
This method returns an undefined value.
Loads generated classes
45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/deimos/config/configuration.rb', line 45 def load_generated_schema_classes path = AvroGen.config.generated_class_path if path.nil? raise 'Cannot use schema classes without a generated class path. ' \ 'Please set AvroGen.config.generated_class_path.' end Dir["./#{path}/**/*.rb"]. each { |f| require f } rescue LoadError raise 'Cannot load schema classes. Please regenerate classes with rake avro:generate.' end |
.producer_for(topic) ⇒ Object
243 244 245 |
# File 'lib/deimos.rb', line 243 def producer_for(topic) @producers[topic] || Karafka.producer end |
.producers_disabled?(producer_class = nil) ⇒ Boolean
Are producers disabled? If a class is passed in, check only that class. Otherwise check if the global disable flag is set.
@param producer_class
52 53 54 55 56 57 |
# File 'lib/deimos/producer.rb', line 52 def producers_disabled?(producer_class=nil) return true if Deimos.config.producers.disabled Thread.current[:frk_disable_all_producers] || Thread.current[:frk_disabled_producers]&.include?(producer_class) end |
.schema_backend(schema:, namespace:, registry_info: nil, backend: Deimos.config.schema.backend) ⇒ Deimos::SchemaBackends::Base
@param schema
@param namespace
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/deimos.rb', line 98 def schema_backend(schema:, namespace:, registry_info: nil, backend: Deimos.config.schema.backend) if config.schema.use_schema_classes # Initialize an instance of the provided schema # in the event the schema class is an override, the inherited # schema and namespace will be applied schema_class = AvroGen::SchemaClass.klass(schema, namespace) if schema_class.nil? schema_backend_class(backend: backend). new( schema: schema, namespace: namespace, registry_info: registry_info ) else schema_instance = schema_class.allocate schema_backend_class(backend: backend). new(schema: schema_instance.schema, namespace: schema_instance.namespace, registry_info: registry_info) end else schema_backend_class(backend: backend). new(schema: schema, namespace: namespace, registry_info: registry_info) end end |
.schema_backend_class(backend: nil) ⇒ Class<Deimos::SchemaBackends::Base>
62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/deimos.rb', line 62 def schema_backend_class(backend: nil) backend ||= Deimos.config.schema.backend require "deimos/schema_backends/#{backend}" klass = "Deimos::SchemaBackends::#{backend.to_s.classify}".constantize if self.mock_backends require "deimos/schema_backends/#{klass.mock_backend}" klass = "Deimos::SchemaBackends::#{klass.mock_backend.to_s.classify}".constantize end klass end |
.schema_backend_for(topic_name) ⇒ SchemaBackends::Base
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/deimos.rb', line 77 def schema_backend_for(topic_name) config = Deimos.karafka_config_for(topic: topic_name) registry_info = if config.registry_url Deimos::RegistryInfo.new( config.registry_url, config.registry_user, config.registry_password ) else nil end self.schema_backend(schema: config.schema, namespace: config.namespace, registry_info: registry_info, backend: config.schema_backend || Deimos.config.schema.backend) end |
.setup_karafka ⇒ Object
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/deimos.rb', line 200 def setup_karafka setup_producers waterdrop_producers.each do |producer| producer.middleware.append(Deimos::ProducerMiddleware) producer.monitor.subscribe(ProducerMetricsListener.new) producer.monitor.subscribe('error.occurred') do |event| if event.payload.key?(:messages) topic = event[:messages].first[:topic] config = Deimos.karafka_config_for(topic: topic) = Deimos::Logging.(config&.payload_log, event[:messages]) json = begin .to_json rescue StandardError .to_s end Karafka.logger.error("Error producing messages: #{event[:error].} #{json}") end end end EVENT_TYPES.each { |type| Karafka.monitor.notifications_bus.register_event(type) } end |
.setup_producers ⇒ Object
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/deimos.rb', line 183 def setup_producers @producers = {} producers_by_broker = {} Deimos.karafka_configs.each do |topic| broker = topic.kafka[:'bootstrap.servers'] producers_by_broker[broker] ||= ::WaterDrop::Producer.new do |p_config| config_hash = Karafka::Setup::Config.config.kafka.merge(topic.kafka) p_config.kafka = Karafka::Setup::AttributesMap.producer(config_hash) end @producers[topic.name] = producers_by_broker[broker] end # Karafka.producer's kafka config is captured at first Karafka::App.setup; # apply the merged global so later overrides actually take effect. Karafka.producer.config.kafka = Karafka::Setup::AttributesMap.producer(Karafka::Setup::Config.config.kafka.dup) end |
.start_db_backend! ⇒ void
This method returns an undefined value.
Start the DB producers to send Kafka messages.
@param thread_count — the number of threads to start.
38 |
# File 'sig/defs.rbs', line 38
def self.start_db_backend!: (?thread_count: Integer) -> void
|
.start_outbox_backend!(thread_count: 1) ⇒ void
This method returns an undefined value.
Start the DB producers to send Kafka messages.
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/deimos.rb', line 162 def start_outbox_backend!(thread_count: 1) Sigurd.exit_on_signal = true if self.config.producers.backend != :outbox raise('Publish backend is not set to :outbox, exiting') end if thread_count.nil? || thread_count.zero? raise('Thread count is not given or set to zero, exiting') end producers = (1..thread_count).map do Deimos::Utils::OutboxProducer. new(self.config.outbox.logger || Karafka.logger) end executor = Sigurd::Executor.new(producers, sleep_seconds: 5, logger: Karafka.logger) signal_handler = Sigurd::SignalHandler.new(executor) signal_handler.run! end |
.topic_for_consumer(handler_class) ⇒ String?
248 249 250 251 252 253 254 255 |
# File 'lib/deimos.rb', line 248 def topic_for_consumer(handler_class) Deimos.karafka_configs.each do |topic| if topic.consumer == handler_class return topic.name end end nil end |
.waterdrop_producers ⇒ Array<::WaterDrop::Producer>
238 239 240 |
# File 'lib/deimos.rb', line 238 def waterdrop_producers (@producers.values + [Karafka.producer]).uniq end |