Class: MysqlReplicator::Binlogs::ColumnParser

Inherits:
Object
  • Object
show all
Defined in:
lib/mysql_replicator/binlogs/column_parser.rb

Class Method Summary collapse

Class Method Details

.decimal_storage_bytes(digits) ⇒ Object



382
383
384
385
386
387
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 382

def self.decimal_storage_bytes(digits)
  # Each group of 9 digits takes 4 bytes
  full_groups = digits / 9
  leftover_digits = digits % 9
  (full_groups * 4) + [0, 1, 1, 2, 2, 3, 3, 4, 4][leftover_digits]
end

.parse(io, column_def) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 12

def self.parse(io, column_def)
  type_code = MysqlReplicator::Binlogs::FieldTypes.code_for(column_def[:data_type])

  case type_code
  when MysqlReplicator::Binlogs::FieldTypes::TINY_INT
    parse_tinyint(io)
  when MysqlReplicator::Binlogs::FieldTypes::SMALL_INT
    parse_smallint(io)
  when MysqlReplicator::Binlogs::FieldTypes::MEDIUM_INT
    parse_mediumint(io)
  when MysqlReplicator::Binlogs::FieldTypes::INT
    parse_int(io)
  when MysqlReplicator::Binlogs::FieldTypes::BIG_INT
    parse_bigint(io)
  when MysqlReplicator::Binlogs::FieldTypes::FLOAT
    parse_float(io)
  when MysqlReplicator::Binlogs::FieldTypes::DOUBLE
    parse_double(io)
  when MysqlReplicator::Binlogs::FieldTypes::DECIMAL
    parse_decimal(io, column_def)
  when MysqlReplicator::Binlogs::FieldTypes::DATETIME
    parse_datetime(io)
  when MysqlReplicator::Binlogs::FieldTypes::DATE
    parse_date(io)
  when MysqlReplicator::Binlogs::FieldTypes::TIME
    parse_time(io)
  when MysqlReplicator::Binlogs::FieldTypes::TIMESTAMP
    parse_timestamp(io)
  when MysqlReplicator::Binlogs::FieldTypes::CHAR
    parse_char(io, column_def)
  when MysqlReplicator::Binlogs::FieldTypes::VARCHAR
    parse_varchar(io, column_def)
  when MysqlReplicator::Binlogs::FieldTypes::TINY_TEXT
    parse_tinytext(io, column_def)
  when MysqlReplicator::Binlogs::FieldTypes::TEXT
    parse_text(io, column_def)
  when MysqlReplicator::Binlogs::FieldTypes::MEDIUM_TEXT
    parse_mediumtext(io, column_def)
  when MysqlReplicator::Binlogs::FieldTypes::LONG_TEXT
    parse_longtext(io, column_def)
  when MysqlReplicator::Binlogs::FieldTypes::TINY_BLOB
    parse_tinyblob(io)
  when MysqlReplicator::Binlogs::FieldTypes::BLOB
    parse_blob(io)
  when MysqlReplicator::Binlogs::FieldTypes::MEDIUM_BLOB
    parse_mediumblob(io)
  when MysqlReplicator::Binlogs::FieldTypes::LONG_BLOB
    parse_longblob(io)
  when MysqlReplicator::Binlogs::FieldTypes::BINARY
    parse_binary(io, column_def)
  when MysqlReplicator::Binlogs::FieldTypes::VAR_BINARY
    parse_varbinary(io, column_def)
  when MysqlReplicator::Binlogs::FieldTypes::JSON
    parse_json(io)
  when MysqlReplicator::Binlogs::FieldTypes::ENUM
    parse_enum(io, column_def)
  else
    raise MysqlReplicator::Error, "Unsupported type: #{type_code}"
  end
end

.parse_bigint(io) ⇒ Object



99
100
101
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 99

def self.parse_bigint(io)
  MysqlReplicator::StringIOUtil.read_int64(io)
end

.parse_binary(io, column_def) ⇒ Object



349
350
351
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 349

def self.parse_binary(io, column_def)
  parse_char(io, column_def)
end

.parse_blob(io) ⇒ Object



321
322
323
324
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 321

def self.parse_blob(io)
  length = MysqlReplicator::StringIOUtil.read_uint16(io)
  MysqlReplicator::StringIOUtil.read_str(io, length)
end

.parse_char(io, column_def) ⇒ Object



246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 246

def self.parse_char(io, column_def)
  max_length = column_def[:character_maximum_length] || 10
  charset = column_def[:character_set_name]

  # Determine length prefix size
  bytes_per_char = case charset
                   when 'utf8mb4' then 4
                   when 'utf8', 'utf8mb3' then 3
                   else 1 # binary, latin1 and others
                   end
  max_bytes = max_length * bytes_per_char

  length = if max_bytes > 255
             MysqlReplicator::StringIOUtil.read_uint16(io)
           else
             MysqlReplicator::StringIOUtil.read_uint8(io)
           end

  value = MysqlReplicator::StringIOUtil.read_str(io, length)
  charset ? value.force_encoding('utf-8') : value
