Class: SchemaRegistry::Schema::Avro
- Defined in:
- lib/schema_registry_client/schema/avro.rb
Constant Summary collapse
- DEFAULT_SCHEMAS_PATH =
"./schemas"
Class Method Summary collapse
Instance Method Summary collapse
- #decode(stream, schema_text) ⇒ Object
- #encode(message, stream, schema_name: nil) ⇒ Object
-
#initialize(schema_store: nil) ⇒ Avro
constructor
A new instance of Avro.
- #schema_store ⇒ SchemaRegistry::AvroSchemaStore
-
#schema_text(_message, schema_name: nil) ⇒ Object
Register the fully-resolved (inlined) schema.
Methods inherited from Base
Constructor Details
#initialize(schema_store: nil) ⇒ Avro
Returns a new instance of Avro.
16 17 18 |
# File 'lib/schema_registry_client/schema/avro.rb', line 16
#
# Returns a new instance of Avro.
#
# @param schema_store [SchemaRegistry::AvroSchemaStore, nil] store to look
#   schemas up in. It is kept as given; when nil, #schema_store builds a
#   default store the first time it is called.
def initialize(schema_store: nil)
  @schema_store = schema_store
end
Class Method Details
.schema_type ⇒ Object
11 12 13 |
# File 'lib/schema_registry_client/schema/avro.rb', line 11
#
# @return [String] the schema type identifier for this class: "AVRO".
def self.schema_type
  "AVRO"
end
Instance Method Details
#decode(stream, schema_text) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/schema_registry_client/schema/avro.rb', line 52
#
# Reads one datum from +stream+ using the writer schema given as text.
#
# @param stream [Object] source handed to ::Avro::IO::BinaryDecoder
#   (presumably an IO-like object; confirm against callers)
# @param schema_text [String] writer schema text, parsed with
#   ::Avro::Schema.parse and cached per distinct text on this instance
# @return [Object] whatever ::Avro::IO::DatumReader#read returns
def decode(stream, schema_text)
  # Cache parsed writer schemas to avoid re-parsing on every decode
  @parsed_writers_schemas ||= {}
  @parsed_writers_schemas[schema_text] ||= ::Avro::Schema.parse(schema_text)
  writers_schema = @parsed_writers_schemas[schema_text]

  decoder = ::Avro::IO::BinaryDecoder.new(stream)

  # Try to find the reader schema locally, fall back to writer schema.
  # The bare rescue means any StandardError raised during the lookup
  # selects the fallback.
  readers_schema = begin
    schema_store.find(writers_schema.fullname)
  rescue
    writers_schema
  end

  reader = ::Avro::IO::DatumReader.new(writers_schema, readers_schema)
  reader.read(decoder)
end
#encode(message, stream, schema_name: nil) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/schema_registry_client/schema/avro.rb', line 39
#
# Validates +message+ against the schema named +schema_name+, then writes
# it to +stream+ through an Avro binary encoder.
#
# @param message [Object] datum to validate and encode
# @param stream [Object] destination handed to ::Avro::IO::BinaryEncoder
#   (presumably an IO-like object; confirm against callers)
# @param schema_name [String, nil] name passed to schema_store.find
# @return [Object] whatever ::Avro::IO::DatumWriter#write returns
def encode(message, stream, schema_name: nil)
  # Validation recurses into nested types, treats the datum as not yet
  # encoded, and rejects fields the schema does not declare.
  validate_options = {recursive: true, encoded: false, fail_on_extra_fields: true}
  schema = schema_store.find(schema_name)
  ::Avro::SchemaValidator.validate!(schema, message, **validate_options)

  writer = ::Avro::IO::DatumWriter.new(schema)
  encoder = ::Avro::IO::BinaryEncoder.new(stream)
  writer.write(message, encoder)
end
#schema_store ⇒ SchemaRegistry::AvroSchemaStore
21 22 23 24 25 26 27 28 29 30 |
# File 'lib/schema_registry_client/schema/avro.rb', line 21
#
# Returns the schema store, building a default one when none was supplied
# and calling load_schemas! on it the first time this method is called.
#
# @return [SchemaRegistry::AvroSchemaStore]
def schema_store
  @schema_store ||= SchemaRegistry::AvroSchemaStore.new(
    path: SchemaRegistry.avro_schema_path || DEFAULT_SCHEMAS_PATH
  )
  return @schema_store if @schemas_loaded

  # The flag is set only after load_schemas! returns, so a failed load is
  # attempted again on the next call.
  @schema_store.load_schemas!
  @schemas_loaded = true
  @schema_store
end
#schema_text(_message, schema_name: nil) ⇒ Object
Register the fully-resolved (inlined) schema. The raw .avsc text is not a valid standalone schema when it references a type defined in another file.
34 35 36 37 |
# File 'lib/schema_registry_client/schema/avro.rb', line 34
#
# Register the fully-resolved (inlined) schema. The raw .avsc text is not a
# valid standalone schema when it references a type defined in another file.
#
# @param _message [Object] unused
# @param schema_name [String, nil] name passed to schema_store.find
# @return [String] JSON text of the resolved schema, cached per schema_name
def schema_text(_message, schema_name: nil)
  @registration_text ||= {}
  @registration_text[schema_name] ||= schema_store.find(schema_name).to_avro.to_json
end