Class: SchemaRegistry::Schema::Avro

Inherits:
Base
  • Object
show all
Defined in:
lib/schema_registry_client/schema/avro.rb

Constant Summary collapse

DEFAULT_SCHEMAS_PATH =
"./schemas"

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Base

#dependencies

Constructor Details

#initialize(schema_store: nil) ⇒ Avro

Returns a new instance of Avro.

Parameters:



16
17
18
# File 'lib/schema_registry_client/schema/avro.rb', line 16

def initialize(schema_store: nil)
  @schema_store = schema_store
end

Class Method Details

.schema_typeObject



11
12
13
# File 'lib/schema_registry_client/schema/avro.rb', line 11

def self.schema_type
  "AVRO"
end

Instance Method Details

#decode(stream, schema_text) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/schema_registry_client/schema/avro.rb', line 52

def decode(stream, schema_text)
  # Cache parsed writer schemas to avoid re-parsing on every decode
  @parsed_writers_schemas ||= {}
  @parsed_writers_schemas[schema_text] ||= ::Avro::Schema.parse(schema_text)
  writers_schema = @parsed_writers_schemas[schema_text]
  decoder = ::Avro::IO::BinaryDecoder.new(stream)

  # Try to find the reader schema locally, fall back to writer schema
  readers_schema = begin
    schema_store.find(writers_schema.fullname)
  rescue
    writers_schema
  end

  reader = ::Avro::IO::DatumReader.new(writers_schema, readers_schema)
  reader.read(decoder)
end

#encode(message, stream, schema_name: nil) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/schema_registry_client/schema/avro.rb', line 39

def encode(message, stream, schema_name: nil)
  validate_options = {recursive: true,
                      encoded: false,
                      fail_on_extra_fields: true}
  schema = schema_store.find(schema_name)

  ::Avro::SchemaValidator.validate!(schema, message, **validate_options)

  writer = ::Avro::IO::DatumWriter.new(schema)
  encoder = ::Avro::IO::BinaryEncoder.new(stream)
  writer.write(message, encoder)
end

#schema_storeSchemaRegistry::AvroSchemaStore



21
22
23
24
25
26
27
28
29
30
# File 'lib/schema_registry_client/schema/avro.rb', line 21

def schema_store
  @schema_store ||= SchemaRegistry::AvroSchemaStore.new(
    path: SchemaRegistry.avro_schema_path || DEFAULT_SCHEMAS_PATH
  )
  unless @schemas_loaded
    @schema_store.load_schemas!
    @schemas_loaded = true
  end
  @schema_store
end

#schema_text(_message, schema_name: nil) ⇒ Object

Register the fully-resolved (inlined) schema. The raw .avsc text is not a valid standalone schema when it references a type defined in another file.



34
35
36
37
# File 'lib/schema_registry_client/schema/avro.rb', line 34

def schema_text(_message, schema_name: nil)
  @registration_text ||= {}
  @registration_text[schema_name] ||= schema_store.find(schema_name).to_avro.to_json
end