Class: Pgoutput::Client::Connection Private

Inherits:
Object
Defined in:
lib/pgoutput/client/connection.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Thin wrapper around `PG::Connection` for logical replication operations.

`Connection` hides the small amount of PostgreSQL driver plumbing needed by the rest of the transport layer. It opens the connection in replication mode, renders replication commands through Commands, and translates `PG::Error` exceptions into ConnectionError.
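The error translation described above can be pictured with a small, self-contained sketch. The private `exec` helper used by this class is not shown on this page, so everything below is an assumption: `DriverError`, `FailingDriver`, and `WrapperSketch` are hypothetical stand-ins (for `PG::Error`, a `PG::Connection`, and this class) so the example runs without the pg gem.

```ruby
# Hypothetical stand-ins so the sketch runs without the pg gem or this library.
class DriverError < StandardError; end      # plays the role of PG::Error
class ConnectionError < StandardError; end  # plays the role of the library's ConnectionError

# Fake driver whose exec always fails, to exercise the rescue path.
class FailingDriver
  def exec(_sql)
    raise DriverError, 'replication slot "demo" does not exist'
  end
end

class WrapperSketch
  def initialize(driver)
    @driver = driver
  end

  # Run a command and re-raise driver failures as the library-level error,
  # keeping the original message.
  def exec(sql)
    @driver.exec(sql)
  rescue DriverError => e
    raise ConnectionError, e.message
  end
end

begin
  WrapperSketch.new(FailingDriver.new).exec("IDENTIFY_SYSTEM")
rescue ConnectionError => e
  puts "translated: #{e.message}"
end
```

Callers above the wrapper then only need to rescue one error class, regardless of which driver call failed.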

Constructor Details

#initialize(configuration:, pg_connection:) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Build a connection wrapper.

This constructor is public primarily for tests and alternative connection factories. Normal callers should use .open.

Parameters:

  • configuration (Configuration)

    replication configuration

  • pg_connection (PG::Connection)

    connected PostgreSQL driver object



# File 'lib/pgoutput/client/connection.rb', line 40

def initialize(configuration:, pg_connection:)
  @configuration = configuration
  @pg_connection = pg_connection
end
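Because the driver object is injected, a test can pass a double instead of a live `PG::Connection`. The sketch below is illustrative only: `FakePGConnection` is a hypothetical double, and `ConnectionSketch` is a stand-in that copies `#initialize` and `#close` from the source listings on this page so the example runs without the library or a database.

```ruby
# Hypothetical test double exposing the two driver methods that #close relies on.
class FakePGConnection
  attr_reader :close_calls

  def initialize
    @close_calls = 0
    @finished = false
  end

  def finished?
    @finished
  end

  def close
    @close_calls += 1
    @finished = true
  end
end

# Stand-in mirroring #initialize and #close as listed on this page.
class ConnectionSketch
  attr_reader :configuration

  def initialize(configuration:, pg_connection:)
    @configuration = configuration
    @pg_connection = pg_connection
  end

  def close
    @pg_connection.close unless @pg_connection.finished?
  end
end

fake = FakePGConnection.new
connection = ConnectionSketch.new(configuration: :unused_in_this_sketch, pg_connection: fake)
connection.close
connection.close # second call is a no-op because the double reports finished? == true
puts fake.close_calls
```

The same pattern would apply to the real class, assuming the double responds to whichever driver methods the code under test calls.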

Instance Attribute Details

#configuration ⇒ Configuration (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Configuration associated with this connection.

Returns:

  • (Configuration)



# File 'lib/pgoutput/client/connection.rb', line 17

def configuration
  @configuration
end

Class Method Details

.open(configuration) ⇒ Connection

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Open a PostgreSQL connection in database replication mode.

Parameters:

  • configuration (Configuration)

    replication configuration

Returns:

  • (Connection)

    wrapper around an open `PG::Connection`

Raises:

  • (ConnectionError)

    if the PostgreSQL driver raises `PG::Error`



# File 'lib/pgoutput/client/connection.rb', line 24

def self.open(configuration)
  require "pg"
  connection = PG.connect(configuration.database_url, replication: "database")
  new(configuration:, pg_connection: connection)
rescue PG::Error => e
  raise ConnectionError, e.message
end

Instance Method Details

#close ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Close the PostgreSQL connection if it is still open.



# File 'lib/pgoutput/client/connection.rb', line 112

def close
  @pg_connection.close unless @pg_connection.finished?
end

#create_replication_slot ⇒ PG::Result

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Create the configured logical replication slot.

Returns:

  • (PG::Result)

    command result

Raises:

  • (ConnectionError)



# File 'lib/pgoutput/client/connection.rb', line 57

def create_replication_slot
  exec(Commands.create_replication_slot(configuration))
end

#drop_replication_slot ⇒ PG::Result

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Drop the configured logical replication slot.

Returns:

  • (PG::Result)

    command result

Raises:

  • (ConnectionError)



# File 'lib/pgoutput/client/connection.rb', line 65

def drop_replication_slot
  exec(Commands.drop_replication_slot(configuration))
end

#get_copy_data ⇒ String?

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Receive one CopyData payload from the server.

The stream must not block forever while PostgreSQL is idle, because the caller needs opportunities to send periodic standby feedback. Wait briefly for socket readability, then use the pg driver’s blocking CopyData read only when data is available. `nil` means the stream is currently idle.

Returns:

  • (String, nil)

    raw CopyData payload or `nil`

Raises:

  • (ConnectionError)

    if the PostgreSQL driver raises `PG::Error`



# File 'lib/pgoutput/client/connection.rb', line 87

def get_copy_data # rubocop:disable Naming/AccessorMethodName
  return nil unless copy_data_readable?

  copy_data = @pg_connection.get_copy_data(false)
  copy_data == false ? nil : copy_data
rescue PG::Error => e
  raise ConnectionError, e.message
end
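The private `copy_data_readable?` helper is not shown on this page. One plausible way to "wait briefly for socket readability" is a bounded `IO.select`, sketched below under that assumption. `readable_within?` is a hypothetical name, and a pipe stands in for the connection socket (with the pg gem, the IO object would likely come from `PG::Connection#socket_io`).

```ruby
# Hypothetical helper: true when +io+ becomes readable within +timeout+ seconds.
# IO.select returns nil when the timeout expires with nothing to read.
def readable_within?(io, timeout)
  ready = IO.select([io], nil, nil, timeout)
  !ready.nil?
end

# A pipe stands in for the replication socket in this sketch.
reader, writer = IO.pipe

puts readable_within?(reader, 0.05) # nothing written yet, so the wait times out
writer.write("w")
puts readable_within?(reader, 0.05) # data is pending, so the check succeeds

reader.close
writer.close
```

A readable socket does not guarantee that a complete CopyData message has arrived, so the blocking read that follows may still wait for the remainder of the message.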

#identify_system ⇒ PG::Result

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Execute PostgreSQL’s `IDENTIFY_SYSTEM` replication command.

Returns:

  • (PG::Result)

    server identity result

Raises:

  • (ConnectionError)



# File 'lib/pgoutput/client/connection.rb', line 49

def identify_system
  exec("IDENTIFY_SYSTEM")
end
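PostgreSQL's `IDENTIFY_SYSTEM` result includes an `xlogpos` column holding the current WAL position in textual `X/Y` form, where both halves are hexadecimal. A caller that wants to compare or store positions numerically could convert it as sketched below. `parse_lsn` and `format_lsn` are hypothetical helpers, not part of this class.

```ruby
# Hypothetical helper: convert textual "X/Y" (two hex halves) into a 64-bit integer.
def parse_lsn(text)
  high, low = text.split("/", 2)
  (Integer(high, 16) << 32) | Integer(low, 16)
end

# Hypothetical helper: render a 64-bit integer back into "X/Y" form.
def format_lsn(value)
  format("%X/%X", value >> 32, value & 0xFFFF_FFFF)
end

lsn = parse_lsn("16/B374D848")
puts format_lsn(lsn) # round-trips to "16/B374D848"
```

With the real class, the input would presumably be read from the returned `PG::Result`, for example `result[0]["xlogpos"]`.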

#put_copy_data(payload) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Send one CopyData payload to the server.

Used for standby status feedback messages.

Parameters:

  • payload (String)

    raw CopyData payload

Raises:

  • (ConnectionError)

    if the PostgreSQL driver raises `PG::Error`



# File 'lib/pgoutput/client/connection.rb', line 103

def put_copy_data(payload)
  @pg_connection.put_copy_data(payload)
rescue PG::Error => e
  raise ConnectionError, e.message
end
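For context, the payload for standby feedback follows PostgreSQL's streaming replication protocol: a Standby Status Update is the byte `r`, three 64-bit LSNs (written, flushed, applied), a 64-bit client timestamp in microseconds since 2000-01-01 UTC, and a one-byte reply-request flag. The sketch below builds such a payload; `standby_status_update` and `POSTGRES_EPOCH` are hypothetical names, and how this library actually assembles the message is not shown on this page.

```ruby
# PostgreSQL timestamps count microseconds from 2000-01-01 00:00:00 UTC.
POSTGRES_EPOCH = Time.utc(2000, 1, 1)

# Hypothetical builder for a Standby Status Update payload.
# LSN arguments are 64-bit integers; all multi-byte fields are big-endian.
def standby_status_update(written:, flushed:, applied:, now: Time.now, reply_requested: false)
  micros = ((now.to_r - POSTGRES_EPOCH.to_r) * 1_000_000).to_i
  ["r", written, flushed, applied, micros, reply_requested ? 1 : 0].pack("aQ>Q>Q>q>C")
end

payload = standby_status_update(written: 100, flushed: 100, applied: 100)
puts payload.bytesize # 1 + 4 * 8 + 1 = 34 bytes
```

A payload built this way is the kind of string that would be handed to `#put_copy_data`.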

#start_replication ⇒ PG::Result

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Start streaming logical replication from the configured slot and LSN.

Returns:

  • (PG::Result)

    command result

Raises:

  • (ConnectionError)



# File 'lib/pgoutput/client/connection.rb', line 73

def start_replication
  exec(Commands.start_replication(configuration))
end
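The Commands module that renders the command text is not shown on this page. As a rough illustration of what a rendered command could look like, the sketch below follows PostgreSQL's `START_REPLICATION SLOT ... LOGICAL ...` syntax with the pgoutput options `proto_version` and `publication_names`. All helper names and parameters here are hypothetical, and the real renderer may differ.

```ruby
# Hypothetical helper: double-quote an identifier, doubling embedded quotes.
def quote_identifier(name)
  '"' + name.gsub('"', '""') + '"'
end

# Hypothetical helper: single-quote a string literal, doubling embedded quotes.
def quote_literal(value)
  "'" + value.gsub("'", "''") + "'"
end

# Hypothetical renderer for a logical START_REPLICATION command.
def start_replication_command(slot:, lsn:, publications:, proto_version: 1)
  raise ArgumentError, "LSN must look like X/Y in hex" unless lsn.match?(%r{\A\h+/\h+\z})

  names = publications.map { |publication| quote_identifier(publication) }.join(",")
  "START_REPLICATION SLOT #{quote_identifier(slot)} LOGICAL #{lsn} " \
    "(proto_version '#{Integer(proto_version)}', publication_names #{quote_literal(names)})"
end

puts start_replication_command(slot: "demo_slot", lsn: "0/0", publications: ["demo_pub"])
```

Quoting the slot and publication names keeps unusual names from breaking the command, and validating the LSN shape rejects malformed input before it reaches the server.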