Class: SchemaRegistry::AvroSchemaStore
- Inherits: Object
  - Object
  - SchemaRegistry::AvroSchemaStore
- Defined in: lib/schema_registry_client/avro_schema_store.rb
Instance Attribute Summary
- #schemas ⇒ Object
  Returns the value of attribute schemas.
Instance Method Summary
- #add_schema(schema_hash) ⇒ Object
- #find(name) ⇒ Object
  Resolves and returns a schema.
- #initialize(path: nil) ⇒ AvroSchemaStore (constructor)
  A new instance of AvroSchemaStore.
- #load_schemas! ⇒ Object
  Loads all schema definition files (`*.avsc`) under the configured schema path.
Constructor Details
#initialize(path: nil) ⇒ AvroSchemaStore
Returns a new instance of AvroSchemaStore.
# File 'lib/schema_registry_client/avro_schema_store.rb', line 7
def initialize(path: nil)
  @path = path or raise "Please specify a schema path"
  @schemas = {}
  @mutex = Mutex.new
end
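The constructor relies on the low-precedence `or`: the assignment to `@path` happens first, and `raise` runs only when the assigned value is `nil` or `false`. The following is a minimal sketch of that behaviour; `StoreSketch` and `build_message` are hypothetical stand-ins that copy only the constructor shown above, not the real class.

```ruby
# Hypothetical stand-in that mirrors only the constructor listed above.
class StoreSketch
  attr_reader :schemas

  def initialize(path: nil)
    # `or` binds more loosely than `=`, so this reads as
    # (@path = path) or raise "...".
    @path = path or raise "Please specify a schema path"
    @schemas = {}
    @mutex = Mutex.new
  end
end

# Hypothetical helper: returns "ok" on success, or the error message
# when the constructor raises.
def build_message(path)
  StoreSketch.new(path: path)
  "ok"
rescue RuntimeError => e
  e.message
end

puts build_message("avro/schema")
puts build_message(nil)
```

A bare `raise "..."` raises a `RuntimeError`, which is why the helper rescues that class.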
Instance Attribute Details
#schemas ⇒ Object
Returns the value of attribute schemas.
# File 'lib/schema_registry_client/avro_schema_store.rb', line 13
def schemas
  @schemas
end
Instance Method Details
#add_schema(schema_hash) ⇒ Object
# File 'lib/schema_registry_client/avro_schema_store.rb', line 51
def add_schema(schema_hash)
  name = schema_hash["name"]
  namespace = schema_hash["namespace"]
  full_name = Avro::Name.make_fullname(name, namespace)
  return if @schemas.key?(full_name)

  # We pass in a copy of @schemas, which Avro can freely modify
  # to register the sub-schemas. It doesn't matter because
  # we will discard it.
  schema = Avro::Schema.real_parse(schema_hash, @schemas.dup)
  @schemas[full_name] = schema
  schema
end
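The effect of passing `@schemas.dup` to the parser can be sketched without the Avro gem. In the example below, `make_fullname` and `fake_parse` are hypothetical stand-ins (assumed to behave roughly like `Avro::Name.make_fullname` and a parser that records every named type in the hash it receives), and `AddSchemaSketch` is an illustrative class, not the real store.

```ruby
# Hypothetical stand-in for Avro::Name.make_fullname: prefix the namespace
# unless the name is already dotted or no namespace is given.
def make_fullname(name, namespace)
  return name if name.include?(".") || namespace.nil?

  "#{namespace}.#{name}"
end

# Hypothetical stand-in for a parser that records every named type it
# encounters, including nested records, in the `names` hash it is handed.
def fake_parse(schema_hash, names)
  full_name = make_fullname(schema_hash["name"], schema_hash["namespace"])
  names[full_name] = schema_hash
  Array(schema_hash["fields"]).each do |field|
    type = field["type"]
    next unless type.is_a?(Hash) && type["type"] == "record"

    # Nested records inherit the enclosing namespace unless they set their own.
    fake_parse({ "namespace" => schema_hash["namespace"] }.merge(type), names)
  end
  schema_hash
end

class AddSchemaSketch
  attr_reader :schemas

  def initialize
    @schemas = {}
  end

  def add_schema(schema_hash)
    full_name = make_fullname(schema_hash["name"], schema_hash["namespace"])
    return if @schemas.key?(full_name)

    # The parser mutates only the copy, so nested names never reach @schemas.
    schema = fake_parse(schema_hash, @schemas.dup)
    @schemas[full_name] = schema
  end
end
```

Under these assumptions, adding a `Person` record with a nested `Address` record stores only the top-level full name, and a second call with the same schema returns `nil` because of the early `return`.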
#find(name) ⇒ Object
Resolves and returns a schema.
name - The String name of the schema to resolve.
Returns an Avro::Schema.
# File 'lib/schema_registry_client/avro_schema_store.rb', line 20
def find(name)
  # Optimistic non-blocking read from @schemas.
  # There is no need to lock when the schema is already loaded.
  return @schemas[name] if @schemas.key?(name)

  # Pessimistic blocking write to @schemas.
  @mutex.synchronize do
    # Still need to check whether the schema is already loaded.
    return @schemas[name] if @schemas.key?(name)

    load_schema!(name, @schemas.dup)
  end
end
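The read-then-lock-then-recheck shape of `find` is a form of double-checked locking. Below is a minimal, self-contained sketch of the same pattern; `LazyCacheSketch` and its loader block are hypothetical and stand in for the store and its private `load_schema!` method, whose body is not shown on this page.

```ruby
# Hypothetical cache illustrating the same lookup pattern as #find.
class LazyCacheSketch
  def initialize(&loader)
    @entries = {}
    @mutex = Mutex.new
    @loader = loader # stand-in for the store's private load_schema!
  end

  def find(name)
    # Fast path: no lock is taken when the entry is already cached.
    return @entries[name] if @entries.key?(name)

    @mutex.synchronize do
      # Another thread may have loaded the entry while this one waited.
      return @entries[name] if @entries.key?(name)

      @entries[name] = @loader.call(name)
    end
  end
end

loads = 0
cache = LazyCacheSketch.new do |name|
  loads += 1 # runs under the mutex, so the counter is not raced
  sleep 0.01 # simulate a slow file read
  "schema for #{name}"
end

threads = Array.new(8) { Thread.new { cache.find("com.example.Person") } }
threads.each(&:join)
puts "loader ran #{loads} time(s)"
```

The second check inside the lock is what prevents threads that missed on the fast path from each running the loader in turn.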
#load_schemas! ⇒ Object
Loads all schema definition files (`*.avsc`) under the configured schema path.
# File 'lib/schema_registry_client/avro_schema_store.rb', line 35
def load_schemas!
  pattern = [@path, "**", "*.avsc"].join("/")

  Dir.glob(pattern) do |schema_path|
    # Remove the path prefix.
    schema_path.sub!(%r{^/?#{@path}/}, "")

    # Replace `/` with `.` and chop off the file extension.
    schema_name = File.basename(schema_path.tr("/", "."), ".avsc")

    # Load and cache the schema.
    find(schema_name)
  end
end
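The path-to-name conversion inside the loop can be tried in isolation. The sketch below extracts it into a hypothetical helper, `schema_name_for`. It deviates from the listing in two ways: it uses the non-mutating `sub` instead of `sub!`, and it wraps the root in `Regexp.escape` so a path containing regex metacharacters is matched literally.

```ruby
# Hypothetical helper reproducing the name derivation from load_schemas!.
def schema_name_for(root, file_path)
  # Remove the root prefix (with an optional leading slash).
  relative = file_path.sub(%r{^/?#{Regexp.escape(root)}/}, "")

  # Replace `/` with `.` and drop the `.avsc` extension.
  File.basename(relative.tr("/", "."), ".avsc")
end

puts schema_name_for("avro/schema", "avro/schema/com/example/person.avsc")
# prints "com.example.person"
```

The directory layout therefore determines the name passed to `find`: each directory level under the root becomes one dot-separated segment.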