Class: Pcrd::Connection::Replication
- Inherits: Object
  - Object
    - Pcrd::Connection::Replication
- Defined in: lib/pcrd/connection/replication.rb
Overview
Manages a PostgreSQL logical replication connection.
The connection is opened with replication: "database" so that the server accepts streaming replication protocol commands. Typical use: open → start_replication, then poll with get_copy_data and respond with put_copy_data.
Constant Summary
- SLOT_NAME_RE = /\A[a-z0-9_]{1,63}\z/
  START_REPLICATION is a replication-protocol command, not ordinary SQL: the slot name and LSN are interpolated as bare tokens, so they must be validated rather than quoted. Slot names follow PostgreSQL's own rule (lowercase letters, digits, underscore; at most 63 characters). An LSN takes the standard hex/hex form. Both values come from configuration or checkpoints, so they are validated before being interpolated.
- LSN_RE = %r{\A[0-9A-Fa-f]{1,8}/[0-9A-Fa-f]{1,8}\z}
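As a minimal sketch of how these two patterns behave, the snippet below copies them into a standalone module and checks a few inputs. The module name `ReplicationTokens` and the predicate methods are hypothetical and exist only for illustration; the class itself uses its own validation helpers.

```ruby
# Hypothetical wrapper module; the patterns are copied from the constants above.
module ReplicationTokens
  SLOT_NAME_RE = /\A[a-z0-9_]{1,63}\z/
  LSN_RE = %r{\A[0-9A-Fa-f]{1,8}/[0-9A-Fa-f]{1,8}\z}

  # True when the name is safe to interpolate as a bare slot token.
  def self.slot_name?(name)
    SLOT_NAME_RE.match?(name.to_s)
  end

  # True when the value has the hex/hex LSN shape.
  def self.lsn?(lsn)
    LSN_RE.match?(lsn.to_s)
  end
end

ReplicationTokens.slot_name?("pcrd_slot_1")      # accepted
ReplicationTokens.slot_name?("Bad-Slot")         # rejected: uppercase and hyphen
ReplicationTokens.slot_name?("s; DROP TABLE t")  # rejected: not a bare token
ReplicationTokens.lsn?("16/B374D848")            # accepted
ReplicationTokens.lsn?("0/0 extra")              # rejected: trailing text
```

Anchoring with `\A` and `\z` (rather than `^` and `$`) matters here, because it rejects values that hide extra content after a newline.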
Instance Method Summary
- #close ⇒ Object
- #connected? ⇒ Boolean
- #get_copy_data ⇒ Object
  Returns a String (message bytes), nil (no data yet), or false (stream ended).
- #initialize(config) ⇒ Replication (constructor)
  A new instance of Replication.
- #open ⇒ Object
- #put_copy_data(data) ⇒ Object
  Sends a client message (keepalive response) to the server.
- #start_replication(slot_name:, pub_name:, start_lsn: "0/0") ⇒ Object
  Sends START_REPLICATION and enters COPY streaming mode.
- #wait_readable(timeout) ⇒ Object
  Waits up to `timeout` seconds for data on the replication socket.
Constructor Details
#initialize(config) ⇒ Replication
Returns a new instance of Replication.
    # File 'lib/pcrd/connection/replication.rb', line 21
    def initialize(config)
      @config = config
      @conn = nil
    end
Instance Method Details
#close ⇒ Object
    # File 'lib/pcrd/connection/replication.rb', line 82
    def close
      @conn&.finish
      @conn = nil
    end
#connected? ⇒ Boolean
    # File 'lib/pcrd/connection/replication.rb', line 87
    def connected?
      @conn && !@conn.finished?
    end
#get_copy_data ⇒ Object
Returns a String (message bytes), nil (no data yet), or false (stream ended). Call after wait_readable returns true, or after consume_input.
    # File 'lib/pcrd/connection/replication.rb', line 68
    def get_copy_data
      @conn.consume_input
      @conn.get_copy_data(true)
    rescue PG::Error => e
      raise Error, e.message
    end
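The three return values suggest a polling loop along the following lines. This is a sketch only: `FakeReplication` is a hypothetical stand-in that replays canned results so the loop can run without a PostgreSQL server, and `drain` is an illustrative helper, not part of the class.

```ruby
# Hypothetical stand-in for a Replication object; replays canned results.
class FakeReplication
  def initialize(results)
    @results = results
  end

  # Pretends data is always ready on the socket.
  def wait_readable(_timeout)
    true
  end

  # Returns the next canned result, or false once they are exhausted.
  def get_copy_data
    @results.empty? ? false : @results.shift
  end
end

# Collects messages until the stream ends (illustrative helper).
def drain(replication, timeout: 1.0)
  messages = []
  loop do
    next unless replication.wait_readable(timeout)

    case (data = replication.get_copy_data)
    when String then messages << data  # one complete CopyData message
    when nil    then next              # nothing buffered yet; wait again
    when false  then break             # server ended the COPY stream
    end
  end
  messages
end

received = drain(FakeReplication.new(["w-one", nil, "k-two", false]))
```

Distinguishing nil from false matters: treating both as "no data" would keep the loop running after the server has ended the stream.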
#open ⇒ Object
    # File 'lib/pcrd/connection/replication.rb', line 26
    def open
      @conn = PG.connect(
        host: @config.host,
        port: @config.port,
        dbname: @config.database,
        user: @config.user,
        password: @config.password,
        application_name: "pcrd-replication",
        replication: "database"
      )
      self
    rescue PG::ConnectionBad, PG::Error => e
      raise Error, "Replication connection failed to " \
                   "#{@config.host}:#{@config.port}/#{@config.database}: #{e.message}"
    end
#put_copy_data(data) ⇒ Object
Sends a client message (keepalive response) to the server.
    # File 'lib/pcrd/connection/replication.rb', line 76
    def put_copy_data(data)
      @conn.put_copy_data(data)
    rescue PG::Error => e
      raise Error, e.message
    end
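For context, the keepalive response a client usually sends here is the replication protocol's Standby Status Update: the byte 'r', three 64-bit positions (written, flushed, applied), a 64-bit timestamp in microseconds since 2000-01-01 UTC, and a one-byte reply-requested flag. The sketch below builds such a payload; the helper names are hypothetical, and reporting the same position for all three fields is a simplifying assumption.

```ruby
# PostgreSQL timestamps count microseconds from this epoch.
PG_EPOCH = Time.utc(2000, 1, 1)

# Converts "hi/lo" hex notation into a 64-bit integer position.
def lsn_to_i(lsn)
  hi, lo = lsn.split("/").map { |part| part.to_i(16) }
  (hi << 32) | lo
end

# Builds a 34-byte Standby Status Update payload (hypothetical helper).
def standby_status_update(lsn, now: Time.now, reply_requested: false)
  pos = lsn_to_i(lsn)
  micros = (now.to_i - PG_EPOCH.to_i) * 1_000_000 + now.usec
  # 'r' tag, written/flushed/applied positions, client clock, reply flag;
  # all integers are big-endian (network byte order).
  ["r", pos, pos, pos, micros, reply_requested ? 1 : 0].pack("aQ>Q>Q>q>C")
end

payload = standby_status_update("16/B374D848")
```

The resulting string is what would be passed as `data` to put_copy_data.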
#start_replication(slot_name:, pub_name:, start_lsn: "0/0") ⇒ Object
Sends START_REPLICATION and enters COPY streaming mode. Uses send_query + get_result (not exec) so the CopyBoth response is handled correctly and the connection is left in streaming copy mode.
    # File 'lib/pcrd/connection/replication.rb', line 45
    def start_replication(slot_name:, pub_name:, start_lsn: "0/0")
      validate_slot_name!(slot_name)
      validate_lsn!(start_lsn)
      pub_id = pub_name.gsub("'", "''")

      @conn.send_query(
        "START_REPLICATION SLOT #{slot_name} LOGICAL #{start_lsn} " \
        "(proto_version '1', publication_names '#{pub_id}')"
      )
      @conn.get_result # reads CopyBothResponse; puts connection in copy mode
      self
    rescue PG::Error => e
      raise Error, "START_REPLICATION failed: #{e.message}"
    end
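To make the resulting command text concrete, here is a sketch of a pure function that assembles the same string without touching a connection. `start_replication_command` is a hypothetical helper written for this illustration; it inlines the two patterns from the Constant Summary and raises ArgumentError, which is an assumption rather than the class's actual error type.

```ruby
# Hypothetical pure helper: builds the command text only, sends nothing.
def start_replication_command(slot_name:, pub_name:, start_lsn: "0/0")
  # Bare tokens are validated, not quoted.
  raise ArgumentError, "invalid slot name" unless /\A[a-z0-9_]{1,63}\z/.match?(slot_name)
  raise ArgumentError, "invalid LSN" unless %r{\A[0-9A-Fa-f]{1,8}/[0-9A-Fa-f]{1,8}\z}.match?(start_lsn)

  # The publication name sits inside a quoted literal, so quotes are doubled.
  pub_id = pub_name.gsub("'", "''")
  "START_REPLICATION SLOT #{slot_name} LOGICAL #{start_lsn} " \
    "(proto_version '1', publication_names '#{pub_id}')"
end

command = start_replication_command(slot_name: "pcrd_slot", pub_name: "my_pub")
```

The two inputs are treated differently on purpose: the slot name and LSN cannot be quoted in this command, while the publication name already lives inside a single-quoted option value.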
#wait_readable(timeout) ⇒ Object
Waits up to `timeout` seconds for data on the replication socket. Returns a truthy value if data is available, or a falsy value if the timeout expired.
    # File 'lib/pcrd/connection/replication.rb', line 62
    def wait_readable(timeout)
      @conn.socket_io.wait_readable(timeout)
    end
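The method delegates to Ruby's IO#wait_readable on the connection's socket. The behaviour of that call can be seen on an ordinary pipe, used here as a stand-in for the replication socket; the variable names are illustrative only.

```ruby
require "io/wait"

reader, writer = IO.pipe

# Nothing has been written yet, so the wait times out with a falsy result.
idle = reader.wait_readable(0.05)

# Once a byte is pending, the wait returns a truthy result without delay.
writer.write("x")
ready = reader.wait_readable(0.05)

writer.close
reader.close
```

Callers are safest testing the result for truthiness rather than comparing it with `true` or `false`.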