Class: Deimos::SchemaBackends::AvroBase

Inherits:
Base
  • Object
show all
Defined in:
lib/deimos/schema_backends/avro_base.rb,
sig/defs.rbs

Overview

Encode / decode using Avro, either locally or via schema registry.

Direct Known Subclasses

AvroLocal, AvroSchemaRegistry, AvroValidation

Instance Attribute Summary collapse

Attributes inherited from Base

#key_schema, #namespace, #registry_info, #schema

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Base

#coerce, #decode, #decode_payload, #encode, #encode_payload, #inspect

Constructor Details

#initialize(schema:, namespace:, registry_info: nil) ⇒ AvroBase

Returns a new instance of AvroBase.

Parameters:

  • schema: (String, Symbol)
  • namespace: (String)


15
16
17
18
# File 'lib/deimos/schema_backends/avro_base.rb', line 15

# Sets up the backend and creates the Avro schema store it reads from.
#
# The bare `super` forwards all keyword arguments unchanged to the parent
# constructor.
# @param schema [String, Symbol]
# @param namespace [String]
# @param registry_info [Object, nil] only forwarded to the parent here
def initialize(schema:, namespace:, registry_info: nil)
  super
  schema_path = Deimos.config.schema.path
  @schema_store = SchemaRegistry::AvroSchemaStore.new(path: schema_path)
end

Instance Attribute Details

#schema_store ⇒ Object

Returns the value of attribute schema_store.

Returns:

  • (Object)


1432
1433
1434
# File 'sig/defs.rbs', line 1432

# Reader for the schema store held in the @schema_store instance variable.
# @return [Object] the current value of @schema_store
def schema_store
  @schema_store
end

Class Method Details

.content_type ⇒ String

Returns:

  • (String)


104
105
106
# File 'lib/deimos/schema_backends/avro_base.rb', line 104

# Content type identifier for this backend.
# @return [String] always 'avro/binary'
def self.content_type
  'avro/binary'
end

.field_type(avro_schema) ⇒ String

Converts an Avro::Schema::NamedSchema to String form for generated YARD docs. Recursively handles the typing for Arrays, Maps and Unions.

@param avro_schema

@return — A string representation of the Type of this SchemaField

Parameters:

Returns:

  • (String)


119
120
121
# File 'lib/deimos/schema_backends/avro_base.rb', line 119

# Returns the string form of an Avro schema's type by delegating to
# AvroGen::AvroParser.field_type.
# @param avro_schema [Avro::Schema::NamedSchema] presumably any Avro schema node — confirm against AvroParser
# @return [String]
def self.field_type(avro_schema)
  AvroGen::AvroParser.field_type(avro_schema)
end

.mock_backend ⇒ Symbol

Returns:

  • (Symbol)


99
100
101
# File 'lib/deimos/schema_backends/avro_base.rb', line 99

# Name of the backend to use in place of this one when mocking.
# NOTE(review): presumably consumed by test helpers — confirm against callers.
# @return [Symbol] always :avro_validation
def self.mock_backend
  :avro_validation
end

.schema_base_class(schema) ⇒ Avro::Schema::NamedSchema

Returns the base type of this schema. Decodes Arrays, Maps and Unions.

@param schema

Parameters:

Returns:



127
128
129
# File 'lib/deimos/schema_backends/avro_base.rb', line 127

# Returns the base type of the given schema by delegating to
# AvroGen::AvroParser.schema_base_class.
# @param schema [Avro::Schema::NamedSchema]
# @return [Avro::Schema::NamedSchema]
def self.schema_base_class(schema)
  AvroGen::AvroParser.schema_base_class(schema)
end

.schema_classname(schema) ⇒ String

@param schema — A named schema

Parameters:

Returns:

  • (String)


111
112
113
# File 'lib/deimos/schema_backends/avro_base.rb', line 111

