Class: Pgoutput::Client::Stream Private

Inherits:
Object
  • Object
show all
Defined in:
lib/pgoutput/client/stream.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Logical replication stream loop.

‘Stream` consumes PostgreSQL CopyData payloads from a Connection. It understands the two replication envelope message types used by PostgreSQL’s streaming replication protocol:

  • ‘w` — XLogData, containing logical decoding plugin payload bytes.

  • ‘k` — primary keepalive, optionally requesting immediate feedback.

The stream yields only XLogData plugin payloads. Keepalive messages are handled internally by updating the latest known WAL position and sending standby feedback when requested.

Instance Method Summary collapse

Constructor Details

#initialize(connection:, configuration:) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Build a stream loop.

Parameters:



25
26
27
28
29
30
31
# File 'lib/pgoutput/client/stream.rb', line 25

def initialize(connection:, configuration:)
  @connection = connection
  @configuration = configuration
  @latest_lsn = LSN.parse(configuration.start_lsn_string)
  @last_feedback_at = monotonic_time
  @running = false
end

Instance Method Details

#start {|payload, metadata| ... } ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Start the stream loop.

The method blocks while the stream is running. For every XLogData envelope, it yields the raw pgoutput payload and the parsed envelope metadata. When no CopyData payload is currently available, the loop continues and checks again.

Yields:

  • (payload, metadata)

    called for each XLogData payload

Yield Parameters:

  • payload (String)

    frozen raw pgoutput payload bytes

  • metadata (XLogData)

    parsed WAL envelope metadata

Raises:

  • (ArgumentError)

    if no block is provided

  • (ProtocolError)

    if an unknown or malformed replication message is received

  • (ConnectionError)

    if standby feedback cannot be sent



48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/pgoutput/client/stream.rb', line 48

def start(&block)
  raise ArgumentError, "block required" unless block_given?

  @running = true
  while @running
    copy_data = @connection.get_copy_data
    next unless copy_data

    process_copy_data(copy_data, &block)
    send_periodic_feedback
  end
end

#stopvoid

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Stop the stream loop after the current iteration.



64
65
66
# File 'lib/pgoutput/client/stream.rb', line 64

def stop
  @running = false
end