Module: Deimos

Includes:
FigTree
Defined in:
lib/deimos.rb,
lib/deimos/logging.rb,
lib/deimos/message.rb,
lib/deimos/version.rb,
lib/deimos/consumer.rb,
lib/deimos/producer.rb,
lib/deimos/poll_info.rb,
lib/deimos/exceptions.rb,
lib/deimos/transcoder.rb,
lib/deimos/kafka_source.rb,
lib/deimos/metrics/mock.rb,
lib/deimos/test_helpers.rb,
lib/deimos/tracing/mock.rb,
lib/deimos/backends/base.rb,
lib/deimos/backends/test.rb,
lib/deimos/kafka_message.rb,
lib/deimos/shared_config.rb,
lib/deimos/backends/kafka.rb,
lib/deimos/backends/outbox.rb,
lib/deimos/metrics/datadog.rb,
lib/deimos/tracing/datadog.rb,
lib/deimos/utils/db_poller.rb,
lib/deimos/ext/schema_route.rb,
lib/deimos/kafka_topic_info.rb,
lib/deimos/metrics/provider.rb,
lib/deimos/tracing/provider.rb,
lib/deimos/schema_class/base.rb,
lib/deimos/schema_class/enum.rb,
lib/deimos/ext/consumer_route.rb,
lib/deimos/ext/producer_route.rb,
lib/deimos/utils/schema_class.rb,
lib/deimos/schema_class/record.rb,
lib/deimos/backends/kafka_async.rb,
lib/deimos/config/configuration.rb,
lib/deimos/schema_backends/base.rb,
lib/deimos/schema_backends/mock.rb,
lib/deimos/utils/db_poller/base.rb,
lib/deimos/utils/deadlock_retry.rb,
lib/deimos/schema_backends/plain.rb,
lib/deimos/utils/outbox_producer.rb,
lib/deimos/active_record_consumer.rb,
lib/deimos/active_record_producer.rb,
lib/deimos/ext/producer_middleware.rb,
lib/generators/deimos/v2_generator.rb,
lib/deimos/consume/batch_consumption.rb,
lib/deimos/schema_backends/avro_base.rb,
lib/deimos/schema_backends/avro_local.rb,
lib/deimos/utils/db_poller/time_based.rb,
lib/deimos/consume/message_consumption.rb,
lib/deimos/utils/db_poller/state_based.rb,
lib/generators/deimos/db_poller_generator.rb,
lib/deimos/schema_backends/avro_validation.rb,
lib/generators/deimos/schema_class_generator.rb,
lib/deimos/active_record_consume/batch_record.rb,
lib/deimos/active_record_consume/batch_slicer.rb,
lib/deimos/active_record_consume/mass_updater.rb,
lib/generators/deimos/active_record_generator.rb,
lib/deimos/schema_backends/avro_schema_coercer.rb,
lib/generators/deimos/bulk_import_id_generator.rb,
lib/generators/deimos/outbox_backend_generator.rb,
lib/deimos/schema_backends/avro_schema_registry.rb,
lib/deimos/active_record_consume/batch_consumption.rb,
lib/deimos/active_record_consume/batch_record_list.rb,
lib/deimos/active_record_consume/message_consumption.rb,
lib/deimos/active_record_consume/schema_model_converter.rb

Overview

Generates a migration for bulk import ID in consumer.

Defined Under Namespace

Modules: ActiveRecordConsume, Backends, Consume, Generators, KafkaSource, Logging, Metrics, ProducerMiddleware, SchemaBackends, SchemaClass, SharedConfig, TestHelpers, Tracing, Utils Classes: ActiveRecordConsumer, ActiveRecordProducer, AvroSchemaCoercer, Consumer, ConsumerRoute, KafkaMessage, KafkaTopicInfo, Message, MissingImplementationError, PollInfo, Producer, ProducerRoute, Railtie, SchemaField, SchemaRoute, Transcoder

Constant Summary collapse

EVENT_TYPES =
%w(
  deimos.ar_consumer.consume_batch
  deimos.encode_message
  deimos.batch_consumption.invalid_records
  deimos.batch_consumption.valid_records
  deimos.outbox.produce
)
VERSION =
'2.0.0-alpha2'