# Returns the class name for a named schema by delegating to
# AvroGen::AvroParser.schema_classname.
# @param schema [Avro::Schema::NamedSchema] a named schema
# @return [String]
def self.schema_classname(schema)
  AvroGen::AvroParser.schema_classname(schema)
end

Instance Method Details

#coerce_field(field, value) ⇒ Object

Parameters:

Returns:

  • (Object)


73
74
75
# File 'lib/deimos/schema_backends/avro_base.rb', line 73

# Coerces a value to the type of the given field, using a coercer built
# from this backend's Avro schema.
# @param field [SchemaField] only its `type` is read here
# @param value [Object]
# @return [Object] whatever AvroSchemaCoercer#coerce_type returns
def coerce_field(field, value)
  coercer = AvroSchemaCoercer.new(avro_schema)
  coercer.coerce_type(field.type, value)
end

#decode_key(payload, key_id) ⇒ String

Parameters:

  • payload (::Hash[untyped, untyped])
  • key_id (String, Symbol)

Returns:

  • (String)


42
43
44
45
46
# File 'lib/deimos/schema_backends/avro_base.rb', line 42

# Decodes a key payload and returns the value of its key field.
#
# If no key schema has been memoized in @key_schema yet, one is generated
# from key_id first.
# @param payload [Hash]
# @param key_id [String, Symbol]
# @return [Object] the entry of the decoded payload stored under the key field name
def decode_key(payload, key_id)
  @key_schema ||= generate_key_schema(key_id)
  key_field = _field_name_from_schema(@key_schema)
  decoded = decode(payload, schema: @key_schema['name'])
  decoded[key_field]
end

#encode_key(key_id, key, topic: nil) ⇒ String

Parameters:

  • key_id (String, Symbol)
  • key (String, ::Hash[untyped, untyped])
  • topic: (String, nil) (defaults to: nil)

Returns:

  • (String)


30
31
32
33
34
35
36
37
38
39
# File 'lib/deimos/schema_backends/avro_base.rb', line 30

# Encodes a message key with the key schema.
#
# The key schema is looked up in the schema store under "<schema>_key" and
# memoized in @key_schema; when the store reports it as missing, a schema is
# generated from key_id instead.
# @param key_id [String, Symbol]
# @param key [String, Hash] a Hash is encoded as-is; anything else is wrapped
#   in a one-entry Hash under the key field name
# @param topic [String, nil]
# @return [String]
def encode_key(key_id, key, topic: nil)
  @key_schema ||= begin
    @schema_store.find("#{@schema}_key")
  rescue SchemaRegistry::SchemaNotFoundError
    generate_key_schema(key_id)
  end

  key_field = _field_name_from_schema(@key_schema)
  payload = key
  payload = { key_field => key } unless key.is_a?(Hash)

  encode(payload, schema: @key_schema['name'], topic: topic, is_key: true)
end

#generate_key_schema(field_name) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/deimos/schema_backends/avro_base.rb', line 131

# Builds a single-field Avro record schema to serve as the key schema, adds
# it to the schema store and memoizes it in @key_schema.
#
# The field's type is copied from the field of the same name in the value
# schema.
# @param field_name [String, Symbol] name of the value-schema field to use as the key
# @return [Hash] the generated schema definition
# @raise [ArgumentError] if the value schema has no field with that name
def generate_key_schema(field_name)
  # Normalise once: the lookup below compares against String field names, and
  # the generated schema must carry the same String (not a Symbol) so later
  # lookups by field name agree with it.
  key_name = field_name.to_s
  key_field = avro_schema.fields.find { |f| f.name == key_name }
  if key_field.nil?
    raise ArgumentError, "Key field '#{key_name}' not found in schema #{@namespace}.#{@schema}"
  end

  name = _key_schema_name(@schema)
  key_schema = {
    'type' => 'record',
    'name' => name,
    'namespace' => @namespace,
    'doc' => "Key for #{@namespace}.#{@schema} - autogenerated by Deimos",
    'fields' => [
      {
        'name' => key_name,
        'type' => key_field.type.type_sym.to_s
      }
    ]
  }
  @schema_store.add_schema(key_schema)
  @key_schema = key_schema