end

.parse_date(io) ⇒ Object

@return: String



178
179
180
181
182
183
184
185
186
187
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 178

def self.parse_date(io)
  # 3bytes: YYYY*16*32 + MM*32 + DD
  value = MysqlReplicator::StringIOUtil.read_uint24(io)

  day = value & 0x1F
  month = (value >> 5) & 0x0F
  year = value >> 9

  "#{year}-#{format('%02d', month)}-#{format('%02d', day)}"
end

.parse_datetime(io) ⇒ Object

@return: String



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 151

def self.parse_datetime(io)
  # 5bytes if fractional seconds precision is 0
  # format: 1bit sign + 17bits year*13month+month + 5bits day + 5bits hour + 6bits minute + 6bits second
  data = MysqlReplicator::StringIOUtil.read_str(io, 5).unpack('C5').map(&:to_i)
  value = (data[0] << 32) | (data[1] << 24) | (data[2] << 16) | (data[3] << 8) | data[4]

  # top level bit is sign bit
  value ^= 0x8000000000 # inversion of sign bit

  second = value & 0x3F
  value >>= 6
  minute = value & 0x3F
  value >>= 6
  hour = value & 0x1F
  value >>= 5
  day = value & 0x1F
  value >>= 5
  year_month = value & 0x1FFFF
  year = year_month / 13
  month = year_month % 13

  "#{year}-#{format('%02d', month)}-#{format('%02d', day)} " \
    "#{format('%02d', hour)}:#{format('%02d', minute)}:#{format('%02d', second)}"
end

.parse_decimal(io, column_def) ⇒ Object

This is sensitive…



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 120

def self.parse_decimal(io, column_def)
  precision = column_def[:numeric_precision] || 10
  scale = column_def[:numeric_scale] || 0

  # Decimal format is integer and fractional parts
  intg = precision - scale
  intg_bytes = decimal_storage_bytes(intg)
  frac_bytes = decimal_storage_bytes(scale)
  total_bytes = intg_bytes + frac_bytes

  data = MysqlReplicator::StringIOUtil.read_array_from_int8(io, total_bytes)

  # top level bit is sign bit (1 = positive, 0 = negative)
  negative = (data[0] & 0x80) == 0
  # inversion of sign bit
  data[0] ^= 0x80
  data = data.map { |b| b ^ 0xFF } if negative

  # parse integer part
  intg_part = parse_decimal_digits(data[0, intg_bytes] || [], intg)
  # parse fractional part
  frac_part = parse_decimal_digits(data[intg_bytes, frac_bytes] || [], scale)

  result = "#{intg_part}.#{frac_part.to_s.rjust(scale, '0')}"
  result = "-#{result}" if negative

  BigDecimal(result)
end

.parse_decimal_digits(bytes, digits) ⇒ Object



392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 392

def self.parse_decimal_digits(bytes, digits)
  return 0 if digits == 0 || bytes.nil? || bytes.empty?

  leftover_digits = digits % 9
  leftover_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4][leftover_digits]

  result = 0
  offset = 0

  # fraction part
  if leftover_digits > 0
    val = 0
    leftover_bytes.times do |i|
      val = (val << 8) | bytes[offset + i]
    end
    result = val
    offset += leftover_bytes
  end

  # 9-digit groups (4 bytes each)
  while offset < bytes.length
    val = (bytes[offset] << 24) |
          (bytes[offset + 1] << 16) |
          (bytes[offset + 2] << 8) |
          bytes[offset + 3]
    result = (result * 1_000_000_000) + val.to_i
    offset += 4
  end

  result
end

.parse_double(io) ⇒ Object



111
112
113
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 111

def self.parse_double(io)
  MysqlReplicator::StringIOUtil.read_double64(io)
end

.parse_enum(io, column_def) ⇒ Object



364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 364

def self.parse_enum(io, column_def)
  enum_values = column_def[:enum_values] || []
  index = if enum_values && enum_values.length > 255
            MysqlReplicator::StringIOUtil.read_uint16(io)
          else
            MysqlReplicator::StringIOUtil.read_uint8(io)
          end

  # Enum index starts from 1
  if index > 0 && enum_values
    enum_values[index - 1]
  else
    index
  end
end

.parse_float(io) ⇒ Object



105
106
107
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 105

def self.parse_float(io)
  MysqlReplicator::StringIOUtil.read_float32(io)
end

.parse_int(io) ⇒ Object



93
94
95
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 93

def self.parse_int(io)
  MysqlReplicator::StringIOUtil.read_int32(io)
end

.parse_json(io) ⇒ Object



355
356
357
358
359
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 355

def self.parse_json(io)
  length = MysqlReplicator::StringIOUtil.read_uint32(io)
  data = io.read(length)
  MysqlReplicator::Binlogs::JsonParser.parse(data)
end

.parse_longblob(io) ⇒ Object



335
336
337
338
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 335

