Class: Deimos::SchemaBackends::AvroBase

Inherits:
Base
  • Object
show all
Defined in:
lib/deimos/schema_backends/avro_base.rb

Overview

Encode / decode using Avro, either locally or via schema registry.

Direct Known Subclasses

AvroLocal, AvroSchemaRegistry, AvroValidation

Instance Attribute Summary collapse

Attributes inherited from Base

#key_schema, #namespace, #schema

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Base

#coerce, #decode, #decode_payload, #encode, #encode_payload

Constructor Details

#initialize(schema:, namespace:) ⇒ AvroBase

Returns a new instance of AvroBase.



16
17
18
19
# File 'lib/deimos/schema_backends/avro_base.rb', line 16

def initialize(schema:, namespace:)
  super(schema: schema, namespace: namespace)
  @schema_store = AvroTurf::MutableSchemaStore.new(path: Deimos.config.schema.path)
end

Instance Attribute Details

#schema_storeObject

Returns the value of attribute schema_store.



13
14
15
# File 'lib/deimos/schema_backends/avro_base.rb', line 13

def schema_store
  @schema_store
end

Class Method Details

.content_typeObject



96
97
98
# File 'lib/deimos/schema_backends/avro_base.rb', line 96

def self.content_type
  'avro/binary'
end

.field_type(avro_schema) ⇒ String

Converts Avro::Schema::NamedSchema’s to String form for generated YARD docs. Recursively handles the typing for Arrays, Maps and Unions.

Parameters:

  • avro_schema (Avro::Schema::NamedSchema)

Returns:

  • (String)

    A string representation of the Type of this SchemaField



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/deimos/schema_backends/avro_base.rb', line 110

def self.field_type(avro_schema)
  case avro_schema.type_sym
  when :string, :boolean
    avro_schema.type_sym.to_s.titleize
  when :int, :long
    'Integer'
  when :float, :double
    'Float'
  when :record, :enum
    schema_classname(avro_schema)
  when :array
    arr_t = field_type(Deimos::SchemaField.new('n/a', avro_schema.items).type)
    "Array<#{arr_t}>"
  when :map
    map_t = field_type(Deimos::SchemaField.new('n/a', avro_schema.values).type)
    "Hash<String, #{map_t}>"
  when :union
    types = avro_schema.schemas.map do |t|
      field_type(Deimos::SchemaField.new('n/a', t).type)
    end
    types.join(', ')
  when :null
    'nil'
  end
end

.mock_backendObject



91
92
93
# File 'lib/deimos/schema_backends/avro_base.rb', line 91

def self.mock_backend
  :avro_local
end

.schema_base_class(schema) ⇒ Avro::Schema::NamedSchema

Returns the base type of this schema. Decodes Arrays, Maps and Unions

Parameters:

  • schema (Avro::Schema::NamedSchema)

Returns:

  • (Avro::Schema::NamedSchema)


139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/deimos/schema_backends/avro_base.rb', line 139

def self.schema_base_class(schema)
  case schema.type_sym
  when :array
    schema_base_class(schema.items)
  when :map
    schema_base_class(schema.values)
  when :union
    schema.schemas.map(&method(:schema_base_class)).
      reject { |s| s.type_sym == :null }.first
  else
    schema
  end
end

.schema_classname(schema) ⇒ String

Parameters:

  • schema (Avro::Schema::NamedSchema)

    A named schema

Returns:

  • (String)


102
103
104
# File 'lib/deimos/schema_backends/avro_base.rb', line 102

def self.schema_classname(schema)
  schema.name.underscore.camelize.singularize
end

Instance Method Details

#coerce_field(field, value) ⇒ Object



65
66
67
# File 'lib/deimos/schema_backends/avro_base.rb', line 65

def coerce_field(field, value)
  AvroSchemaCoercer.new(avro_schema).coerce_type(field.type, value)
end

#decode_key(payload, key_id) ⇒ Object



34
35
36
37
38
# File 'lib/deimos/schema_backends/avro_base.rb', line 34

def decode_key(payload, key_id)
  @key_schema ||= generate_key_schema(key_id)
  field_name = _field_name_from_schema(@key_schema)
  decode(payload, schema: @key_schema['name'])[field_name]
end

#encode_key(key_id, key, topic: nil) ⇒ Object



22
23
24
25
26
27
28
29
30
31
# File 'lib/deimos/schema_backends/avro_base.rb', line 22

def encode_key(key_id, key, topic: nil)
  begin
    @key_schema ||= @schema_store.find("#{@schema}_key")
  rescue AvroTurf::SchemaNotFoundError
    @key_schema = generate_key_schema(key_id)
  end
  field_name = _field_name_from_schema(@key_schema)
  payload = key.is_a?(Hash) ? key : { field_name => key }
  encode(payload, schema: @key_schema['name'], topic: topic)
end

#generate_key_schema(field_name) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/deimos/schema_backends/avro_base.rb', line 153

def generate_key_schema(field_name)
  key_field = avro_schema.fields.find { |f| f.name == field_name.to_s }
  name = _key_schema_name(@schema)
  key_schema = {
    'type' => 'record',
    'name' => name,
    'namespace' => @namespace,
    'doc' => "Key for #{@namespace}.#{@schema} - autogenerated by Deimos",
    'fields' => [
      {
        'name' => field_name,
        'type' => key_field.type.type_sym.to_s
      }
    ]
  }
  @schema_store.add_schema(key_schema)
  @key_schema = key_schema
end

#load_schemaAvro::Schema

Returns:

  • (Avro::Schema)


86
87
88
# File 'lib/deimos/schema_backends/avro_base.rb', line 86

def load_schema
  avro_schema
end

#schema_fieldsObject



70
71
72
73
74
75
# File 'lib/deimos/schema_backends/avro_base.rb', line 70

def schema_fields
  avro_schema.fields.map do |field|
    enum_values = field.type.type == 'enum' ? field.type.symbols : []
    SchemaField.new(field.name, field.type, enum_values, field.default)
  end
end

#sql_type(field) ⇒ Object

:nodoc:



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/deimos/schema_backends/avro_base.rb', line 41

def sql_type(field)
  type = field.type.type
  return type if %w(array map record).include?(type)

  if type == :union
    non_null = field.type.schemas.reject { |f| f.type == :null }
    if non_null.size > 1
      warn("WARNING: #{field.name} has more than one non-null type. Picking the first for the SQL type.")
    end
    return non_null.first.type
  end
  return type.to_sym if %w(float boolean).include?(type)
  return :integer if type == 'int'
  return :bigint if type == 'long'

  if type == 'double'
    warn('Avro `double` type turns into SQL `float` type. Please ensure you have the correct `limit` set.')
    return :float
  end

  :string
end

#validate(payload, schema:) ⇒ Object



78
79
80
81
82
# File 'lib/deimos/schema_backends/avro_base.rb', line 78

def validate(payload, schema:)
  Avro::SchemaValidator.validate!(avro_schema(schema), payload,
                                  recursive: true,
                                  fail_on_extra_fields: true)
end