Class: SchemaRegistry::Schema::Protobuf
- Defined in:
- lib/schema_registry_client/schema/protobuf.rb
Class Method Summary
- .schema_type ⇒ Object
Instance Method Summary
- #decode(stream, schema_text) ⇒ Object
- #decode_protobuf(schema, encoded, indexes) ⇒ Object
- #dependencies(message) ⇒ Object
- #encode(message, stream, schema_name: nil) ⇒ Object
- #find_descriptor(indexes, messages) ⇒ Object
- #find_index(descriptor, messages, indexes = []) ⇒ Object
- #load_schemas! ⇒ Object
- #schema_text(message, schema_name: nil) ⇒ Object
Class Method Details
.schema_type ⇒ Object
13 14 15 |
# File 'lib/schema_registry_client/schema/protobuf.rb', line 13 def self.schema_type "PROTOBUF" end |
Instance Method Details
#decode(stream, schema_text) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/schema_registry_client/schema/protobuf.rb', line 41
# Reads the message-index list from the front of +stream+ and hands the
# remaining bytes to #decode_protobuf.
#
# The first integer read is the number of message indexes; a count of zero is
# treated as the single index 0. See
# https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
#
# @param stream [#read] stream positioned at the start of the message indexes
# @param schema_text [String] protobuf schema text passed on to #decode_protobuf
# @return [Object] whatever #decode_protobuf returns
def decode(stream, schema_text)
  count = SchemaRegistry::Wire.read_int(stream)
  indexes =
    if count.zero?
      [0]
    else
      count.times.map { SchemaRegistry::Wire.read_int(stream) }
    end

  payload = stream.read
  decode_protobuf(schema_text, payload, indexes)
