Class: MysqlReplicator::BinlogClient
- Inherits:
-
Object
- Object
- MysqlReplicator::BinlogClient
- Defined in:
- lib/mysql_replicator/binlog_client.rb
Overview
Binlog handler using MySQL Replication Protocol
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
@rbs! attr_reader connection: MysqlReplicator::Connection.
Instance Method Summary collapse
- #configure_binlog_checksum ⇒ Object
- #handle_binlog_events ⇒ Object
-
#initialize(connection, server_id = 1001) ⇒ BinlogClient
constructor
A new instance of BinlogClient.
- #master_status ⇒ Object
- #on(&block) ⇒ Object
- #register_as_slave ⇒ Object
- #start_binlog_dump(binlog_file, binlog_position) ⇒ Object
- #start_replication ⇒ Object
- #stop_replication ⇒ Object
- #unregister_as_slave ⇒ Object
Constructor Details
#initialize(connection, server_id = 1001) ⇒ BinlogClient
Returns a new instance of BinlogClient.
18 19 20 21 22 |
# File 'lib/mysql_replicator/binlog_client.rb', line 18
#
# Builds a client around an already-constructed connection object.
#
# @param connection [Object] connection stored in @connection and used by
#   every other method of this class
# @param server_id [Integer] value stored in @server_id (defaults to 1001)
def initialize(connection, server_id = 1001)
  @connection, @server_id = connection, server_id
  # Stays nil until configure_binlog_checksum assigns it.
  @checksum_type = nil
end
Instance Attribute Details
#connection ⇒ Object (readonly)
@rbs! attr_reader connection: MysqlReplicator::Connection
13 14 15 |
# File 'lib/mysql_replicator/binlog_client.rb', line 13
#
# Read-only accessor for the connection given to the constructor.
#
# @return [Object] whatever is currently held in @connection
def connection
  @connection
end
Instance Method Details
#configure_binlog_checksum ⇒ Object
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/mysql_replicator/binlog_client.rb', line 129
#
# Reads the server's binlog_checksum variable, remembers it in
# @checksum_type, and (for CRC32) announces checksum support on this
# session by setting @master_binlog_checksum.
#
# A result with no rows is treated as 'NONE' instead of crashing on nil.
# NOTE(review): presumably an empty result means the server has no
# binlog_checksum variable (and so writes no checksums) - confirm.
#
# @return [Object] whatever MysqlReplicator::Logger.debug returns
# @raise [MysqlReplicator::Error] when the server reports a checksum type
#   other than NONE or CRC32
def configure_binlog_checksum
  # Single-quoted SQL literals: double quotes are parsed as identifiers when
  # the server runs with the ANSI_QUOTES sql_mode.
  result = @connection.query("SHOW VARIABLES LIKE 'binlog_checksum'")
  row = result[:rows][0]
  @checksum_type = row ? row[:value] : 'NONE'

  case @checksum_type
  when 'NONE'
    # No checksum: nothing has to be negotiated.
    MysqlReplicator::Logger.debug 'Set binlog checksum to NONE'
  when 'CRC32'
    @connection.query("SET @master_binlog_checksum = 'CRC32'")
    MysqlReplicator::Logger.debug 'Set binlog checksum to CRC32'
  else
    raise MysqlReplicator::Error, "Unknown binlog checksum type: #{@checksum_type}"
  end
