Class: MysqlReplicator::Binlogs::JsonParser

Inherits:
Object
  • Object
show all
Defined in:
lib/mysql_replicator/binlogs/json_parser.rb

Constant Summary collapse

JSONB_TYPE_SMALL_OBJECT =

@rbs!

type json = Hash[Symbol, json] | Array[json] | jsonValue
0x00
JSONB_TYPE_LARGE_OBJECT =

: Integer

0x01
JSONB_TYPE_SMALL_ARRAY =

: Integer

0x02
JSONB_TYPE_LARGE_ARRAY =

: Integer

0x03
JSONB_TYPE_LITERAL =

: Integer

0x04
JSONB_TYPE_INT16 =

: Integer

0x05
JSONB_TYPE_UINT16 =

: Integer

0x06
JSONB_TYPE_INT32 =

: Integer

0x07
JSONB_TYPE_UINT32 =

: Integer

0x08
JSONB_TYPE_INT64 =

: Integer

0x09
JSONB_TYPE_UINT64 =

: Integer

0x0A
JSONB_TYPE_DOUBLE =

: Integer

0x0B
JSONB_TYPE_STRING =

: Integer

0x0C
JSONB_TYPE_OPAQUE =

: Integer

0x0F
JSONB_NULL =

: Integer

0x00
JSONB_TRUE =

: Integer

0x01
JSONB_FALSE =

: Integer

0x02

Class Method Summary collapse

Class Method Details

.inlined_type?(type, small) ⇒ Boolean

Returns:

  • (Boolean)


285
286
287
288
289
290
291
292
293
294
# File 'lib/mysql_replicator/binlogs/json_parser.rb', line 285

# Reports whether a value of the given JSONB type is stored directly inside
# its value entry rather than being referenced through an offset.
#
# @param type [Integer] JSONB type code read from a value entry
# @param small [Boolean] whether the enclosing container uses the small format
# @return [Boolean] true when the value is embedded in the entry itself
def self.inlined_type?(type, small)
  always_inlined = [JSONB_TYPE_LITERAL, JSONB_TYPE_INT16, JSONB_TYPE_UINT16]
  return true if always_inlined.include?(type)

  # 32-bit integers are inlined only when `small` is falsy.
  wide_only = [JSONB_TYPE_INT32, JSONB_TYPE_UINT32]
  wide_only.include?(type) ? !small : false
end

.parse(payload) ⇒ Object



36
37
38
39
40
41
42
43
# File 'lib/mysql_replicator/binlogs/json_parser.rb', line 36

# Entry point: parses a binary JSON payload.
#
# @param payload [String, nil] serialized document; its first byte is the type code
# @return [Object, nil] the decoded value, or nil for a nil/empty payload
def self.parse(payload)
  return nil if payload.nil?
  return nil if payload.empty?

  # Work on a binary-encoded copy so the caller's string is left untouched.
  bytes = payload.dup
  bytes.force_encoding(Encoding::BINARY)

  # Byte 0 carries the type code; the value itself starts at index 1.
  root_type = MysqlReplicator::StringUtil.read_uint8(bytes[0])
  parse_value(root_type, bytes, 1)
end

.parse_array(data, pos, small:) ⇒ Object



207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/mysql_replicator/binlogs/json_parser.rb', line 207

# Parses a JSONB array whose header starts at +pos+.
#
# Fields read, in order: element count, byte size (read but unused), then
# one value entry per element. Each entry is a type byte followed by either
# an inlined value or an offset, both occupying one header-width field.
#
# @param data [String] buffer holding the serialized document
# @param pos [Integer] index of the element-count field
# @param small [Boolean] true for 2-byte fields and offsets, false for 4-byte
# @return [Array] decoded elements in stored order
def self.parse_array(data, pos, small:)
  width = small ? 2 : 4

  # Reads one count/size/offset field of the width selected by `small`.
  read_field = lambda do |at|
    if small
      MysqlReplicator::StringUtil.read_uint16(data[at, 2])
    else
      MysqlReplicator::StringUtil.read_uint32(data[at, 4])
    end
  end

  # Value offsets are later resolved against this base.
  origin = pos - 1

  # Header: element count followed by the byte size.
  count = read_field.call(pos)
  cursor = pos + width
  read_field.call(cursor) # byte size; consumed but not needed
  cursor += width

  # First pass: collect every value entry without following offsets.
  entries = count.times.map do
    kind = MysqlReplicator::StringUtil.read_uint8(data[cursor])
    cursor += 1

    entry =
      if inlined_type?(kind, small)
        # The value is embedded in the entry itself.
        { type: kind, inline: true, value: read_inlined_value(data, cursor, kind) }
      else
        # The entry stores an offset to the value.
        { type: kind, inline: false, offset: read_field.call(cursor) }
      end
    cursor += width
    entry
  end

  # Second pass: resolve offset entries into real values.
  entries.map do |entry|
    next entry[:value] if entry[:inline]

    parse_value_at_offset(entry[:type], data, entry[:offset], origin)
  end
end

.parse_literal(literal_type) ⇒ Object



263
264
265
266
267
268
269
270
271
# File 'lib/mysql_replicator/binlogs/json_parser.rb', line 263

# Maps a JSONB literal code onto the corresponding Ruby value.
#
# @param literal_type [Integer] one of JSONB_NULL, JSONB_TRUE, JSONB_FALSE
# @return [nil, true, false]
# @raise [RuntimeError] when the code is none of the three known literals
def self.parse_literal(literal_type)
  return nil if JSONB_NULL == literal_type
  return true if JSONB_TRUE == literal_type
  return false if JSONB_FALSE == literal_type

  raise "Unknown literal type: #{literal_type}"
end

.parse_object(data, pos, small:) ⇒ Object

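SMALL format: fewer than 65535 elements; the element count, byte size, and offsets are each stored as 2-byte integers. LARGE format: 65535 elements or more; the same fields are stored as 4-byte integers.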



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/mysql_replicator/binlogs/json_parser.rb', line 125

# Parses a JSONB object whose header starts at +pos+.
#
# Fields read, in order: element count, byte size (read but unused), one
# key entry per member (offset + 2-byte length), then one value entry per
# member (type byte followed by an inlined value or an offset). Key and
# value offsets are resolved relative to +pos+.
#
# @param data [String] buffer holding the serialized document
# @param pos [Integer] index of the element-count field
# @param small [Boolean] true for 2-byte fields and offsets, false for 4-byte
# @return [Hash{String => Object}] members keyed by UTF-8 key strings
def self.parse_object(data, pos, small:)
  width = small ? 2 : 4

  # Reads one count/size/offset field of the width selected by `small`.
  read_field = lambda do |at|
    if small
      MysqlReplicator::StringUtil.read_uint16(data[at, 2])
    else
      MysqlReplicator::StringUtil.read_uint32(data[at, 4])
    end
  end

  # Key and value offsets are later resolved against this base.
  origin = pos - 1

  # Header: member count followed by the byte size.
  count = read_field.call(pos)
  cursor = pos + width
  read_field.call(cursor) # byte size; consumed but not needed
  cursor += width

  # Key entries: an offset field, then a length that is always 2 bytes wide.
  key_slots = count.times.map do
    key_offset = read_field.call(cursor)
    cursor += width
    key_length = MysqlReplicator::StringUtil.read_uint16(data[cursor, 2])
    cursor += 2
    { offset: key_offset, length: key_length }
  end

  # Value entries: collected first, offsets are followed afterwards.
  value_slots = count.times.map do
    kind = MysqlReplicator::StringUtil.read_uint8(data[cursor])
    cursor += 1

    slot =
      if inlined_type?(kind, small)
        # The value is embedded in the entry itself.
        { type: kind, inline: true, value: read_inlined_value(data, cursor, kind) }
      else
        # The entry stores an offset to the value.
        { type: kind, inline: false, offset: read_field.call(cursor) }
      end
    cursor += width
    slot
  end

  # Materialize the key strings.
  names = key_slots.map do |slot|
    start = origin + 1 + slot[:offset]
    MysqlReplicator::StringUtil.read_str(data[start, slot[:length]]).force_encoding(Encoding::UTF_8)
  end

  # Materialize the values, following offsets where needed.
  decoded = value_slots.map do |slot|
    next slot[:value] if slot[:inline]

    parse_value_at_offset(slot[:type], data, slot[:offset], origin)
  end

  # Pair keys with values in stored order.
  Hash[names.zip(decoded)]
end

.parse_string(data, pos) ⇒ Object



276
277
278
279
280
# File 'lib/mysql_replicator/binlogs/json_parser.rb', line 276

# Reads a length-prefixed string starting at +pos+.
#
# The prefix is decoded by read_variable_length; the following bytes are
# returned tagged as UTF-8.
#
# @param data [String] buffer holding the serialized document
# @param pos [Integer] index of the first byte of the length prefix
# @return [String] the string contents, encoded as UTF-8
def self.parse_string(data, pos)
  byte_length, prefix_size = read_variable_length(data, pos)
  body_start = pos + prefix_size
  MysqlReplicator::StringUtil.read_str(data[body_start, byte_length]).force_encoding(Encoding::UTF_8)
end

.parse_value(type, data, pos) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/mysql_replicator/binlogs/json_parser.rb', line 49

# Decodes the value of JSONB type +type+ stored at +pos+ in +data+.
#
# Containers are delegated to parse_object / parse_array, literals to
# parse_literal, strings to parse_string; fixed-width numbers are read
# through MysqlReplicator::StringUtil.
#
# @param type [Integer] JSONB type code
# @param data [String] buffer holding the serialized document
# @param pos [Integer] index of the first byte of the value
# @return [Object] the decoded value
# @raise [RuntimeError] for OPAQUE values and for unknown type codes
def self.parse_value(type, data, pos)
  case type
  when JSONB_TYPE_SMALL_OBJECT, JSONB_TYPE_LARGE_OBJECT
    parse_object(data, pos, small: type == JSONB_TYPE_SMALL_OBJECT)
  when JSONB_TYPE_SMALL_ARRAY, JSONB_TYPE_LARGE_ARRAY
    parse_array(data, pos, small: type == JSONB_TYPE_SMALL_ARRAY)
  when JSONB_TYPE_LITERAL then parse_literal(MysqlReplicator::StringUtil.read_uint8(data[pos]))
  when JSONB_TYPE_INT16   then MysqlReplicator::StringUtil.read_int16(data[pos, 2])
  when JSONB_TYPE_UINT16  then MysqlReplicator::StringUtil.read_uint16(data[pos, 2])
  when JSONB_TYPE_INT32   then MysqlReplicator::StringUtil.read_int32(data[pos, 4])
  when JSONB_TYPE_UINT32  then MysqlReplicator::StringUtil.read_uint32(data[pos, 4])
  when JSONB_TYPE_INT64   then MysqlReplicator::StringUtil.read_int64(data[pos, 8])
  when JSONB_TYPE_UINT64  then MysqlReplicator::StringUtil.read_uint64(data[pos, 8])
  when JSONB_TYPE_DOUBLE  then MysqlReplicator::StringUtil.read_double64(data[pos, 8])
  when JSONB_TYPE_STRING  then parse_string(data, pos)
  when JSONB_TYPE_OPAQUE
    # The first byte of an opaque value is reported in the error as its type code.
    opaque_code = MysqlReplicator::StringUtil.read_uint8(data[pos])
    raise "OPAQUE type is not supported (type code: 0x#{opaque_code.to_s(16)})"
  else
    raise "Unknown JSON type: 0x#{type.to_s(16)} at position #{pos}"
  end
end

.parse_value_at_offset(type, data, offset, base_offset) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/mysql_replicator/binlogs/json_parser.rb', line 90

# Decodes a value that a container entry references by offset.
#
# The absolute position is +base_offset+ + 1 + +offset+, i.e. the offset is
# relative to the first byte after the enclosing container's type byte.
#
# INT32 / UINT32 are handled here because inlined_type? reports them as not
# inlined when the container is small, so their entries carry an offset to
# the 4-byte value instead of the value itself.
#
# @param type [Integer] JSONB type code taken from the value entry
# @param data [String] buffer holding the serialized document
# @param offset [Integer] offset stored in the value entry
# @param base_offset [Integer] index of the enclosing container's type byte
# @return [Object] the decoded value
# @raise [RuntimeError] for OPAQUE values and for types that cannot be
#   stored by offset
def self.parse_value_at_offset(type, data, offset, base_offset)
  abs_pos = base_offset + 1 + offset

  case type
  when JSONB_TYPE_SMALL_OBJECT
    parse_object(data, abs_pos, small: true)
  when JSONB_TYPE_LARGE_OBJECT
    parse_object(data, abs_pos, small: false)
  when JSONB_TYPE_SMALL_ARRAY
    parse_array(data, abs_pos, small: true)
  when JSONB_TYPE_LARGE_ARRAY
    parse_array(data, abs_pos, small: false)
  when JSONB_TYPE_STRING
    parse_string(data, abs_pos)
  when JSONB_TYPE_INT32
    # Only reached for small containers, where 4 bytes do not fit in the entry.
    MysqlReplicator::StringUtil.read_int32(data[abs_pos, 4])
  when JSONB_TYPE_UINT32
    MysqlReplicator::StringUtil.read_uint32(data[abs_pos, 4])
  when JSONB_TYPE_INT64
    MysqlReplicator::StringUtil.read_int64(data[abs_pos, 8])
  when JSONB_TYPE_UINT64
    MysqlReplicator::StringUtil.read_uint64(data[abs_pos, 8])
  when JSONB_TYPE_DOUBLE
    MysqlReplicator::StringUtil.read_double64(data[abs_pos, 8])
  when JSONB_TYPE_OPAQUE
    code = MysqlReplicator::StringUtil.read_uint8(data[abs_pos])
    raise "OPAQUE type is not supported (type code: 0x#{code.to_s(16)})"
  else
    raise "Unexpected type at offset: 0x#{type.to_s(16)}"
  end
end

.read_inlined_value(data, pos, type) ⇒ Object



299
300
301
302
303
304
305
306
307
308
309
310
311
312
# File 'lib/mysql_replicator/binlogs/json_parser.rb', line 299

# Reads a value embedded directly in a container's value entry.
#
# Only the types accepted by inlined_type? are valid here. Any other type
# raises rather than returning nil, because nil is also the decoded form of
# a JSON null literal and would silently mask a malformed entry.
#
# @param data [String] buffer holding the serialized document
# @param pos [Integer] index of the first byte of the inlined value
# @param type [Integer] JSONB type code of the entry
# @return [nil, true, false, Integer] the decoded literal or integer
# @raise [RuntimeError] when +type+ is not an inlinable type
def self.read_inlined_value(data, pos, type)
  case type
  when JSONB_TYPE_LITERAL
    parse_literal(MysqlReplicator::StringUtil.read_uint8(data[pos]))
  when JSONB_TYPE_INT16
    MysqlReplicator::StringUtil.read_int16(data[pos, 2])
  when JSONB_TYPE_UINT16
    MysqlReplicator::StringUtil.read_uint16(data[pos, 2])
  when JSONB_TYPE_INT32
    MysqlReplicator::StringUtil.read_int32(data[pos, 4])
  when JSONB_TYPE_UINT32
    MysqlReplicator::StringUtil.read_uint32(data[pos, 4])
  else
    raise "Unexpected inlined type: 0x#{type.to_s(16)}"
  end
end

.read_variable_length(data, pos) ⇒ Object



317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
# File 'lib/mysql_replicator/binlogs/json_parser.rb', line 317

# Decodes a variable-length unsigned integer starting at +pos+.
#
# Each byte contributes its low 7 bits, least-significant group first; a set
# high bit means another byte follows. Decoding stops after at most 5 bytes,
# and running off the end of +data+ is reported explicitly instead of
# handing nil to the byte reader.
#
# @param data [String] buffer holding the serialized document
# @param pos [Integer] index of the first byte of the encoded integer
# @return [Array(Integer, Integer)] the decoded value and the number of bytes consumed
# @raise [RuntimeError] when the buffer ends mid-integer or the encoding is
#   longer than 5 bytes
def self.read_variable_length(data, pos)
  max_bytes = 5 # 5 groups of 7 bits are enough for any 32-bit length
  length = 0

  max_bytes.times do |index|
    raw = data[pos + index]
    raise "Truncated variable-length integer at position #{pos}" if raw.nil?

    byte = MysqlReplicator::StringUtil.read_uint8(raw)
    length |= (byte & 0x7F) << (7 * index)

    # A clear high bit marks the final byte.
    return [length, index + 1] if (byte & 0x80).zero?
  end

  raise "Variable-length integer at position #{pos} exceeds #{max_bytes} bytes"
end