Class: MysqlReplicator::BinlogClient
- Inherits:
-
Object
- Object
- MysqlReplicator::BinlogClient
- Defined in:
- lib/mysql_replicator/binlog_client.rb
Overview
Binlog handler using MySQL Replication Protocol
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
@rbs! attr_reader connection: MysqlReplicator::Connection.
Instance Method Summary collapse
- #configure_binlog_checksum ⇒ Object
- #handle_binlog_events ⇒ Object
-
#initialize(connection, server_id = 1001) ⇒ BinlogClient
constructor
A new instance of BinlogClient.
- #master_status ⇒ Object
- #on(&block) ⇒ Object
- #register_as_slave ⇒ Object
- #start_binlog_dump(binlog_file, binlog_position) ⇒ Object
- #start_replication ⇒ Object
- #stop_replication ⇒ Object
- #unregister_as_slave ⇒ Object
Constructor Details
#initialize(connection, server_id = 1001) ⇒ BinlogClient
Returns a new instance of BinlogClient.
18 19 20 21 22 |
# File 'lib/mysql_replicator/binlog_client.rb', line 18
#
# Builds a binlog client around an existing connection object.
#
# @param connection [Object] connection used for every query and packet
#   exchange performed by this client
# @param server_id [Integer] ID written into the COM_REGISTER_SLAVE and
#   COM_BINLOG_DUMP payloads built by this client (defaults to 1001)
def initialize(connection, server_id = 1001)
  @server_id = server_id
  @connection = connection
  # Filled in later by #configure_binlog_checksum.
  @checksum_type = nil
end
Instance Attribute Details
#connection ⇒ Object (readonly)
@rbs! attr_reader connection: MysqlReplicator::Connection
13 14 15 |
# File 'lib/mysql_replicator/binlog_client.rb', line 13
#
# Read-only accessor for the connection handed to the constructor.
#
# @return [Object] the object stored in @connection
def connection
  @connection
end
Instance Method Details
#configure_binlog_checksum ⇒ Object
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/mysql_replicator/binlog_client.rb', line 136
#
# Reads the server's binlog_checksum variable, remembers it in
# @checksum_type, and for CRC32 announces it to the server via the
# @master_binlog_checksum session variable.
#
# @raise [MysqlReplicator::Error] when the server reports a checksum type
#   other than NONE or CRC32
def configure_binlog_checksum
  rows = @connection.query('SHOW VARIABLES LIKE "binlog_checksum"')[:rows]
  @checksum_type = rows[0][:value]

  if @checksum_type == 'CRC32'
    @connection.query('SET @master_binlog_checksum = "CRC32"')
    MysqlReplicator::Logger.debug 'Set binlog checksum to CRC32'
  elsif @checksum_type == 'NONE'
    # Nothing to send to the server when checksums are disabled.
    MysqlReplicator::Logger.debug 'Set binlog checksum to NONE'
  else
    raise MysqlReplicator::Error, "Unknown binlog checksum type: #{@checksum_type}"
  end
end
#handle_binlog_events ⇒ Object
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/mysql_replicator/binlog_client.rb', line 176
#
# Reads packets from the connection in a loop and dispatches on the first
# payload byte:
#
# * 0x00 - a binlog event; it is parsed and, for QUERY / TABLE_MAP /
#   WRITE_ROWS / UPDATE_ROWS / DELETE_ROWS events, handed to the listener
#   registered with #on as (event, nil)
# * 0xFF - an error packet; logged, then the loop ends
# * 0xFE - EOF; logged, then the loop ends
# * anything else - logged as unexpected, then the loop ends
def handle_binlog_events
  event_parser = MysqlReplicator::Binlogs::EventParser.new

  loop do
    packet = @connection.read_packet
    payload = packet[:payload]
    first_byte = payload[0].unpack('C')[0]

    case first_byte
    when 0x00
      # The parser is told whether events carry a trailing CRC32 checksum.
      binlog_event = event_parser.execute(payload[1..], @connection, @checksum_type == 'CRC32')
      case binlog_event[:event_type]
      when :QUERY, :TABLE_MAP, :WRITE_ROWS, :UPDATE_ROWS, :DELETE_ROWS
        @event_listener&.call(binlog_event, nil)
      end
      MysqlReplicator::Logger.debug "Binlog event: #{binlog_event}"
    when 0xFF
      error_code = payload[1, 2].unpack('v')[0]
      # An ERR packet may carry a '#' marker plus a 5-byte SQL state between
      # the error code and the message. Skip it when present so the logged
      # message matches what #register_as_slave / #unregister_as_slave report.
      message_offset = payload.getbyte(3) == 0x23 ? 9 : 3
      MysqlReplicator::Logger.error \
        "Binlog event error: #{error_code} - #{payload[message_offset..]}"
      break
    when 0xFE
      MysqlReplicator::Logger.error 'Received EOF packet - binlog stream ended'
      break
    else
      MysqlReplicator::Logger.error "Unexpected packet type: 0x#{first_byte.to_s(16)}"
      break
    end
  end
