Module: Deimos
- Includes:
- FigTree
- Defined in:
- lib/deimos.rb,
lib/deimos/logging.rb,
lib/deimos/message.rb,
lib/deimos/version.rb,
lib/deimos/consumer.rb,
lib/deimos/producer.rb,
lib/deimos/poll_info.rb,
lib/deimos/exceptions.rb,
lib/deimos/transcoder.rb,
lib/deimos/kafka_source.rb,
lib/deimos/metrics/mock.rb,
lib/deimos/test_helpers.rb,
lib/deimos/tracing/mock.rb,
lib/deimos/backends/base.rb,
lib/deimos/backends/test.rb,
lib/deimos/kafka_message.rb,
lib/deimos/shared_config.rb,
lib/deimos/backends/kafka.rb,
lib/deimos/backends/outbox.rb,
lib/deimos/metrics/datadog.rb,
lib/deimos/tracing/datadog.rb,
lib/deimos/utils/db_poller.rb,
lib/deimos/ext/schema_route.rb,
lib/deimos/kafka_topic_info.rb,
lib/deimos/metrics/provider.rb,
lib/deimos/tracing/provider.rb,
lib/deimos/schema_class/base.rb,
lib/deimos/schema_class/enum.rb,
lib/deimos/ext/consumer_route.rb,
lib/deimos/ext/producer_route.rb,
lib/deimos/utils/schema_class.rb,
lib/deimos/schema_class/record.rb,
lib/deimos/backends/kafka_async.rb,
lib/deimos/config/configuration.rb,
lib/deimos/schema_backends/base.rb,
lib/deimos/schema_backends/mock.rb,
lib/deimos/utils/db_poller/base.rb,
lib/deimos/utils/deadlock_retry.rb,
lib/deimos/schema_backends/plain.rb,
lib/deimos/utils/outbox_producer.rb,
lib/deimos/active_record_consumer.rb,
lib/deimos/active_record_producer.rb,
lib/deimos/ext/producer_middleware.rb,
lib/deimos/metrics/minimal_datadog.rb,
lib/generators/deimos/v2_generator.rb,
lib/deimos/consume/batch_consumption.rb,
lib/deimos/schema_backends/avro_base.rb,
lib/deimos/schema_backends/avro_local.rb,
lib/deimos/schema_backends/proto_base.rb,
lib/deimos/utils/db_poller/time_based.rb,
lib/deimos/consume/message_consumption.rb,
lib/deimos/schema_backends/proto_local.rb,
lib/deimos/utils/db_poller/state_based.rb,
lib/deimos/ext/producer_metrics_listener.rb,
lib/generators/deimos/db_poller_generator.rb,
lib/deimos/schema_backends/avro_validation.rb,
lib/deimos/metrics/minimal_datadog_listener.rb,
lib/generators/deimos/schema_class_generator.rb,
lib/deimos/active_record_consume/batch_record.rb,
lib/deimos/active_record_consume/batch_slicer.rb,
lib/deimos/active_record_consume/mass_updater.rb,
lib/generators/deimos/active_record_generator.rb,
lib/deimos/schema_backends/avro_schema_coercer.rb,
lib/generators/deimos/bulk_import_id_generator.rb,
lib/generators/deimos/outbox_backend_generator.rb,
lib/deimos/schema_backends/avro_schema_registry.rb,
lib/deimos/schema_backends/proto_schema_registry.rb,
lib/deimos/active_record_consume/batch_consumption.rb,
lib/deimos/active_record_consume/batch_record_list.rb,
lib/deimos/active_record_consume/message_consumption.rb,
lib/deimos/active_record_consume/schema_model_converter.rb
Overview
Generates a migration for bulk import ID in consumer.
Defined Under Namespace
Modules: ActiveRecordConsume, Backends, Consume, Generators, KafkaSource, Logging, Metrics, ProducerMiddleware, SchemaBackends, SchemaClass, SharedConfig, TestHelpers, Tracing, Utils
Classes: ActiveRecordConsumer, ActiveRecordProducer, AvroSchemaCoercer, Consumer, ConsumerRoute, KafkaMessage, KafkaTopicInfo, Message, MissingImplementationError, PollInfo, Producer, ProducerMetricsListener, ProducerRoute, Railtie, RegistryInfo, SchemaField, SchemaRoute, Transcoder
Constant Summary
collapse
- EVENT_TYPES =
%w(
deimos.ar_consumer.consume_batch
deimos.encode_message
deimos.batch_consumption.invalid_records
deimos.batch_consumption.valid_records
deimos.outbox.produce
).freeze
- VERSION =
'2.5.3'
Class Attribute Summary collapse
Class Method Summary
collapse
-
.decode(schema:, namespace:, payload:) ⇒ Hash?
-
.decode_message(message) ⇒ Object
-
.disable_producers(*producer_classes, &block) ⇒ void
Run a block without allowing any messages to be produced to Kafka.
-
.encode(schema:, namespace:, payload:, subject: nil) ⇒ String
-
.generate_key_schemas ⇒ Object
-
.karafka_config_for(topic: nil, producer: nil) ⇒ Karafka::Routing::Topic?
-
.karafka_configs ⇒ Array<Karafka::Routing::Topic]
Array<Karafka::Routing::Topic].
-
.load_generated_schema_classes ⇒ void
-
.producer_for(topic) ⇒ Object
-
.producers_disabled?(producer_class = nil) ⇒ Boolean
Are producers disabled? If a class is passed in, check only that class.
-
.schema_backend(schema:, namespace:, registry_info: nil, backend: Deimos.config.schema.backend) ⇒ Deimos::SchemaBackends::Base
-
.schema_backend_class(backend: nil) ⇒ Class<Deimos::SchemaBackends::Base>
-
.schema_backend_for(topic_name) ⇒ SchemaBackends::Base
-
.setup_karafka ⇒ Object
-
.setup_producers ⇒ Object
-
.start_outbox_backend!(thread_count: 1) ⇒ void
Start the DB producers to send Kafka messages.
-
.topic_for_consumer(handler_class) ⇒ String?
-
.waterdrop_producers ⇒ Array<::WaterDrop::Producer>
Class Attribute Details
.mock_backends ⇒ Boolean
Returns for use in unit tests.
58
59
60
|
# File 'lib/deimos.rb', line 58
def mock_backends
@mock_backends
end
|
Class Method Details
.decode(schema:, namespace:, payload:) ⇒ Hash?
140
141
142
|
# File 'lib/deimos.rb', line 140
def decode(schema:, namespace:, payload:)
self.schema_backend(schema: schema, namespace: namespace).decode(payload)
end
|
.decode_message(message) ⇒ Object
145
146
147
148
149
150
151
152
153
154
155
156
157
|
# File 'lib/deimos.rb', line 145
def decode_message(message)
topic = message[:topic]
if Deimos.config.producers.topic_prefix
topic = topic.sub(Deimos.config.producers.topic_prefix, '')
end
config = karafka_config_for(topic: topic)
return message unless config
message[:payload] = config.deserializers[:payload].decode_message_hash(message[:payload])
if message[:key] && config.deserializers[:key].respond_to?(:decode_message_hash)
message[:key] = config.deserializers[:key].decode_message_hash(message[:key])
end
end
|
.disable_producers(*producer_classes, &block) ⇒ void
This method returns an undefined value.
Run a block without allowing any messages to be produced to Kafka. Optionally add a list of producer classes to limit the disabling to those classes.
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
# File 'lib/deimos/producer.rb', line 16
def disable_producers(*producer_classes, &block)
if producer_classes.any?
_disable_producer_classes(producer_classes, &block)
return
end
if Thread.current[:frk_disable_all_producers] yield
return
end
begin
Thread.current[:frk_disable_all_producers] = true
yield
ensure
Thread.current[:frk_disable_all_producers] = false
end
end
|
.encode(schema:, namespace:, payload:, subject: nil) ⇒ String
131
132
133
134
|
# File 'lib/deimos.rb', line 131
def encode(schema:, namespace:, payload:, subject: nil)
self.schema_backend(schema: schema, namespace: namespace).
encode(payload, topic: subject || "#{namespace}.#{schema}" )
end
|
.generate_key_schemas ⇒ Object
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
# File 'lib/deimos/config/configuration.rb', line 27
def generate_key_schemas
Deimos.karafka_configs.each do |config|
transcoder = config.deserializers[:key]
next unless transcoder.respond_to?(:key_field) &&
transcoder.key_field &&
transcoder.backend.supports_key_schemas?
transcoder.backend = Deimos.schema_backend(schema: config.schema,
namespace: config.namespace,
backend: transcoder.backend_type)
transcoder.backend.generate_key_schema(transcoder.key_field)
end
end
|
.karafka_config_for(topic: nil, producer: nil) ⇒ Karafka::Routing::Topic?
229
230
231
232
233
234
235
|
# File 'lib/deimos.rb', line 229
def karafka_config_for(topic: nil, producer: nil)
if topic
karafka_configs.find { |t| t.name == topic }
elsif producer
karafka_configs.find { |t| t.producer_classes&.include?(producer) }
end
end
|
.karafka_configs ⇒ Array<Karafka::Routing::Topic]
Returns Array<Karafka::Routing::Topic].
223
224
225
|
# File 'lib/deimos.rb', line 223
def karafka_configs
Karafka::App.routes.flat_map(&:topics).flat_map(&:to_a)
end
|
.load_generated_schema_classes ⇒ void
This method returns an undefined value.
Loads generated classes
44
45
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/deimos/config/configuration.rb', line 44
def load_generated_schema_classes
if Deimos.config.schema.generated_class_path.nil?
raise 'Cannot use schema classes without schema.generated_class_path. ' \
'Please provide a directory.'
end
Dir["./#{Deimos.config.schema.generated_class_path}/**/*.rb"].
each { |f| require f }
rescue LoadError
raise 'Cannot load schema classes. Please regenerate classes with' \
'rake deimos:generate_schema_models.'
end
|
.producer_for(topic) ⇒ Object
243
244
245
|
# File 'lib/deimos.rb', line 243
def producer_for(topic)
@producers[topic] || Karafka.producer
end
|
.producers_disabled?(producer_class = nil) ⇒ Boolean
Are producers disabled? If a class is passed in, check only that class. Otherwise check if the global disable flag is set.
52
53
54
55
56
57
|
# File 'lib/deimos/producer.rb', line 52
def producers_disabled?(producer_class=nil)
return true if Deimos.config.producers.disabled
Thread.current[:frk_disable_all_producers] ||
Thread.current[:frk_disabled_producers]&.include?(producer_class)
end
|
.schema_backend(schema:, namespace:, registry_info: nil, backend: Deimos.config.schema.backend) ⇒ Deimos::SchemaBackends::Base
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
# File 'lib/deimos.rb', line 98
def schema_backend(schema:, namespace:,
registry_info: nil,
backend: Deimos.config.schema.backend)
if config.schema.use_schema_classes
schema_class = Utils::SchemaClass.klass(schema, namespace)
if schema_class.nil?
schema_backend_class(backend: backend).
new(
schema: schema,
namespace: namespace,
registry_info: registry_info
)
else
schema_instance = schema_class.allocate
schema_backend_class(backend: backend).
new(schema: schema_instance.schema,
namespace: schema_instance.namespace,
registry_info: registry_info)
end
else
schema_backend_class(backend: backend).
new(schema: schema, namespace: namespace, registry_info: registry_info)
end
end
|
62
63
64
65
66
67
68
69
70
71
72
73
|
# File 'lib/deimos.rb', line 62
def schema_backend_class(backend: nil)
backend ||= Deimos.config.schema.backend
require "deimos/schema_backends/#{backend}"
klass = "Deimos::SchemaBackends::#{backend.to_s.classify}".constantize
if self.mock_backends
require "deimos/schema_backends/#{klass.mock_backend}"
klass = "Deimos::SchemaBackends::#{klass.mock_backend.to_s.classify}".constantize
end
klass
end
|
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
# File 'lib/deimos.rb', line 77
def schema_backend_for(topic_name)
config = Deimos.karafka_config_for(topic: topic_name)
registry_info = if config.registry_url
Deimos::RegistryInfo.new(
config.registry_url,
config.registry_user,
config.registry_password
)
else
nil
end
self.schema_backend(schema: config.schema,
namespace: config.namespace,
registry_info: registry_info,
backend: config.schema_backend || Deimos.config.schema.backend)
end
|
.setup_karafka ⇒ Object
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
|
# File 'lib/deimos.rb', line 200
def setup_karafka
setup_producers
waterdrop_producers.each do |producer|
producer.middleware.append(Deimos::ProducerMiddleware)
producer.monitor.subscribe(ProducerMetricsListener.new)
producer.monitor.subscribe('error.occurred') do |event|
if event.payload.key?(:messages)
topic = event[:messages].first[:topic]
config = Deimos.karafka_config_for(topic: topic)
message = Deimos::Logging.messages_log_text(config&.payload_log, event[:messages])
json = begin
message.to_json
rescue StandardError
message.to_s
end
Karafka.logger.error("Error producing messages: #{event[:error].message} #{json}")
end
end
end
EVENT_TYPES.each { |type| Karafka.monitor.notifications_bus.register_event(type) }
end
|
.setup_producers ⇒ Object
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
|
# File 'lib/deimos.rb', line 183
def setup_producers
@producers = {}
producers_by_broker = {}
Deimos.karafka_configs.each do |topic|
broker = topic.kafka[:'bootstrap.servers']
producers_by_broker[broker] ||= ::WaterDrop::Producer.new do |p_config|
config_hash = Karafka::Setup::Config.config.kafka.merge(topic.kafka)
p_config.kafka = Karafka::Setup::AttributesMap.producer(config_hash)
end
@producers[topic.name] = producers_by_broker[broker]
end
Karafka.producer.config.kafka =
Karafka::Setup::AttributesMap.producer(Karafka::Setup::Config.config.kafka.dup)
end
|
.start_outbox_backend!(thread_count: 1) ⇒ void
This method returns an undefined value.
Start the DB producers to send Kafka messages.
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
|
# File 'lib/deimos.rb', line 162
def start_outbox_backend!(thread_count: 1)
Sigurd.exit_on_signal = true
if self.config.producers.backend != :outbox
raise('Publish backend is not set to :outbox, exiting')
end
if thread_count.nil? || thread_count.zero?
raise('Thread count is not given or set to zero, exiting')
end
producers = (1..thread_count).map do
Deimos::Utils::OutboxProducer.
new(self.config.outbox.logger || Karafka.logger)
end
executor = Sigurd::Executor.new(producers,
sleep_seconds: 5,
logger: Karafka.logger)
signal_handler = Sigurd::SignalHandler.new(executor)
signal_handler.run!
end
|
.topic_for_consumer(handler_class) ⇒ String?
248
249
250
251
252
253
254
255
|
# File 'lib/deimos.rb', line 248
def topic_for_consumer(handler_class)
Deimos.karafka_configs.each do |topic|
if topic.consumer == handler_class
return topic.name
end
end
nil
end
|
.waterdrop_producers ⇒ Array<::WaterDrop::Producer>
238
239
240
|
# File 'lib/deimos.rb', line 238
def waterdrop_producers
(@producers.values + [Karafka.producer]).uniq
end
|