Class: SchemaRegistry::Schema::Protobuf

Inherits:
Base
  • Object
show all
Defined in:
lib/schema_registry_client/schema/protobuf.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.schema_typeObject



13
14
15
# File 'lib/schema_registry_client/schema/protobuf.rb', line 13

def self.schema_type
  "PROTOBUF"
end

Instance Method Details

#decode(stream, schema_text) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/schema_registry_client/schema/protobuf.rb', line 41

def decode(stream, schema_text)
  # See https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
  index_length = SchemaRegistry::Wire.read_int(stream)
  indexes = []
  if index_length.zero?
    indexes.push(0)
  else
    index_length.times do
      indexes.push(SchemaRegistry::Wire.read_int(stream))
    end
  end

  encoded = stream.read
  decode_protobuf(schema_text, encoded, indexes)
end

#decode_protobuf(schema, encoded, indexes) ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/schema_registry_client/schema/protobuf.rb', line 110

def decode_protobuf(schema, encoded, indexes)
  # get the package
  package = schema.match(/package (\S+);/)[1]
  # get the first message in the protobuf text
  # TODO - get the correct message based on schema index
  message_name = schema.match(/message (\w+) {/)[1]
  # look up the descriptor
  full_name = "#{package}.#{message_name}"
  descriptor = Google::Protobuf::DescriptorPool.generated_pool.lookup(full_name)
  unless descriptor
    msg = "Could not find schema for #{full_name}. " \
          "Make sure the corresponding .proto file has been compiled and loaded."
    raise msg
  end

  path = find_descriptor(indexes, descriptor.file_descriptor.to_proto.message_type)
  correct_message = Google::Protobuf::DescriptorPool.generated_pool.lookup("#{package}.#{path.join(".")}")
  correct_message.msgclass.decode(encoded)
end

#dependencies(message) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/schema_registry_client/schema/protobuf.rb', line 68

def dependencies(message)
  return [] if message.nil?

  load_schemas! unless @all_schemas&.any?
  file_descriptor = if message.is_a?(Google::Protobuf::FileDescriptor)
    message
  else
    message.class.descriptor.file_descriptor
  end

  deps = file_descriptor.to_proto.dependency.to_a
    .reject { |d| d.start_with?("google/protobuf/") }
  deps.to_h do |dep|
    dependency_schema = @all_schemas[dep]
    [dependency_schema.name, dependency_schema]
  end
end

#encode(message, stream, schema_name: nil) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/schema_registry_client/schema/protobuf.rb', line 26

def encode(message, stream, schema_name: nil)
  _, indexes = find_index(message.class.descriptor.to_proto,
    message.class.descriptor.file_descriptor.to_proto.message_type)

  if indexes == [0]
    SchemaRegistry::Wire.write_int(stream, 0)
  else
    SchemaRegistry::Wire.write_int(stream, indexes.length)
    indexes.each { |i| SchemaRegistry::Wire.write_int(stream, i) }
  end

  # Now we write the actual message.
  stream.write(message.to_proto)
end

#find_descriptor(indexes, messages) ⇒ Object



99
100
101
102
103
104
105
106
107
108
# File 'lib/schema_registry_client/schema/protobuf.rb', line 99

def find_descriptor(indexes, messages)
  first_index = indexes.shift
  message = messages[first_index]
  path = [message.name]
  while indexes.length.positive?
    message = message.nested_type[indexes.shift]
    path.push(message.name)
  end
  path
end

#find_index(descriptor, messages, indexes = []) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/schema_registry_client/schema/protobuf.rb', line 86

def find_index(descriptor, messages, indexes = [])
  messages.each_with_index do |sub_descriptor, i|
    if sub_descriptor == descriptor
      indexes.push(i)
      return [true, indexes]
    else
      found, found_indexes = find_index(descriptor, sub_descriptor.nested_type, indexes + [i])
      return [true, found_indexes] if found
    end
  end
  []
end

#load_schemas!Object



57
58
59
60
61
62
63
64
65
66
# File 'lib/schema_registry_client/schema/protobuf.rb', line 57

def load_schemas!
  @all_schemas = {}
  all_files = ObjectSpace.each_object(Google::Protobuf::FileDescriptor).to_a
  all_files.each do |file_desc|
    file_path = file_desc.name
    next if file_path.start_with?("google/protobuf/") # skip built-in protos

    @all_schemas[file_path] = file_desc
  end
end

#schema_text(message, schema_name: nil) ⇒ Object



17
18
19
20
21
22
23
24
# File 'lib/schema_registry_client/schema/protobuf.rb', line 17

def schema_text(message, schema_name: nil)
  file_descriptor = if message.is_a?(Google::Protobuf::FileDescriptor)
    message
  else
    message.class.descriptor.file_descriptor
  end
  SchemaRegistry::Output::ProtoText.output(file_descriptor.to_proto)
end