Class: Pcrd::Connection::Replication

Inherits:
Object
  • Object
Defined in:
lib/pcrd/connection/replication.rb

Overview

Manages a PostgreSQL logical replication connection.

Opened with replication: "database" so the server accepts streaming replication protocol commands. Typical use: open → start_replication, then poll with get_copy_data and respond with put_copy_data.
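The polling half of that workflow can be sketched as a small loop. This is a minimal illustration, not part of Pcrd: `drain_messages`, `max_idle`, and the idle-count stop condition are hypothetical, and the loop works with any object that exposes `wait_readable` and `get_copy_data`. It assumes the pg gem's non-blocking convention, where `false` means no data is available yet and `nil` means the copy stream has ended.

```ruby
# Hypothetical helper: collects messages from any object that exposes the
# wait_readable / get_copy_data methods documented on this page.
# Assumes false = "no data yet" and nil = "stream ended" (pg gem convention).
def drain_messages(repl, timeout: 1.0, max_idle: 3)
  messages = []
  idle = 0

  while idle < max_idle
    # Nothing arrived within the timeout: count an idle round and retry.
    unless repl.wait_readable(timeout)
      idle += 1
      next
    end

    # Data arrived: read every message that is already buffered.
    loop do
      data = repl.get_copy_data
      case data
      when String then messages << data
      when nil    then return messages  # stream ended
      else break                        # false: would block, wait again
      end
    end
    idle = 0
  end

  messages
end
```

A real consumer would decode each message and answer keepalives with put_copy_data instead of only collecting the raw bytes.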

Constant Summary

SLOT_NAME_RE =

START_REPLICATION is a replication-protocol command, not ordinary SQL: the slot name and LSN are interpolated as bare tokens, so they must be validated rather than quoted. Slot names follow PostgreSQL's own rule (lowercase letters, digits, and underscores; at most 63 characters). An LSN uses the standard hex/hex form. Both values come from configuration or a checkpoint, so both are validated before interpolation.

/\A[a-z0-9_]{1,63}\z/
LSN_RE =
%r{\A[0-9A-Fa-f]{1,8}/[0-9A-Fa-f]{1,8}\z}
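As a rough illustration of how these two patterns behave, the sketch below copies them and wraps them in small predicates. The helper names (`valid_slot_name?`, `valid_lsn?`, `lsn_to_i`) are hypothetical and not part of Pcrd; the class's own validators are the private `validate_slot_name!` and `validate_lsn!` calls seen in `start_replication`.

```ruby
# Patterns copied from the constants documented above.
SLOT_NAME_RE = /\A[a-z0-9_]{1,63}\z/
LSN_RE       = %r{\A[0-9A-Fa-f]{1,8}/[0-9A-Fa-f]{1,8}\z}

# Hypothetical predicate: true when the name is safe to use as a bare token.
def valid_slot_name?(name)
  SLOT_NAME_RE.match?(name.to_s)
end

# Hypothetical predicate: true when the text has the hex/hex LSN form.
def valid_lsn?(lsn)
  LSN_RE.match?(lsn.to_s)
end

# Hypothetical helper: converts "hi/lo" hex text into the 64-bit WAL
# position it denotes (high 32 bits, then low 32 bits).
def lsn_to_i(lsn)
  raise ArgumentError, "invalid LSN: #{lsn.inspect}" unless valid_lsn?(lsn)

  hi, lo = lsn.split("/").map { |part| part.to_i(16) }
  (hi << 32) | lo
end
```

Because both patterns are anchored with `\A` and `\z`, a trailing newline or an appended command fragment is rejected rather than silently passed through.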

Constructor Details

#initialize(config) ⇒ Replication

Returns a new instance of Replication.



# File 'lib/pcrd/connection/replication.rb', line 21

def initialize(config)
  @config = config
  @conn   = nil
end

Instance Method Details

#close ⇒ Object



# File 'lib/pcrd/connection/replication.rb', line 82

def close
  @conn&.finish
  @conn = nil
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/pcrd/connection/replication.rb', line 87

def connected?
  # Always returns true or false, never nil, to match the Boolean return tag.
  !@conn.nil? && !@conn.finished?
end

#get_copy_data ⇒ Object

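Returns a String (message bytes), false (no data available yet), or nil (stream ended), following the pg gem's non-blocking get_copy_data convention. The method calls consume_input itself, so call it once wait_readable reports that the socket is readable.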



# File 'lib/pcrd/connection/replication.rb', line 68

def get_copy_data
  @conn.consume_input
  @conn.get_copy_data(true)
rescue PG::Error => e
  raise Error, e.message
end
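The String returned here is one raw replication-protocol message. As a sketch of what a caller might do next, the hypothetical `decode_replication_message` helper below (not part of Pcrd) splits the two server message types defined by PostgreSQL's streaming replication protocol: XLogData (`w`) and primary keepalive (`k`). The hash keys are illustrative names only.

```ruby
# Hypothetical decoder for the two server-to-client message types in the
# streaming replication protocol. All integers are big-endian.
def decode_replication_message(bytes)
  case bytes.getbyte(0)
  when "w".ord
    # XLogData: tag, Int64 WAL start, Int64 WAL end, Int64 server clock, data.
    wal_start, wal_end, clock = bytes.unpack("@1Q>Q>q>")
    {
      type: :xlog_data,
      wal_start: wal_start,
      wal_end: wal_end,
      server_clock: clock,
      payload: bytes.byteslice(25, bytes.bytesize) # header is 25 bytes
    }
  when "k".ord
    # Primary keepalive: tag, Int64 WAL end, Int64 server clock, reply flag.
    wal_end, clock, reply = bytes.unpack("@1Q>q>C")
    { type: :keepalive, wal_end: wal_end, server_clock: clock,
      reply_requested: reply == 1 }
  else
    { type: :unknown, tag: bytes[0] }
  end
end
```

When `reply_requested` is true, the server expects a status update promptly; otherwise it may time out the connection.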

#open ⇒ Object



# File 'lib/pcrd/connection/replication.rb', line 26

def open
  @conn = PG.connect(
    host:             @config.host,
    port:             @config.port,
    dbname:           @config.database,
    user:             @config.user,
    password:         @config.password,
    application_name: "pcrd-replication",
    replication:      "database"
  )
  self
rescue PG::ConnectionBad, PG::Error => e
  raise Error, "Replication connection failed to " \
               "#{@config.host}:#{@config.port}/#{@config.database}: #{e.message}"
end

#put_copy_data(data) ⇒ Object

Sends a client message (keepalive response) to the server.



# File 'lib/pcrd/connection/replication.rb', line 76

def put_copy_data(data)
  @conn.put_copy_data(data)
rescue PG::Error => e
  raise Error, e.message
end
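The usual client message in logical replication is the Standby Status Update. The sketch below shows one way to build its bytes for `put_copy_data`, following the layout in PostgreSQL's streaming replication protocol. `standby_status_update`, `PG_EPOCH`, and the keyword names are hypothetical and not part of Pcrd.

```ruby
# PostgreSQL timestamps in this protocol count microseconds since 2000-01-01 UTC.
PG_EPOCH = Time.utc(2000, 1, 1)

# Hypothetical builder for a Standby Status Update message:
#   'r', Int64 written LSN, Int64 flushed LSN, Int64 applied LSN,
#   Int64 client clock (microseconds since PG_EPOCH), Byte1 reply-requested.
def standby_status_update(written_lsn, flushed_lsn: written_lsn,
                          applied_lsn: written_lsn, now: Time.now, reply: false)
  micros = ((now.to_r - PG_EPOCH.to_r) * 1_000_000).to_i
  ["r", written_lsn, flushed_lsn, applied_lsn, micros, reply ? 1 : 0]
    .pack("aQ>Q>Q>q>C")
end
```

The result is a 34-byte binary String. Reporting the same position for written, flushed, and applied is a simplification; a consumer that persists changes asynchronously would report the flushed position separately.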

#start_replication(slot_name:, pub_name:, start_lsn: "0/0") ⇒ Object

Sends START_REPLICATION and enters COPY streaming mode. Uses send_query + get_result (not exec) so the CopyBoth response is handled correctly and the connection is left in streaming copy mode.



# File 'lib/pcrd/connection/replication.rb', line 45

def start_replication(slot_name:, pub_name:, start_lsn: "0/0")
  validate_slot_name!(slot_name)
  validate_lsn!(start_lsn)
  pub_id = pub_name.gsub("'", "''")

  @conn.send_query(
    "START_REPLICATION SLOT #{slot_name} LOGICAL #{start_lsn} " \
    "(proto_version '1', publication_names '#{pub_id}')"
  )
  @conn.get_result   # reads CopyBothResponse; puts connection in copy mode
  self
rescue PG::Error => e
  raise Error, "START_REPLICATION failed: #{e.message}"
end
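To make the command text easier to inspect, the following sketch isolates the string building from the connection. `start_replication_command` is a hypothetical standalone function, not part of Pcrd; it inlines the two patterns from the constants section in place of the private validators.

```ruby
# Hypothetical standalone builder mirroring the string assembled above.
def start_replication_command(slot_name:, pub_name:, start_lsn: "0/0")
  # Bare tokens cannot be quoted, so reject anything outside the patterns.
  unless /\A[a-z0-9_]{1,63}\z/.match?(slot_name)
    raise ArgumentError, "invalid slot name: #{slot_name.inspect}"
  end
  unless %r{\A[0-9A-Fa-f]{1,8}/[0-9A-Fa-f]{1,8}\z}.match?(start_lsn)
    raise ArgumentError, "invalid LSN: #{start_lsn.inspect}"
  end

  # The publication name sits inside a quoted literal, so doubling single
  # quotes is enough to keep it from terminating the literal early.
  pub_id = pub_name.gsub("'", "''")

  "START_REPLICATION SLOT #{slot_name} LOGICAL #{start_lsn} " \
    "(proto_version '1', publication_names '#{pub_id}')"
end
```

For example, `start_replication_command(slot_name: "pcrd_slot", pub_name: "my_pub")` produces `START_REPLICATION SLOT pcrd_slot LOGICAL 0/0 (proto_version '1', publication_names 'my_pub')`.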

#wait_readable(timeout) ⇒ Object

Waits up to `timeout` seconds for data on the replication socket. Returns a truthy value if data is available, or a falsy value (nil) if the timeout expired.



# File 'lib/pcrd/connection/replication.rb', line 62

def wait_readable(timeout)
  @conn.socket_io.wait_readable(timeout)
end
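The method delegates to Ruby's `IO#wait_readable`, whose behavior can be observed without a database. The sketch below uses an in-process pipe as a stand-in for the replication socket; the variable names are illustrative only.

```ruby
require "io/wait"

# A pipe stands in for the replication socket in this illustration.
reader, writer = IO.pipe

# Nothing has been written yet, so the wait expires with a falsy result.
before = reader.wait_readable(0.05)

# Once a byte is pending, the wait returns a truthy result immediately.
writer.write("x")
after = reader.wait_readable(0.05)

# Coerce to strict booleans for callers that need true/false.
ready_before = !!before
ready_after  = !!after

writer.close
reader.close
```

Callers that only branch on the result can use it directly in `if` or `unless`; the `!!` coercion matters only when a strict Boolean is required.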