Class: SchemaRegistry::CachedConfluentSchemaRegistry

Inherits:
Object
  • Object
show all
Defined in:
lib/schema_registry_client/cached_confluent_schema_registry.rb

Instance Method Summary collapse

Constructor Details

#initialize(upstream) ⇒ CachedConfluentSchemaRegistry

Returns a new instance of CachedConfluentSchemaRegistry.



6
7
8
9
10
11
12
# File 'lib/schema_registry_client/cached_confluent_schema_registry.rb', line 6

def initialize(upstream)
  @upstream = upstream
  @schemas_by_id = {}
  @ids_by_schema = {}
  @versions_by_subject_and_id = {}
  @mutex = Mutex.new
end

Instance Method Details

#fetch(id) ⇒ String

Returns the schema string stored in the registry for the given id.

Parameters:

  • id (Integer)

    the schema ID to fetch

Returns:

  • (String)

    the schema string stored in the registry for the given id



23
24
25
26
27
# File 'lib/schema_registry_client/cached_confluent_schema_registry.rb', line 23

def fetch(id)
  @mutex.synchronize do
    @schemas_by_id[id] ||= @upstream.fetch(id)
  end
end

#fetch_id(subject, schema) ⇒ Object



51
52
53
54
55
# File 'lib/schema_registry_client/cached_confluent_schema_registry.rb', line 51

def fetch_id(subject, schema)
  @mutex.synchronize do
    @ids_by_schema[[subject, schema]]
  end
end

#fetch_version(id, subject) ⇒ Integer?

Returns the version of the schema for the given subject and id, or nil if not found.

Parameters:

  • id (Integer)

    the schema ID to fetch

  • subject (String)

    the subject to fetch the version for

Returns:

  • (Integer, nil)

    the version of the schema for the given subject and id, or nil if not found



32
33
34
35
36
37
38
39
40
# File 'lib/schema_registry_client/cached_confluent_schema_registry.rb', line 32

def fetch_version(id, subject)
  @mutex.synchronize do
    key = [subject, id]
    return @versions_by_subject_and_id[key] if @versions_by_subject_and_id[key]

    results = @upstream.schema_subject_versions(id)
    @versions_by_subject_and_id[key] = results&.find { |r| r["subject"] == subject }&.dig("version")
  end
end

#register(subject, schema, references: [], schema_type: "PROTOBUF") ⇒ Object

Parameters:

  • subject (String)

    the subject to register the schema under

  • schema (String)

    the schema text to register

  • references (Array<Hash>) (defaults to: [])

    optional references to other schemas

  • schema_type (String) (defaults to: "PROTOBUF")


61
62
63
64
65
66
67
68
69
70
# File 'lib/schema_registry_client/cached_confluent_schema_registry.rb', line 61

def register(subject, schema, references: [], schema_type: "PROTOBUF")
  @mutex.synchronize do
    key = [subject, schema]

    @ids_by_schema[key] ||= @upstream.register(subject,
      schema,
      references: references,
      schema_type: schema_type)
  end
end

#registered?(subject, schema) ⇒ Boolean

Returns true if we know the schema has been registered for that subject.

Parameters:

  • subject (String)

    the subject to check

  • schema (String)

    the schema text to check

Returns:

  • (Boolean)

    true if we know the schema has been registered for that subject.



45
46
47
48
49
# File 'lib/schema_registry_client/cached_confluent_schema_registry.rb', line 45

def registered?(subject, schema)
  @mutex.synchronize do
    @ids_by_schema.key?([subject, schema])
  end
end