Class: SchemaRegistry::AvroSchemaStore

Inherits:
Object
  • Object
show all
Defined in:
lib/schema_registry_client/avro_schema_store.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path: nil) ⇒ AvroSchemaStore

Returns a new instance of AvroSchemaStore.



7
8
9
10
11
# File 'lib/schema_registry_client/avro_schema_store.rb', line 7

def initialize(path: nil)
  @path = path or raise "Please specify a schema path"
  @schemas = {}
  @mutex = Mutex.new
end

Instance Attribute Details

#schemasObject

Returns the value of attribute schemas.



13
14
15
# File 'lib/schema_registry_client/avro_schema_store.rb', line 13

def schemas
  @schemas
end

Instance Method Details

#add_schema(schema_hash) ⇒ Object

Parameters:

  • schema_hash (Hash)


51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/schema_registry_client/avro_schema_store.rb', line 51

def add_schema(schema_hash)
  name = schema_hash["name"]
  namespace = schema_hash["namespace"]
  full_name = Avro::Name.make_fullname(name, namespace)
  return if @schemas.key?(full_name)

  # We pass in copy of @schemas which Avro can freely modify
  # and register the sub-schema. It doesn't matter because
  # we will discard it.
  schema = Avro::Schema.real_parse(schema_hash, @schemas.dup)
  @schemas[full_name] = schema

  schema
end

#find(name) ⇒ Object

Resolves and returns a schema.

schema_name - The String name of the schema to resolve.

Returns an Avro::Schema.



20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/schema_registry_client/avro_schema_store.rb', line 20

def find(name)
  # Optimistic non-blocking read from @schemas
  # No sense to lock the resource when all the schemas already loaded
  return @schemas[name] if @schemas.key?(name)

  # Pessimistic blocking write to @schemas
  @mutex.synchronize do
    # Still need to check is the schema already loaded
    return @schemas[name] if @schemas.key?(name)

    load_schema!(name, @schemas.dup)
  end
end

#load_schemas!Object

Loads all schema definition files in the ‘schemas_dir`.



35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/schema_registry_client/avro_schema_store.rb', line 35

def load_schemas!
  pattern = [@path, "**", "*.avsc"].join("/")

  Dir.glob(pattern) do |schema_path|
    # Remove the path prefix.
    schema_path.sub!(%r{^/?#{@path}/}, "")

    # Replace `/` with `.` and chop off the file extension.
    schema_name = File.basename(schema_path.tr("/", "."), ".avsc")

    # Load and cache the schema.
    find(schema_name)
  end
end