Class: SchemaRegistry::CachedConfluentSchemaRegistry
- Inherits:
-
Object
- Object
- SchemaRegistry::CachedConfluentSchemaRegistry
- Defined in:
- lib/schema_registry_client/cached_confluent_schema_registry.rb
Instance Method Summary collapse
-
#fetch(id) ⇒ String
The schema string stored in the registry for the given id.
- #fetch_id(subject, schema) ⇒ Object
-
#fetch_version(id, subject) ⇒ Integer?
The version of the schema for the given subject and id, or nil if not found.
-
#initialize(upstream) ⇒ CachedConfluentSchemaRegistry
constructor
A new instance of CachedConfluentSchemaRegistry.
- #register(subject, schema, references: [], schema_type: "PROTOBUF") ⇒ Object
-
#registered?(subject, schema) ⇒ Boolean
True if we know the schema has been registered for that subject.
Constructor Details
#initialize(upstream) ⇒ CachedConfluentSchemaRegistry
Returns a new instance of CachedConfluentSchemaRegistry.
6 7 8 9 10 11 12 |
# File 'lib/schema_registry_client/cached_confluent_schema_registry.rb', line 6 def initialize(upstream) @upstream = upstream @schemas_by_id = {} @ids_by_schema = {} @versions_by_subject_and_id = {} @mutex = Mutex.new end |
Instance Method Details
#fetch(id) ⇒ String
Returns the schema string stored in the registry for the given id.
23 24 25 26 27 |
# File 'lib/schema_registry_client/cached_confluent_schema_registry.rb', line 23 def fetch(id) @mutex.synchronize do @schemas_by_id[id] ||= @upstream.fetch(id) end end |
#fetch_id(subject, schema) ⇒ Object
51 52 53 54 55 |
# File 'lib/schema_registry_client/cached_confluent_schema_registry.rb', line 51 def fetch_id(subject, schema) @mutex.synchronize do @ids_by_schema[[subject, schema]] end end |
#fetch_version(id, subject) ⇒ Integer?
Returns the version of the schema for the given subject and id, or nil if not found.
32 33 34 35 36 37 38 39 40 |
# File 'lib/schema_registry_client/cached_confluent_schema_registry.rb', line 32 def fetch_version(id, subject) @mutex.synchronize do key = [subject, id] return @versions_by_subject_and_id[key] if @versions_by_subject_and_id[key] results = @upstream.schema_subject_versions(id) @versions_by_subject_and_id[key] = results&.find { |r| r["subject"] == subject }&.dig("version") end end |
#register(subject, schema, references: [], schema_type: "PROTOBUF") ⇒ Object
61 62 63 64 65 66 67 68 69 70 |
# File 'lib/schema_registry_client/cached_confluent_schema_registry.rb', line 61 def register(subject, schema, references: [], schema_type: "PROTOBUF") @mutex.synchronize do key = [subject, schema] @ids_by_schema[key] ||= @upstream.register(subject, schema, references: references, schema_type: schema_type) end end |
#registered?(subject, schema) ⇒ Boolean
Returns true if we know the schema has been registered for that subject.
45 46 47 48 49 |
# File 'lib/schema_registry_client/cached_confluent_schema_registry.rb', line 45 def registered?(subject, schema) @mutex.synchronize do @ids_by_schema.key?([subject, schema]) end end |