end

#load_schema ⇒ Avro::Schema

Returns:



94
95
96
# File 'lib/deimos/schema_backends/avro_base.rb', line 94

# Returns this backend's Avro schema; a thin wrapper around `avro_schema`
# called with no arguments.
# @return [Avro::Schema]
def load_schema
  avro_schema
end

#schema_fields ⇒ ::Array[SchemaField]

Returns:



78
79
80
81
82
83
# File 'lib/deimos/schema_backends/avro_base.rb', line 78

# Wraps every field of the Avro schema in a SchemaField.
#
# Each SchemaField receives the field's name, its Avro type, the enum symbols
# (an empty array for non-enum types) and the field's default.
# @return [Array<SchemaField>]
def schema_fields
  avro_schema.fields.map do |avro_field|
    avro_type = avro_field.type
    symbols =
      if avro_type.type == 'enum'
        avro_type.symbols
      else
        []
      end
    SchemaField.new(avro_field.name, avro_type, symbols, avro_field.default)
  end
end

#sql_type(field) ⇒ Symbol

:nodoc:

Parameters:

Returns:

  • (Symbol)


49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/deimos/schema_backends/avro_base.rb', line 49

# Maps the Avro type of a field to a SQL column type.
#
# Unions resolve to their first non-null member (with a warning when there is
# more than one), and that member is then mapped like any other type.
# :nodoc:
# @param field [SchemaField] only `name` and `type` are read; `type` is
#   expected to respond to `type` and, for unions, `schemas`
# @return [Symbol, String] :integer, :bigint, :float, :boolean or :string for
#   scalar types; the Avro type name as a String for array, map and record
def sql_type(field)
  # Avro::Schema#type is a String (Symbol form is #type_sym), so normalise and
  # compare against Strings; comparing to Symbols would never match.
  type = field.type.type.to_s

  if type == 'union'
    non_null = field.type.schemas.reject { |member| member.type.to_s == 'null' }
    if non_null.size > 1
      warn("WARNING: #{field.name} has more than one non-null type. Picking the first for the SQL type.")
    end
    # A union with no non-null member falls through to the :string default.
    type = non_null.empty? ? '' : non_null.first.type.to_s
  end

  case type
  when 'array', 'map', 'record'
    type
  when 'float', 'boolean'
    type.to_sym
  when 'int'
    :integer
  when 'long'
    :bigint
  when 'double'
    warn('Avro `double` type turns into SQL `float` type. Please ensure you have the correct `limit` set.')
    :float
  else
    :string
  end
end

#supports_class_generation? ⇒ Boolean

Returns:

  • (Boolean)


25
26
27
# File 'lib/deimos/schema_backends/avro_base.rb', line 25

# Whether this backend supports class generation.
# @return [Boolean] always true
def supports_class_generation?
  true
end

#supports_key_schemas? ⇒ Boolean

Returns:

  • (Boolean)


20
21
22
# File 'lib/deimos/schema_backends/avro_base.rb', line 20

# Whether this backend supports key schemas.
# @return [Boolean] always true
def supports_key_schemas?
  true
end

#validate(payload, schema:) ⇒ void

This method returns an undefined value.

Parameters:

  • payload (::Hash[untyped, untyped])
  • schema: (String, Symbol)


86
87
88
89
90
# File 'lib/deimos/schema_backends/avro_base.rb', line 86

# Validates a payload against the named schema using Avro's schema validator.
#
# Validation is recursive and rejects fields that the schema does not declare.
# @param payload [Hash]
# @param schema [String, Symbol] passed to `avro_schema` to resolve the schema
# @return [void]
def validate(payload, schema:)
  resolved_schema = avro_schema(schema)
  Avro::SchemaValidator.validate!(resolved_schema, payload, recursive: true, fail_on_extra_fields: true)
end