Class Method Summary collapse

Class Method Details

.decode(schema:, namespace:, payload:) ⇒ Hash?

Parameters:

  • schema (String)
  • namespace (String)
  • payload (String)

Returns:

  • (Hash, nil)


99
100
101
# File 'lib/deimos.rb', line 99

def decode(schema:, namespace:, payload:)
  self.schema_backend(schema: schema, namespace: namespace).decode(payload)
end

.decode_message(message) ⇒ Object

Parameters:

  • message (Hash)

    a Karafka message with keys :payload, :key and :topic



104
105
106
107
108
109
110
111
112
113
114
# File 'lib/deimos.rb', line 104

def decode_message(message)
  topic = message[:topic]
  if Deimos.config.producers.topic_prefix
    topic = topic.sub(Deimos.config.producers.topic_prefix, '')
  end
  config = karafka_config_for(topic: topic)
  message[:payload] = config.deserializers[:payload].decode_message_hash(message[:payload])
  if message[:key] && config.deserializers[:key].respond_to?(:decode_message_hash)
    message[:key] = config.deserializers[:key].decode_message_hash(message[:key])
  end
end

.disable_producers(*producer_classes, &block) ⇒ void

This method returns an undefined value.

Run a block without allowing any messages to be produced to Kafka. Optionally add a list of producer classes to limit the disabling to those classes.

Parameters:

  • producer_classes (Array<Class>, Class)


16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/deimos/producer.rb', line 16

def disable_producers(*producer_classes, &block)
  if producer_classes.any?
    _disable_producer_classes(producer_classes, &block)
    return
  end

  if Thread.current[:frk_disable_all_producers] # nested disable block
    yield
    return
  end

  begin
    Thread.current[:frk_disable_all_producers] = true
    yield
  ensure
    Thread.current[:frk_disable_all_producers] = false
  end
end

.encode(schema:, namespace:, payload:, subject: nil) ⇒ String

Parameters:

  • schema (String)
  • namespace (String)
  • payload (Hash)
  • subject (String) (defaults to: nil)

Returns:

  • (String)


90
91
92
93
# File 'lib/deimos.rb', line 90

def encode(schema:, namespace:, payload:, subject: nil)
  self.schema_backend(schema: schema, namespace: namespace).
    encode(payload, topic: subject || "#{namespace}.#{schema}" )
end

.generate_key_schemasObject



23
24
25
26
27
28
29
30
31
32
33
# File 'lib/deimos/config/configuration.rb', line 23

def generate_key_schemas
  Deimos.karafka_configs.each do |config|
    transcoder = config.deserializers[:key]

    if transcoder.respond_to?(:key_field) && transcoder.key_field
      transcoder.backend = Deimos.schema_backend(schema: config.schema,
                                                 namespace: config.namespace)
      transcoder.backend.generate_key_schema(transcoder.key_field)
    end
  end
end

.karafka_config_for(topic: nil, producer: nil) ⇒ Karafka::Routing::Topic?

Parameters:

  • topic (String) (defaults to: nil)

Returns:

  • (Karafka::Routing::Topic, nil)


161
162
163
164
165
166
167
# File 'lib/deimos.rb', line 161

def karafka_config_for(topic: nil, producer: nil)
  if topic
    karafka_configs.find { |t| t.name == topic}
  elsif producer
    karafka_configs.find { |t| t.producer_class == producer}
  end
end

.karafka_configsArray<Karafka::Routing::Topic]

Returns Array<Karafka::Routing::Topic].

Returns:

  • (Array<Karafka::Routing::Topic])

    Array<Karafka::Routing::Topic]



155
156
157
# File 'lib/deimos.rb', line 155

def karafka_configs
  Karafka::App.routes.flat_map(&:topics).flat_map(&:to_a)
end

.load_generated_schema_classesvoid

This method returns an undefined value.

Loads generated classes



37
38
39
40
41
42
43
44
45
# File 'lib/deimos/config/configuration.rb', line 37

