Class: MysqlReplicator::Binlogs::RowsEventParser
- Inherits:
-
Object
- Object
- MysqlReplicator::Binlogs::RowsEventParser
- Defined in:
- lib/mysql_replicator/binlogs/rows_event_parser.rb
Class Method Summary collapse
- .bit_set?(bitmap, index) ⇒ Boolean
- .count_bits(bitmap, max_bits) ⇒ Object
- .parse(event_type, payload, checksum_enabled, table_map) ⇒ Object
- .parse_row(io, column_count, columns_present_bitmap, table_def) ⇒ Object
- .read_packed_integer(io) ⇒ Object
Class Method Details
.bit_set?(bitmap, index) ⇒ Boolean
147 148 149 150 151 |
# File 'lib/mysql_replicator/binlogs/rows_event_parser.rb', line 147

# Tests whether bit +index+ is set in +bitmap+.
#
# Bits are numbered from the least significant bit of the first byte:
# bit 0..7 live in bitmap[0], bit 8..15 in bitmap[1], and so on.
#
# @param bitmap [Array<Integer>] byte values indexable by byte position
#   (within this class the bitmaps come from StringIOUtil.read_array_from_int8)
# @param index [Integer] zero-based bit position to test
# @return [Boolean] true when the bit is set; false when it is clear or when
#   +index+ addresses a byte past the end of +bitmap+
def self.bit_set?(bitmap, index)
  byte_index, bit_index = index.divmod(8)
  byte = bitmap[byte_index]

  # Indexing past the end of the bitmap yields nil; a bit the bitmap does not
  # cover is reported as unset instead of failing with NoMethodError on nil.
  return false if byte.nil?

  (byte & (1 << bit_index)) != 0
end
.count_bits(bitmap, max_bits) ⇒ Object
156 157 158 159 160 161 162 |
# File 'lib/mysql_replicator/binlogs/rows_event_parser.rb', line 156

# Counts how many of the first +max_bits+ bits of +bitmap+ are set.
#
# @param bitmap [Array<Integer>] bitmap bytes, interpreted by bit_set?
# @param max_bits [Integer] number of leading bit positions to inspect
# @return [Integer] number of positions in 0...max_bits for which bit_set? is true
def self.count_bits(bitmap, max_bits)
  max_bits.times.count { |position| bit_set?(bitmap, position) }
end
.parse(event_type, payload, checksum_enabled, table_map) ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/mysql_replicator/binlogs/rows_event_parser.rb', line 34

# Parses the payload of a rows event into a hash describing the table and its rows.
#
# Fields are read from +payload+ in this order: table id (6 bytes), flags
# (2 bytes), extra-data length (2 bytes) followed by the extra data itself,
# the packed column count, the columns-present bitmap, a second bitmap when
# +event_type+ is :UPDATE_ROWS, and finally row images until the end of the
# row data.
#
# @param event_type [Symbol] event kind; only :UPDATE_ROWS is treated specially
#   (two bitmaps, and each row is a before/after pair)
# @param payload [String] raw event payload
# @param checksum_enabled [Boolean] when truthy, the last 4 bytes of +payload+
#   are treated as a checksum and are not parsed as row data
# @param table_map [Hash] table definitions keyed by table id; each definition
#   is read via :database, :table and (in parse_row) :columns
# @return [Hash] with keys :database, :table, :table_id, :flags,
#   :extra_data_length, :column_count and :rows. :rows holds one array of
#   column hashes per row, or { before:, after: } pairs for :UPDATE_ROWS
# @raise [RuntimeError] when +table_map+ has no entry for the event's table id,
#   or when a row image consumes no bytes (which would otherwise loop forever)
def self.parse(event_type, payload, checksum_enabled, table_map)
  io = StringIO.new(payload)
  io.set_encoding(Encoding::BINARY)

  # 4 bytes of checksum sit at the end of the payload if CRC32 checksum is enabled
  payload_size = checksum_enabled ? payload.bytesize - 4 : payload.bytesize

  # Table ID (6 bytes)
  table_id = StringIOUtil.read_uint48(io)

  # Flags (2 bytes)
  flags = StringIOUtil.read_uint16(io)

  # Extra data length (2 bytes); the value counts these two bytes as well,
  # so only (length - 2) bytes of extra data follow and are skipped.
  extra_data_length = StringIOUtil.read_uint16(io)
  io.read(extra_data_length - 2) if extra_data_length > 2

  # Column count (variable length encoded)
  column_count = read_packed_integer(io)

  # One bit per column, rounded up to whole bytes
  bitmap_size = (column_count + 7) / 8
  update_event = event_type == :UPDATE_ROWS

  # A bitmap indicating which columns are present in the row data
  columns_present_bitmap = MysqlReplicator::StringIOUtil.read_array_from_int8(io, bitmap_size)

  # UPDATE_ROWS carries a second bitmap for the columns present in the after image
  columns_after_bitmap =
    update_event ? MysqlReplicator::StringIOUtil.read_array_from_int8(io, bitmap_size) : []

  table_def = table_map[table_id]
  # Fail with a clear message instead of a NoMethodError on nil further down.
  raise "Table definition not found for table id: #{table_id}" unless table_def

  rows = []
  while io.pos < payload_size
    row_start = io.pos

    rows << if update_event
              before_row = parse_row(io, column_count, columns_present_bitmap, table_def)
              after_row = parse_row(io, column_count, columns_after_bitmap, table_def)
              { before: before_row, after: after_row }
            else
              parse_row(io, column_count, columns_present_bitmap, table_def)
            end

    # A row that consumed nothing can never advance the cursor, so the loop
    # condition would stay true forever; abort instead of hanging.
    raise "Row data did not advance at offset #{row_start}" if io.pos == row_start
  end

  {
    database: table_def[:database],
    table: table_def[:table],
    table_id: table_id,
    flags: flags,
    extra_data_length: extra_data_length,
    column_count: column_count,
    rows: rows
  }