def self.parse_longblob(io)
  length = MysqlReplicator::StringIOUtil.read_uint32(io)
  MysqlReplicator::StringIOUtil.read_str(io, length)
end

.parse_longtext(io, column_def) ⇒ Object



304
305
306
307
308
309
310
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 304

def self.parse_longtext(io, column_def)
  charset = column_def[:character_set_name]
  length = MysqlReplicator::StringIOUtil.read_uint32(io)

  value = MysqlReplicator::StringIOUtil.read_str(io, length)
  charset ? value.force_encoding('utf-8') : value
end

.parse_mediumblob(io) ⇒ Object



328
329
330
331
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 328

def self.parse_mediumblob(io)
  length = MysqlReplicator::StringIOUtil.read_uint24(io)
  MysqlReplicator::StringIOUtil.read_str(io, length)
end

.parse_mediumint(io) ⇒ Object



87
88
89
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 87

def self.parse_mediumint(io)
  MysqlReplicator::StringIOUtil.read_int24(io)
end

.parse_mediumtext(io, column_def) ⇒ Object



293
294
295
296
297
298
299
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 293

def self.parse_mediumtext(io, column_def)
  charset = column_def[:character_set_name]
  length = MysqlReplicator::StringIOUtil.read_uint24(io)

  value = MysqlReplicator::StringIOUtil.read_str(io, length)
  charset ? value.force_encoding('utf-8') : value
end

.parse_smallint(io) ⇒ Object



81
82
83
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 81

def self.parse_smallint(io)
  MysqlReplicator::StringIOUtil.read_int16(io)
end

.parse_text(io, column_def) ⇒ Object



282
283
284
285
286
287
288
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 282

def self.parse_text(io, column_def)
  charset = column_def[:character_set_name]
  length = MysqlReplicator::StringIOUtil.read_uint16(io)

  value = MysqlReplicator::StringIOUtil.read_str(io, length)
  charset ? value.force_encoding('utf-8') : value
end

.parse_time(io) ⇒ Object

@return: String



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 191

def self.parse_time(io)
  # 3bytes if fractional seconds precision is 0
  # format: 1bit sign + 1bit unused + 10bits hour + 6bits minute + 6bits second
  data = MysqlReplicator::StringIOUtil.read_str(io, 3).unpack('C3').map(&:to_i)
  value = (data[0] << 16) | (data[1] << 8) | data[2]

  negative = (value & 0x800000) == 0
  value &= 0x7FFFFF

  hour = (value >> 12) & 0x3FF
  minute = (value >> 6) & 0x3F
  second = value & 0x3F

  if negative
    "-#{hour}:#{minute.to_s.rjust(2, '0')}:#{second.to_s.rjust(2, '0')}"
  else
    "#{hour}:#{minute.to_s.rjust(2, '0')}:#{second.to_s.rjust(2, '0')}"
  end
end

.parse_timestamp(io) ⇒ Object

@return: Integer



213
214
215
216
217
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 213

def self.parse_timestamp(io)
  # 4bytes if fractional seconds precision is 0
  # Unix Timestamp is Big-Engian
  MysqlReplicator::StringIOUtil.read_uint32_big_endian(io)
end

.parse_tinyblob(io) ⇒ Object



314
315
316
317
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 314

def self.parse_tinyblob(io)
  length = MysqlReplicator::StringIOUtil.read_uint8(io)
  MysqlReplicator::StringIOUtil.read_str(io, length)
end

.parse_tinyint(io) ⇒ Object



75
76
77
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 75

def self.parse_tinyint(io)
  MysqlReplicator::StringIOUtil.read_int8(io)
end

.parse_tinytext(io, column_def) ⇒ Object



271
272
273
274
275
276
277
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 271

def self.parse_tinytext(io, column_def)
  charset = column_def[:character_set_name]
  length = MysqlReplicator::StringIOUtil.read_uint8(io)

  value = MysqlReplicator::StringIOUtil.read_str(io, length)
  charset ? value.force_encoding('utf-8') : value
end

.parse_varbinary(io, column_def) ⇒ Object



342
343
344
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 342

def self.parse_varbinary(io, column_def)
  parse_varchar(io, column_def)
end

.parse_varchar(io, column_def) ⇒ Object



222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/mysql_replicator/binlogs/column_parser.rb', line 222

def self.parse_varchar(io, column_def)
  max_length = column_def[:character_maximum_length] || 255
  charset = column_def[:character_set_name]

  # Determine length prefix size
  bytes_per_char = case charset
                   when 'utf8mb4' then 4
                   when 'utf8', 'utf8mb3' then 3
                   else 1 # binary, latin1 and others
                   end
  max_bytes = max_length * bytes_per_char

  length = if max_bytes > 255
             MysqlReplicator::StringIOUtil.read_uint16(io)
           else
             MysqlReplicator::StringIOUtil.read_uint8(io)
           end

  value = MysqlReplicator::StringIOUtil.read_str(io, length)
  charset ? value.force_encoding('utf-8') : value
end