def load_generated_schema_classes
  if Deimos.config.schema.generated_class_path.nil?
    raise 'Cannot use schema classes without schema.generated_class_path. Please provide a directory.'
  end

  Dir["./#{Deimos.config.schema.generated_class_path}/**/*.rb"].sort.each { |f| require f }
rescue LoadError
  raise 'Cannot load schema classes. Please regenerate classes with rake deimos:generate_schema_models.'
end

.producers_disabled?(producer_class = nil) ⇒ Boolean

Are producers disabled? If a class is passed in, check only that class. Otherwise check if the global disable flag is set.

Parameters:

  • producer_class (Class) (defaults to: nil)

Returns:

  • (Boolean)


52
53
54
55
56
57
# File 'lib/deimos/producer.rb', line 52

def producers_disabled?(producer_class=nil)
  return true if Deimos.config.producers.disabled

  Thread.current[:frk_disable_all_producers] ||
    Thread.current[:frk_disabled_producers]&.include?(producer_class)
end

.schema_backend(schema:, namespace:) ⇒ Deimos::SchemaBackends::Base

Parameters:

  • schema (String, Symbol)
  • namespace (String)

Returns:



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/deimos.rb', line 68

def schema_backend(schema:, namespace:)
  if config.schema.use_schema_classes
    # Initialize an instance of the provided schema
    # in the event the schema class is an override, the inherited
    # schema and namespace will be applied
    schema_class = Utils::SchemaClass.klass(schema, namespace)
    if schema_class.nil?
      schema_backend_class.new(schema: schema, namespace: namespace)
    else
      schema_instance = schema_class.new
      schema_backend_class.new(schema: schema_instance.schema, namespace: schema_instance.namespace)
    end
  else
    schema_backend_class.new(schema: schema, namespace: namespace)
  end
end

.schema_backend_classClass<Deimos::SchemaBackends::Base>

Returns:



57
58
59
60
61
62
63
# File 'lib/deimos.rb', line 57

def schema_backend_class
  backend = Deimos.config.schema.backend.to_s

  require "deimos/schema_backends/#{backend}"

  "Deimos::SchemaBackends::#{backend.classify}".constantize
end

.setup_karafkaObject



140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/deimos.rb', line 140

def setup_karafka
  Karafka.producer.middleware.append(Deimos::ProducerMiddleware)
  EVENT_TYPES.each { |type| Karafka.monitor.notifications_bus.register_event(type) }

  Karafka.producer.monitor.subscribe('error.occurred') do |event|
    if event.payload.key?(:messages)
      topic = event[:messages].first[:topic]
      config = Deimos.karafka_config_for(topic: topic)
      message = Deimos::Logging.messages_log_text(config&.payload_log, event[:messages])
      Karafka.logger.error("Error producing messages: #{event[:error].message} #{message.to_json}")
    end
  end
end

.start_outbox_backend!(thread_count: 1) ⇒ void

This method returns an undefined value.

Start the DB producers to send Kafka messages.

Parameters:

  • thread_count (Integer) (defaults to: 1)

    the number of threads to start.



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/deimos.rb', line 119

def start_outbox_backend!(thread_count: 1)
  Sigurd.exit_on_signal = true
  if self.config.producers.backend != :outbox
    raise('Publish backend is not set to :outbox, exiting')
  end

  if thread_count.nil? || thread_count.zero?
    raise('Thread count is not given or set to zero, exiting')
  end

  producers = (1..thread_count).map do
    Deimos::Utils::OutboxProducer.
      new(self.config.outbox.logger || Karafka.logger)
  end
  executor = Sigurd::Executor.new(producers,
                                  sleep_seconds: 5,
                                  logger: Karafka.logger)
  signal_handler = Sigurd::SignalHandler.new(executor)
  signal_handler.run!
end

.topic_for_consumer(handler_class) ⇒ String?

Parameters:

  • handler_class (Class)

Returns:

  • (String, nil)


171
172
173
174
175
176
177
178
# File 'lib/deimos.rb', line 171

def topic_for_consumer(handler_class)
  Deimos.karafka_configs.each do |topic|
    if topic.consumer == handler_class
      return topic.name
    end
  end
  nil
end