end
#handle_binlog_events ⇒ Object
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/mysql_replicator/binlog_client.rb', line 169
#
# Reads packets from the connection in a loop and dispatches on the first
# payload byte:
#
# * 0x00 - the rest of the payload is handed to the event parser; QUERY,
#   WRITE_ROWS, UPDATE_ROWS and DELETE_ROWS events are forwarded to the
#   listener registered with #on (if any), and every event is debug-logged.
# * 0xFF - error packet: logged, loop ends.
# * 0xFE - EOF packet: logged, loop ends.
# * anything else, including an empty payload - logged, loop ends.
#
# @return [nil] once the loop has been left
def handle_binlog_events
  event_parser = MysqlReplicator::Binlogs::EventParser.new

  loop do
    payload = @connection.read_packet[:payload]
    # getbyte returns nil for an empty payload instead of raising on nil.
    first_byte = payload&.getbyte(0)

    case first_byte
    when nil
      MysqlReplicator::Logger.error 'Received empty packet - binlog stream ended'
      break
    when 0x00
      binlog_event = event_parser.execute(payload[1..], @connection, @checksum_type == 'CRC32')
      case binlog_event[:event_type]
      when :QUERY, :WRITE_ROWS, :UPDATE_ROWS, :DELETE_ROWS
        @event_listener&.call(binlog_event)
      end
      MysqlReplicator::Logger.debug "Binlog event: #{binlog_event}"
    when 0xFF
      error_code = payload[1, 2].unpack1('v')
      # When the '#' marker is present, bytes 3..8 are the marker plus a
      # 5-byte SQLSTATE; the text starts at offset 9, the same offset
      # register_as_slave / unregister_as_slave use.
      message = payload[3] == '#' ? payload[9..] : payload[3..]
      MysqlReplicator::Logger.error "Binlog event error: #{error_code} - #{message}"
      break
    when 0xFE
      MysqlReplicator::Logger.error 'Received EOF packet - binlog stream ended'
      break
    else
      MysqlReplicator::Logger.error "Unexpected packet type: 0x#{first_byte.to_s(16)}"
      break
    end
  end
end
#master_status ⇒ Object
63 64 65 66 67 |
# File 'lib/mysql_replicator/binlog_client.rb', line 63
#
# Runs SHOW MASTER STATUS and returns the first row's binlog coordinates.
#
# @return [Hash] +{ file: String, position: Integer }+ built from the
#   :file and :position entries of the first result row
# @raise [MysqlReplicator::Error] when the statement returns no rows
#   (previously this surfaced as a NoMethodError on nil)
def master_status
  result = @connection.query('SHOW MASTER STATUS')
  row = result[:rows][0]
  if row.nil?
    raise MysqlReplicator::Error,
          'SHOW MASTER STATUS returned no rows; is binary logging enabled on the server?'
  end

  { file: row[:file].to_s, position: row[:position].to_i }
end
#on(&block) ⇒ Object
26 27 28 |
# File 'lib/mysql_replicator/binlog_client.rb', line 26
#
# Registers the block that handle_binlog_events calls for QUERY,
# WRITE_ROWS, UPDATE_ROWS and DELETE_ROWS events. A later call replaces the
# earlier listener; calling without a block clears it.
#
# @yieldparam binlog_event [Hash] the parsed event
# @return [Proc, nil] the stored listener
def on(&listener)
  @event_listener = listener
end
#register_as_slave ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/mysql_replicator/binlog_client.rb', line 70
#
# Sends a COM_REGISTER_SLAVE (0x15) packet and checks the reply.
#
# Payload layout, all integers little-endian:
#   1 byte   command (0x15)
#   4 bytes  server ID
#   1+n      hostname (length-prefixed, sent empty)
#   1+n      username (length-prefixed, sent empty)
#   1+n      password (length-prefixed, sent empty)
#   2 bytes  port
#   4 bytes  replication rank (0, unused)
#   4 bytes  master ID (0, unused)
#
# @return [Object] whatever MysqlReplicator::Logger.info returns
# @raise [MysqlReplicator::Error] when the first reply byte is not 0x00
def register_as_slave
  @connection.reset_sequence_id

  hostname = ''
  username = ''
  password = ''

  payload = [0x15, @server_id].pack('CV')
  [hostname, username, password].each do |field|
    # The prefix counts bytes on the wire, not characters.
    payload << [field.bytesize].pack('C') << field
  end
  # The port is a 2-byte field ('v'). Packing it as 4 bytes shifted the rank
  # and master ID by two bytes, which only went unnoticed because both are 0.
  payload << [@connection.port, 0, 0].pack('vVV')

  @connection.send_packet(payload)

  reply = @connection.read_packet[:payload]
  if reply.getbyte(0) != 0x00
    raise MysqlReplicator::Error, 'Failed to register as slave ' \
                                  "error code = #{reply[1, 2].unpack('v')[0]}, " \
                                  "error message = #{reply[9..]}"
  end

  MysqlReplicator::Logger.info 'Successfully registered as slave'