end
#master_status ⇒ Object
70 71 72 73 74 |
# File 'lib/mysql_replicator/binlog_client.rb', line 70
#
# Asks the server for its current binlog coordinates.
#
# @return [Hash] { file: String, position: Integer } built from the first
#   row of SHOW MASTER STATUS
# @raise [MysqlReplicator::Error] when the statement returns no rows (the
#   server returns an empty set when binary logging is not enabled)
def master_status
  result = @connection.query('SHOW MASTER STATUS')
  rows = result[:rows]
  row = rows[0] if rows

  # Fail with a descriptive error instead of a NoMethodError on nil.
  if row.nil?
    raise MysqlReplicator::Error,
          'SHOW MASTER STATUS returned no rows; is binary logging enabled on the server?'
  end

  { file: row[:file].to_s, position: row[:position].to_i }
end
#on(&block) ⇒ Object
26 27 28 |
# File 'lib/mysql_replicator/binlog_client.rb', line 26
#
# Registers the listener block. #handle_binlog_events calls it as
# (binlog_event, nil) for row/query/table-map events, and
# #start_replication calls it as (nil, error) when an unexpected error
# interrupts the stream. A later call replaces the earlier listener.
#
# @return [Proc, nil] the block that was stored
def on(&block)
  @event_listener = block
end
#register_as_slave ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/mysql_replicator/binlog_client.rb', line 77
#
# Sends a COM_REGISTER_SLAVE packet announcing this client's server ID and
# checks the server's reply.
#
# @raise [MysqlReplicator::Error] when the first byte of the response is
#   not 0x00 (OK)
def register_as_slave
  @connection.reset_sequence_id

  # Payload of COM_REGISTER_SLAVE packet
  payload = [0x15].pack('C')

  # Server ID (4 bytes)
  payload += [@server_id].pack('V')

  # Hostname (empty string but properly formatted: 1-byte length + bytes)
  hostname = ''
  payload += [hostname.bytesize].pack('C') + hostname

  # Username (empty string but properly formatted)
  username = ''
  payload += [username.bytesize].pack('C') + username

  # Password (empty string but properly formatted)
  password = ''
  payload += [password.bytesize].pack('C') + password

  # MySQL port (2 bytes, little-endian). The protocol defines this field as
  # a 2-byte integer; packing 4 bytes would shift the fields that follow.
  payload += [@connection.port].pack('v')

  # Replication rank = 0 (unused, 4 bytes)
  payload += [0].pack('V')

  # Master ID = 0 (unused, 4 bytes)
  payload += [0].pack('V')

  @connection.send_packet(payload)

  response = @connection.read_packet
  if response[:payload][0].unpack('C')[0] != 0x00
    # Error packet: code in bytes 1-2, message after the SQL state (byte 9 on).
    raise MysqlReplicator::Error,
          'Failed to register as slave ' \
          "error code = #{response[:payload][1, 2].unpack('v')[0]}, " \
          "error message = #{response[:payload][9..]}"
  end

  MysqlReplicator::Logger.info 'Successfully registered as slave'