end
.parse_row(io, column_count, columns_present_bitmap, table_def) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/mysql_replicator/binlogs/rows_event_parser.rb', line 90

# Decodes a single row image from +io+.
#
# The image starts with a NULL bitmap holding one bit per present column,
# followed by the values of the present, non-NULL columns. Columns whose bit
# is not set in +columns_present_bitmap+ are left out of the result.
#
# @param io [StringIO] stream positioned at the start of the row image
# @param column_count [Integer] number of bit positions of
#   +columns_present_bitmap+ to consider when sizing the NULL bitmap
# @param columns_present_bitmap [Array<Integer>] bitmap of columns present in the image
# @param table_def [Hash] table definition; its :columns entries are read via
#   :column_name, :ordinal_position, :data_type and :primary_key
# @return [Array<Hash>] one hash per present column with keys
#   :ordinal_position, :data_type, :column_name, :value and :primary_key
def self.parse_row(io, column_count, columns_present_bitmap, table_def)
  # The NULL bitmap has one bit per *present* column, rounded up to whole bytes.
  nulls_size = (count_bits(columns_present_bitmap, column_count) + 7) / 8
  nulls = MysqlReplicator::StringIOUtil.read_array_from_int8(io, nulls_size)

  # Position inside the NULL bitmap; it advances only for present columns.
  nulls_cursor = 0

  table_def[:columns].each_with_index.each_with_object([]) do |(column_def, position), cells|
    next unless bit_set?(columns_present_bitmap, position)

    name = column_def[:column_name]
    is_null = bit_set?(nulls, nulls_cursor)
    # NULL columns occupy no bytes in the stream, so nothing is read for them.
    parsed = is_null ? nil : MysqlReplicator::Binlogs::ColumnParser.parse(io, column_def)
    nulls_cursor += 1

    cells << {
      ordinal_position: column_def[:ordinal_position].to_i,
      data_type: column_def[:data_type],
      column_name: name,
      value: parsed,
      primary_key: column_def[:primary_key]
    }
  end
end
.read_packed_integer(io) ⇒ Object
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/mysql_replicator/binlogs/rows_event_parser.rb', line 128

# Reads a variable-length ("packed") integer from +io+.
#
# The first byte either is the value itself (0..250) or selects the width of
# the value that follows: 252 => 2 bytes, 253 => 3 bytes, 254 => 8 bytes.
#
# @param io [StringIO] stream positioned at the first byte of the packed integer
# @return [Integer] the decoded value
# @raise [RuntimeError] when the first byte is none of the markers above
def self.read_packed_integer(io)
  marker = StringIOUtil.read_uint8(io)

  # Small values are stored directly in the marker byte.
  return marker if (0..250).cover?(marker)
  return StringIOUtil.read_uint16(io) if marker == 252
  return StringIOUtil.read_uint24(io) if marker == 253
  return StringIOUtil.read_uint64(io) if marker == 254

  raise "Invalid packed integer: #{marker}"
end