Class: Pgoutput::Client::Connection Private

Inherits:
Object
  • Object
show all
Defined in:
lib/pgoutput/client/connection.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Thin wrapper around ‘PG::Connection` for logical replication operations.

‘Connection` hides the small amount of PostgreSQL driver plumbing needed by the rest of the transport layer. It opens the connection in replication mode, renders replication commands through Commands, and translates `PG::Error` exceptions into ConnectionError.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(configuration:, pg_connection:) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Build a connection wrapper.

This constructor is public primarily for tests and alternative connection factories. Normal callers should use open.

Parameters:

  • configuration (Configuration)

    replication configuration

  • pg_connection (PG::Connection)

    connected PostgreSQL driver object



40
41
42
43
# File 'lib/pgoutput/client/connection.rb', line 40

def initialize(configuration:, pg_connection:)
  @configuration = configuration
  @pg_connection = pg_connection
end

Instance Attribute Details

#configurationConfiguration (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Configuration associated with this connection.

Returns:



17
18
19
# File 'lib/pgoutput/client/connection.rb', line 17

def configuration
  @configuration
end

Class Method Details

.open(configuration) ⇒ Connection

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Open a PostgreSQL connection in database replication mode.

Parameters:

Returns:

  • (Connection)

    wrapper around an open ‘PG::Connection`

Raises:



24
25
26
27
28
29
30
# File 'lib/pgoutput/client/connection.rb', line 24

def self.open(configuration)
  require "pg"
  connection = PG.connect(configuration.database_url, replication: "database")
  new(configuration:, pg_connection: connection)
rescue PG::Error => e
  raise ConnectionError, e.message
end

Instance Method Details

#closevoid

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Close the PostgreSQL connection if it is still open.



107
108
109
# File 'lib/pgoutput/client/connection.rb', line 107

def close
  @pg_connection.close unless @pg_connection.finished?
end

#create_replication_slotPG::Result

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Create the configured logical replication slot.

Returns:

  • (PG::Result)

    command result

Raises:



57
58
59
# File 'lib/pgoutput/client/connection.rb', line 57

def create_replication_slot
  exec(Commands.create_replication_slot(configuration))
end

#drop_replication_slotPG::Result

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Drop the configured logical replication slot.

Returns:

  • (PG::Result)

    command result

Raises:



65
66
67
# File 'lib/pgoutput/client/connection.rb', line 65

def drop_replication_slot
  exec(Commands.drop_replication_slot(configuration))
end

#get_copy_dataString?

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Receive one CopyData payload from the server.

The call is non-blocking because the underlying ‘pg` call receives `false` for its blocking argument. `nil` means no complete CopyData payload is currently available.

Returns:

  • (String, nil)

    raw CopyData payload or ‘nil`

Raises:



85
86
87
88
89
# File 'lib/pgoutput/client/connection.rb', line 85

def get_copy_data # rubocop:disable Naming/AccessorMethodName
  @pg_connection.get_copy_data(false)
rescue PG::Error => e
  raise ConnectionError, e.message
end

#identify_systemPG::Result

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Execute PostgreSQL’s ‘IDENTIFY_SYSTEM` replication command.

Returns:

  • (PG::Result)

    server identity result

Raises:



49
50
51
# File 'lib/pgoutput/client/connection.rb', line 49

def identify_system
  exec("IDENTIFY_SYSTEM")
end

#put_copy_data(payload) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Send one CopyData payload to the server.

Used for standby status feedback messages.

Parameters:

  • payload (String)

    raw CopyData payload

Raises:



98
99
100
101
102
# File 'lib/pgoutput/client/connection.rb', line 98

def put_copy_data(payload)
  @pg_connection.put_copy_data(payload)
rescue PG::Error => e
  raise ConnectionError, e.message
end

#start_replicationPG::Result

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Start streaming logical replication from the configured slot and LSN.

Returns:

  • (PG::Result)

    command result

Raises:



73
74
75
# File 'lib/pgoutput/client/connection.rb', line 73

def start_replication
  exec(Commands.start_replication(configuration))
end