end
#start_binlog_dump(binlog_file, binlog_position) ⇒ Object
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/mysql_replicator/binlog_client.rb', line 155
#
# Sends a COM_BINLOG_DUMP packet asking the server to stream binlog events
# starting at the given coordinates. The response is not read here.
#
# @param binlog_file [String] binlog file name, appended to the payload as-is
# @param binlog_position [Integer, String] start offset; converted with
#   Kernel#Integer before being packed
def start_binlog_dump(binlog_file, binlog_position)
  @connection.reset_sequence_id

  # Fixed-width header, little-endian:
  #   command 0x12 (1 byte), position (4), flags = 0 (2), server ID (4)
  header = [0x12, Integer(binlog_position), 0, @server_id].pack('CVvV')

  # The file name follows the header and runs to the end of the packet.
  @connection.send_packet(header + binlog_file)

  MysqlReplicator::Logger.info \
    "Started binlog dump from #{binlog_file} at position #{binlog_position}"
end
#start_replication ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/mysql_replicator/binlog_client.rb', line 31
#
# Runs a full replication session: connects if needed, reads the current
# binlog coordinates, negotiates the checksum, registers as a slave,
# requests the binlog dump and then processes events until the stream ends.
#
# Errors raised while processing events are logged and passed to the
# listener registered with #on as (nil, error). An Interrupt ends the
# session quietly. #stop_replication always runs once event handling has
# started. Errors raised by the setup steps before that point propagate to
# the caller.
def start_replication
  @connection.connect unless @connection.connected?

  binlog_info = master_status
  binlog_file = binlog_info[:file]
  binlog_position = binlog_info[:position]

  configure_binlog_checksum
  register_as_slave
  start_binlog_dump(binlog_file, binlog_position)

  begin
    handle_binlog_events
  rescue Interrupt
    # Normal termination via Ctrl+C
  rescue => e
    # Let IRB's own abort signal pass through when running inside IRB.
    raise e if defined?(::IRB::Abort) && e.is_a?(::IRB::Abort) # steep:ignore UnknownConstant

    MysqlReplicator::Logger.error \
      "Unexpected error: #{e.message},\n" \
      "Backtrace: #{e.backtrace.first(5).join("\n")}"
    @event_listener&.call(nil, e)
  ensure
    stop_replication
  end
end
#stop_replication ⇒ Object
60 61 62 63 64 65 66 67 |
# File 'lib/mysql_replicator/binlog_client.rb', line 60
#
# Ends the replication session: flushes the socket buffer, attempts to
# unregister from the server, and always closes the connection. A failure
# while flushing or unregistering is logged rather than raised.
def stop_replication
  @connection.flush_socket_buffer
  unregister_as_slave
rescue => e
  MysqlReplicator::Logger.error "Failed to unregister as slave: #{e.message}"
ensure
  # Close even when unregistering failed.
  @connection.close
end
#unregister_as_slave ⇒ Object
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/mysql_replicator/binlog_client.rb', line 114
#
# Sends the packet this client uses to unregister itself (command byte
# 0x1B followed by the connection ID) and checks the server's reply.
#
# NOTE(review): in the MySQL command table 0x1B appears to be
# COM_SET_OPTION rather than an "unregister slave" command - confirm that
# the server accepts this packet as intended.
#
# @raise [MysqlReplicator::Error] when the first byte of the response is
#   not 0x00 (OK)
def unregister_as_slave
  @connection.reset_sequence_id

  # Command byte (1 byte) + connection ID (4 bytes, little-endian)
  request = [0x1B, @connection.connection_id].pack('CV')
  @connection.send_packet(request)

  reply = @connection.read_packet[:payload]
  status = reply[0].unpack('C')[0]
  unless status == 0x00
    # Error packet: code in bytes 1-2, message from byte 9 on.
    raise MysqlReplicator::Error,
          'Failed to unregister as slave ' \
          "error code = #{reply[1, 2].unpack('v')[0]}, " \
          "error message = #{reply[9..]}"
  end

  MysqlReplicator::Logger.info 'Successfully unregistered as slave'
end