end
#start_binlog_dump(binlog_file, binlog_position) ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/mysql_replicator/binlog_client.rb', line 148
#
# Sends a COM_BINLOG_DUMP (0x12) packet asking the server to stream the
# binlog from the given file and offset.
#
# Payload layout, all integers little-endian:
#   1 byte   command (0x12)
#   4 bytes  binlog position
#   2 bytes  flags (0)
#   4 bytes  slave server ID
#   n bytes  binlog filename
#
# @param binlog_file [String] binlog file name appended to the payload
# @param binlog_position [Integer, String] offset, coerced with Integer()
# @return [Object] whatever MysqlReplicator::Logger.info returns
def start_binlog_dump(binlog_file, binlog_position)
  @connection.reset_sequence_id

  header = [0x12, Integer(binlog_position), 0, @server_id].pack('CVvV')
  @connection.send_packet(header + binlog_file)

  MysqlReplicator::Logger.info \
    "Started binlog dump from #{binlog_file} at position #{binlog_position}"
end
#start_replication ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/mysql_replicator/binlog_client.rb', line 31
#
# Runs the whole replication session: connects if needed, reads the current
# binlog coordinates, negotiates the checksum (only if it has not been
# determined yet), registers as a slave, requests the binlog dump and then
# blocks in handle_binlog_events.
#
# An Interrupt, or any StandardError raised while handling events, ends the
# session through stop_replication. StandardErrors are logged, not re-raised.
# Errors raised during the setup steps propagate to the caller.
#
# @return [Object] the value of handle_binlog_events, or of stop_replication
#   when event handling was interrupted or failed
def start_replication
  @connection.connect unless @connection.connected?

  binlog_info = master_status
  binlog_file = binlog_info[:file]
  binlog_position = binlog_info[:position]

  configure_binlog_checksum if @checksum_type.nil?
  register_as_slave
  start_binlog_dump(binlog_file, binlog_position)

  begin
    handle_binlog_events
  rescue Interrupt
    stop_replication
  rescue StandardError => e
    # Array() guards against exceptions that carry no backtrace.
    MysqlReplicator::Logger.error \
      "Unexpected error: #{e.message},\n" \
      "Backtrace: #{Array(e.backtrace).first(5).join("\n")}"
    stop_replication
  end
end
#stop_replication ⇒ Object
56 57 58 59 60 |
# File 'lib/mysql_replicator/binlog_client.rb', line 56
#
# Ends the replication session: flushes the socket buffer, unregisters the
# slave, and closes the connection.
#
# The connection is closed even when flushing or unregistering raises, so a
# failed unregister no longer leaves the connection open. The original
# exception still propagates.
#
# @return [Object] whatever @connection.close returns
# @raise [MysqlReplicator::Error] when unregister_as_slave fails
def stop_replication
  begin
    @connection.flush_socket_buffer
    unregister_as_slave
  ensure
    # Captured here so the method still returns the result of close.
    close_result = @connection.close
  end
  close_result
end
#unregister_as_slave ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/mysql_replicator/binlog_client.rb', line 107
#
# Sends a 5-byte command packet (opcode 0x1B followed by the 4-byte
# little-endian connection ID) and checks the reply.
#
# NOTE(review): the original comment calls 0x1B "COM_UNREGISTER_SLAVE", but
# MySQL's command table appears to list 0x1B as COM_SET_OPTION - confirm that
# this opcode and payload are what the server expects.
#
# @return [Object] whatever MysqlReplicator::Logger.info returns
# @raise [MysqlReplicator::Error] when the first reply byte is not 0x00
def unregister_as_slave
  @connection.reset_sequence_id

  # Opcode (1 byte) + connection ID (4 bytes).
  @connection.send_packet([0x1B, @connection.connection_id].pack('CV'))

  reply = @connection.read_packet[:payload]
  unless reply[0].unpack('C')[0] == 0x00
    raise MysqlReplicator::Error, 'Failed to unregister as slave ' \
                                  "error code = #{reply[1, 2].unpack('v')[0]}, " \
                                  "error message = #{reply[9..]}"
  end

  MysqlReplicator::Logger.info 'Successfully unregistered as slave'
end