Class: MysqlReplicator::Connection
- Inherits:
-
Object
- Object
- MysqlReplicator::Connection
- Defined in:
- lib/mysql_replicator/connection.rb
Instance Attribute Summary collapse
-
#database ⇒ Object
readonly
@rbs! attr_reader host: String @rbs! attr_reader port: Integer @rbs! attr_reader user: String @rbs! attr_reader password: String @rbs! attr_reader database: String.
-
#host ⇒ Object
readonly
@rbs! attr_reader host: String @rbs! attr_reader port: Integer @rbs! attr_reader user: String @rbs! attr_reader password: String @rbs! attr_reader database: String.
-
#password ⇒ Object
readonly
@rbs! attr_reader host: String @rbs! attr_reader port: Integer @rbs! attr_reader user: String @rbs! attr_reader password: String @rbs! attr_reader database: String.
-
#port ⇒ Object
readonly
@rbs! attr_reader host: String @rbs! attr_reader port: Integer @rbs! attr_reader user: String @rbs! attr_reader password: String @rbs! attr_reader database: String.
-
#user ⇒ Object
readonly
@rbs! attr_reader host: String @rbs! attr_reader port: Integer @rbs! attr_reader user: String @rbs! attr_reader password: String @rbs! attr_reader database: String.
Instance Method Summary collapse
- #close ⇒ Object
- #connect ⇒ Object
- #connected? ⇒ Boolean
- #connection_id ⇒ Object
- #dup ⇒ Object
- #flush_socket_buffer ⇒ Object
-
#initialize(host: 'localhost', port: 3306, user: 'root', password: '', database: '') ⇒ Connection
constructor
A new instance of Connection.
- #ping ⇒ Object
- #query(sql) ⇒ Object
- #read_packet ⇒ Object
- #reset_sequence_id ⇒ Object
- #send_packet(payload) ⇒ Object
Constructor Details
#initialize(host: 'localhost', port: 3306, user: 'root', password: '', database: '') ⇒ Connection
Returns a new instance of Connection.
38 39 40 41 42 43 44 45 46 47 |
# File 'lib/mysql_replicator/connection.rb', line 38 def initialize(host: 'localhost', port: 3306, user: 'root', password: '', database: '') @host = host @port = port @user = user @password = password @database = database @sequence_id = 0 @connected = false end |
Instance Attribute Details
#database ⇒ Object (readonly)
@rbs! attr_reader host: String @rbs! attr_reader port: Integer @rbs! attr_reader user: String @rbs! attr_reader password: String @rbs! attr_reader database: String
30 31 32 |
# File 'lib/mysql_replicator/connection.rb', line 30 def database @database end |
#host ⇒ Object (readonly)
@rbs! attr_reader host: String @rbs! attr_reader port: Integer @rbs! attr_reader user: String @rbs! attr_reader password: String @rbs! attr_reader database: String
30 31 32 |
# File 'lib/mysql_replicator/connection.rb', line 30 def host @host end |
#password ⇒ Object (readonly)
@rbs! attr_reader host: String @rbs! attr_reader port: Integer @rbs! attr_reader user: String @rbs! attr_reader password: String @rbs! attr_reader database: String
30 31 32 |
# File 'lib/mysql_replicator/connection.rb', line 30 def password @password end |
#port ⇒ Object (readonly)
@rbs! attr_reader host: String @rbs! attr_reader port: Integer @rbs! attr_reader user: String @rbs! attr_reader password: String @rbs! attr_reader database: String
30 31 32 |
# File 'lib/mysql_replicator/connection.rb', line 30 def port @port end |
#user ⇒ Object (readonly)
@rbs! attr_reader host: String @rbs! attr_reader port: Integer @rbs! attr_reader user: String @rbs! attr_reader password: String @rbs! attr_reader database: String
30 31 32 |
# File 'lib/mysql_replicator/connection.rb', line 30 def user @user end |
Instance Method Details
#close ⇒ Object
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/mysql_replicator/connection.rb', line 111 def close if !@connected && (@socket.nil? || @socket.closed?) MysqlReplicator::Logger.warn 'Connection is not connected' return end reset_sequence_id if @connected quit_payload = [0x01].pack('C') send_packet(quit_payload) end @socket.close unless @socket.closed? @connected = false MysqlReplicator::Logger.info "Disconnected to MySQL server at #{@host}:#{@port}" end |
#connect ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/mysql_replicator/connection.rb', line 60 def connect if @connected MysqlReplicator::Logger.warn 'Connection is already connected' return end @socket = TCPSocket.new(@host, @port) @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) @handshake_info = MysqlReplicator::Connections::Handshake.execute(self) MysqlReplicator::Connections::Auth.execute(self, @user, @password, @database, @handshake_info) @connected = true MysqlReplicator::Logger.info "Connected to MySQL server at #{@host}:#{@port}" rescue => e close raise e end |
#connected? ⇒ Boolean
55 56 57 |
# File 'lib/mysql_replicator/connection.rb', line 55 def connected? @connected end |
#connection_id ⇒ Object
209 210 211 |
# File 'lib/mysql_replicator/connection.rb', line 209 def connection_id @handshake_info[:connection_id] end |
#dup ⇒ Object
214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/mysql_replicator/connection.rb', line 214 def dup new_connection = self.class.new( host: @host, port: @port, user: @user, password: @password, database: @database ) new_connection.connect new_connection end |
#flush_socket_buffer ⇒ Object
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/mysql_replicator/connection.rb', line 182 def flush_socket_buffer return if @socket.nil? flushed_data = '' begin # Read all unread data in non-blocking mode while @socket.ready? data = @socket.read_nonblock(1024) flushed_data += data MysqlReplicator::Logger.debug \ "Found unread data: #{MysqlReplicator::StringUtil.read_array_from_int8(data).map { |b| format('%02X', b) }.join(' ')}" end sleep 0.1 rescue IO::WaitReadable # Not at all if no data rescue => e MysqlReplicator::Logger.error "Buffer clear error: #{e.}" end return if flushed_data.empty? MysqlReplicator::Logger.debug "#{flushed_data.length} bytes of unread data cleared" end |
#ping ⇒ Object
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/mysql_replicator/connection.rb', line 94 def ping unless @connected MysqlReplicator::Logger.warn 'Connection is not connected' return 'ERROR' end reset_sequence_id ping_payload = [0x0E].pack('C') send_packet(ping_payload) response = read_packet success = MysqlReplicator::StringUtil.read_uint8(response[:payload][0]) == 0x00 success ? 'PONG' : 'ERROR' end |
#query(sql) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/mysql_replicator/connection.rb', line 81 def query(sql) unless @connected MysqlReplicator::Logger.warn 'Connection is not connected' return end reset_sequence_id flush_socket_buffer MysqlReplicator::Connections::Query.execute(self, sql) end |
#read_packet ⇒ Object
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/mysql_replicator/connection.rb', line 131 def read_packet if @socket.nil? raise MysqlReplicator::Error, 'TCPSocket is nil' end header = @socket.read(4) if header.nil? || header.length != 4 raise MysqlReplicator::Error, 'Failed to read packet header' end # Little-endian 24-bit packet_length = MysqlReplicator::StringUtil.read_uint8(header[0]) | (MysqlReplicator::StringUtil.read_uint8(header[1]) << 8) | (MysqlReplicator::StringUtil.read_uint8(header[2]) << 16) sequence_id = MysqlReplicator::StringUtil.read_uint8(header[3]) payload = @socket.read(packet_length) if payload.nil? || payload.length != packet_length raise MysqlReplicator::Error, "Failed to read packet payload: expected #{packet_length} bytes, got #{payload&.length || 0}" end packet = { length: packet_length, sequence_id: sequence_id, payload: payload } MysqlReplicator::Logger.debug "Received packet: #{packet.inspect}" # Update to next expected sequence ID @sequence_id = (sequence_id + 1) % 256 { length: packet_length, sequence_id: sequence_id, payload: payload } end |
#reset_sequence_id ⇒ Object
50 51 52 |
# File 'lib/mysql_replicator/connection.rb', line 50 def reset_sequence_id @sequence_id = 0 end |
#send_packet(payload) ⇒ Object
168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/mysql_replicator/connection.rb', line 168 def send_packet(payload) if @socket.nil? raise MysqlReplicator::Error, 'TCPSocket is nil' end packet_length = payload.length header = ([packet_length].pack('V')[0..2] || '') + [@sequence_id].pack('C').to_s @socket.write(header + payload) packet = { length: packet_length, sequence_id: @sequence_id, payload: payload } MysqlReplicator::Logger.debug "Sent packet: #{packet.inspect}" end |