end
#decode_protobuf(schema, encoded, indexes) ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/schema_registry_client/schema/protobuf.rb', line 110
# Decodes +encoded+ into the message type that +indexes+ selects within the
# schema's file.
#
# The package and the first message name are scraped from the schema text and
# used to find the compiled file in the generated descriptor pool; +indexes+
# is then resolved against that file's message list via #find_descriptor.
#
# @param schema [String] protobuf schema text
# @param encoded [String] serialized message bytes
# @param indexes [Array<Integer>] message-index path; left unmodified
# @return [Object] result of the resolved message class's +decode+
# @raise [RuntimeError] if the schema text contains no message definition, or
#   a descriptor is missing from the generated pool
def decode_protobuf(schema, encoded, indexes)
  # A .proto file may omit the package statement; names are then unqualified.
  package = schema[/package (\S+);/, 1]

  # get the first message in the protobuf text
  # TODO - get the correct message based on schema index
  message_name = schema[/message (\w+) {/, 1]
  raise "Could not find a message definition in the schema text." unless message_name

  pool = Google::Protobuf::DescriptorPool.generated_pool

  # look up the descriptor
  full_name = [package, message_name].compact.join(".")
  descriptor = pool.lookup(full_name)
  unless descriptor
    msg = "Could not find schema for #{full_name}. " \
          "Make sure the corresponding .proto file has been compiled and loaded."
    raise msg
  end

  # Pass a copy so the caller's index array is never modified by the lookup.
  path = find_descriptor(indexes.dup, descriptor.file_descriptor.to_proto.message_type)
  correct_name = [package, *path].compact.join(".")
  correct_message = pool.lookup(correct_name)
  unless correct_message
    msg = "Could not find schema for #{correct_name}. " \
          "Make sure the corresponding .proto file has been compiled and loaded."
    raise msg
  end

  correct_message.msgclass.decode(encoded)
end
#dependencies(message) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/schema_registry_client/schema/protobuf.rb', line 68
# Maps each non-built-in file imported by the message's .proto file to its
# loaded file descriptor.
#
# Loaded schemas are taken from @all_schemas, which is populated through
# #load_schemas! when it is nil or empty.
#
# @param message [Object, Google::Protobuf::FileDescriptor, nil] a file
#   descriptor, or an object whose class exposes +descriptor.file_descriptor+
# @return [Hash, Array] descriptor name => file descriptor for each import
#   outside google/protobuf/; an empty Array when +message+ is nil (kept as-is
#   for backward compatibility)
# @raise [RuntimeError] if an imported file is not among the loaded schemas
def dependencies(message)
  return [] if message.nil?

  load_schemas! unless @all_schemas&.any?
  file_descriptor = if message.is_a?(Google::Protobuf::FileDescriptor)
    message
  else
    message.class.descriptor.file_descriptor
  end

  deps = file_descriptor.to_proto.dependency.to_a
    .reject { |d| d.start_with?("google/protobuf/") }
  deps.to_h do |dep|
    dependency_schema = @all_schemas[dep]
    # Fail with a readable message instead of a NoMethodError on nil.
    unless dependency_schema
      msg = "Could not find schema for dependency #{dep}. " \
            "Make sure the corresponding .proto file has been compiled and loaded."
      raise msg
    end
    [dependency_schema.name, dependency_schema]
  end
end
#encode(message, stream, schema_name: nil) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/schema_registry_client/schema/protobuf.rb', line 26
# Writes the message-index list for +message+ to +stream+, followed by the
# message's serialized bytes.
#
# The index list is the path #find_index reports for the message's descriptor
# within its file's message list. A path of [0] is written as a single 0; any
# other path is written as its length followed by each index.
#
# @param message [Object] object responding to +to_proto+ whose class exposes
#   +descriptor+
# @param stream [#write] destination stream
# @param schema_name [String, nil] not used by this method
# @return [Object] result of the final +stream.write+
# @raise [RuntimeError] if the message's descriptor is not found in its file
def encode(message, stream, schema_name: nil)
  descriptor = message.class.descriptor
  _, indexes = find_index(descriptor.to_proto,
                          descriptor.file_descriptor.to_proto.message_type)

  # find_index returns [] when nothing matches, which leaves indexes nil.
  unless indexes
    raise "Could not find the message index for #{message.class} in its file descriptor."
  end

  if indexes == [0]
    SchemaRegistry::Wire.write_int(stream, 0)
  else
    SchemaRegistry::Wire.write_int(stream, indexes.length)
    indexes.each { |i| SchemaRegistry::Wire.write_int(stream, i) }
  end

  # Now we write the actual message.
  stream.write(message.to_proto)
end
#find_descriptor(indexes, messages) ⇒ Object
99 100 101 102 103 104 105 106 107 108 |
# File 'lib/schema_registry_client/schema/protobuf.rb', line 99
# Resolves a message-index path to the chain of message names it selects.
#
# The first index picks an entry of +messages+; each following index picks an
# entry of the previously selected message's +nested_type+ list.
#
# @param indexes [Array<Integer>] index path; not modified
# @param messages [#[]] message list indexed by the first entry of +indexes+
# @return [Array] the +name+ of each selected message, outermost first
# @raise [ArgumentError] if +indexes+ is empty
# @raise [IndexError] if an index does not select a message
def find_descriptor(indexes, messages)
  raise ArgumentError, "indexes must contain at least one message index" if indexes.empty?

  candidates = messages
  # Walk the path without shifting entries off the caller's array.
  indexes.map do |index|
    message = candidates[index]
    if message.nil?
      raise IndexError, "No message at index #{index} (index path #{indexes.inspect})"
    end

    candidates = message.nested_type
    message.name
  end
end
#find_index(descriptor, messages, indexes = []) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/schema_registry_client/schema/protobuf.rb', line 86
# Searches +messages+, and recursively each entry's +nested_type+ list, for an
# entry equal to +descriptor+, tracking the index path taken to reach it.
#
# @param descriptor [Object] value compared with +==+ against each entry
# @param messages [#each_with_index] entries responding to +nested_type+
# @param indexes [Array<Integer>] path prefix of the enclosing messages; not
#   modified
# @return [Array] +[true, path]+ when found, otherwise an empty Array
def find_index(descriptor, messages, indexes = [])
  messages.each_with_index do |sub_descriptor, i|
    # Build a fresh path each time so the caller's array is never modified.
    path = indexes + [i]
    return [true, path] if sub_descriptor == descriptor

    found, found_indexes = find_index(descriptor, sub_descriptor.nested_type, path)
    return [true, found_indexes] if found
  end
  []
end
#load_schemas! ⇒ Object
57 58 59 60 61 62 63 64 65 66 |
# File 'lib/schema_registry_client/schema/protobuf.rb', line 57
# Rebuilds @all_schemas as a map of file name => file descriptor covering
# every Google::Protobuf::FileDescriptor object that ObjectSpace reports,
# leaving out files under google/protobuf/.
def load_schemas!
  @all_schemas = {}
  descriptors = ObjectSpace.each_object(Google::Protobuf::FileDescriptor).to_a
  descriptors.each do |descriptor|
    name = descriptor.name
    @all_schemas[name] = descriptor unless name.start_with?("google/protobuf/") # skip built-in protos
  end
end
#schema_text(message, schema_name: nil) ⇒ Object
17 18 19 20 21 22 23 24 |
# File 'lib/schema_registry_client/schema/protobuf.rb', line 17
# Produces the SchemaRegistry::Output::ProtoText output for the file
# descriptor behind +message+.
#
# @param message [Object, Google::Protobuf::FileDescriptor] a file descriptor,
#   or an object whose class exposes +descriptor.file_descriptor+
# @param schema_name [String, nil] not used by this method
# @return [Object] whatever SchemaRegistry::Output::ProtoText.output returns
def schema_text(message, schema_name: nil)
  file_descriptor = if message.is_a?(Google::Protobuf::FileDescriptor)
    message
  else
    message.class.descriptor.file_descriptor
  end
  SchemaRegistry::Output::ProtoText.output(file_descriptor.to_proto)
end