Class: Pgoutput::Client::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/pgoutput/client.rb

Overview

High-level logical replication client facade.

‘Runner` is the simplest public entry point for consumers that want to stream raw pgoutput payloads without manually managing a replication connection. It owns the connection lifecycle for one logical replication stream:

  1. Build an immutable Configuration from keyword arguments.

  2. Open a PostgreSQL replication connection.

  3. Optionally create the configured replication slot.

  4. Start logical replication.

  5. Yield raw pgoutput payload bytes and XLogData metadata.

  6. Close the connection when streaming exits.

Examples:

Stream raw pgoutput messages

runner = Pgoutput::Client::Runner.new(
  database_url: "postgres://localhost/app",
  slot_name: "cdc_slot",
  publication_names: ["app_publication"]
)

runner.start do |payload, |
  puts "received #{payload.bytesize} bytes at #{.wal_end_lsn}"
end

See Also:

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(**options) ⇒ void

Build a runner from configuration keyword arguments.

The accepted keywords are the same as Configuration#initialize. The resulting configuration object is immutable and reused for the lifetime of this runner.

Parameters:

Raises:



73
74
75
76
77
78
# File 'lib/pgoutput/client.rb', line 73

def initialize(**options)
  @configuration = Configuration.new(
    **options # : untyped
  )
  @stopped = false
end

Instance Attribute Details

#configurationConfiguration (readonly)

Configuration used by this runner.

Returns:



61
62
63
# File 'lib/pgoutput/client.rb', line 61

def configuration
  @configuration
end

Instance Method Details

#start {|payload, metadata| ... } ⇒ void

This method returns an undefined value.

Start streaming raw pgoutput payloads.

This method blocks until the stream stops or the underlying connection raises an error. The yielded payload is the raw logical decoding plugin payload contained inside PostgreSQL’s XLogData envelope; callers normally pass this payload to a pgoutput parser.

Yields:

  • (payload, metadata)

    called once for each XLogData payload

Yield Parameters:

  • payload (String)

    frozen raw pgoutput payload bytes

  • metadata (XLogData)

    WAL envelope metadata for the payload

Raises:

  • (ArgumentError)

    if no block is provided

  • (ConnectionError)

    if a PostgreSQL connection or command fails

  • (ProtocolError)

    if an invalid replication message is received



94
95
96
97
98
99
100
101
102
# File 'lib/pgoutput/client.rb', line 94

def start(&block)
  raise ArgumentError, "block required" unless block

  connection = Connection.open(configuration)
  setup_connection(connection)
  Stream.new(connection:, configuration:).start { |payload, | block.call(payload, ) }
ensure
  connection&.close
end

#stopvoid

This method returns an undefined value.

Request graceful stop.

This method records the caller’s intent to stop. The current implementation does not yet interrupt an active Stream; it exists as part of the public lifecycle API and may be wired into cooperative stream shutdown in a future release.



112
113
114
115
# File 'lib/pgoutput/client.rb', line 112

def stop
  @stopped = true
  nil
end