Class: Pgoutput::Client::Stream Private

Inherits:
Object
  • Object
show all
Defined in:
lib/pgoutput/client/stream.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Logical replication stream loop.

‘Stream` consumes PostgreSQL CopyData payloads from a Connection. It understands the two replication envelope message types used by PostgreSQL’s streaming replication protocol:

  • ‘w` — XLogData, containing logical decoding plugin payload bytes.

  • ‘k` — primary keepalive, optionally requesting immediate feedback.

The stream yields only XLogData plugin payloads. Keepalive messages are handled internally by updating the latest known WAL position and sending standby feedback when requested.

Feedback separates receipt from downstream acknowledgment. Received LSN follows the latest WAL position seen from PostgreSQL. Flushed/applied LSN follows #ack, allowing downstream consumers to decide when a WAL position is safe to report as durable.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection:, configuration:, acked_lsn: nil) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Build a stream loop.

Parameters:

  • connection (Connection)

    replication connection

  • configuration (Configuration)

    stream configuration

  • acked_lsn (String, Integer, nil) (defaults to: nil)

    initial downstream-acknowledged WAL position



48
49
50
51
52
53
54
55
56
57
# File 'lib/pgoutput/client/stream.rb', line 48

def initialize(connection:, configuration:, acked_lsn: nil)
  @connection = connection
  @configuration = configuration
  @latest_lsn = LSN.parse(configuration.start_lsn_string)
  @acked_lsn = acked_lsn ? normalize_lsn_value(acked_lsn) : @latest_lsn
  @last_feedback_at = monotonic_time
  @last_keepalive_at = nil
  @running = false
  @stop_requested = false
end

Instance Attribute Details

#acked_lsnInteger (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Latest downstream-acknowledged WAL position used as flushed/applied LSN in standby feedback.

Returns:

  • (Integer)


34
35
36
# File 'lib/pgoutput/client/stream.rb', line 34

def acked_lsn
  @acked_lsn
end

#last_keepalive_atTime? (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Last time a primary keepalive was observed.

Returns:

  • (Time, nil)


39
40
41
# File 'lib/pgoutput/client/stream.rb', line 39

def last_keepalive_at
  @last_keepalive_at
end

#latest_lsnInteger (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Latest WAL position observed from XLogData or keepalive messages.

Returns:

  • (Integer)


28
29
30
# File 'lib/pgoutput/client/stream.rb', line 28

def latest_lsn
  @latest_lsn
end

Instance Method Details

#ack(lsn) ⇒ Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Mark a WAL position as durably handled by downstream code.

The stream never decides sink durability on its own. Downstream code may call this after checkpointing, enqueueing, or otherwise making progress durable. Feedback sent after this call reports the acknowledged LSN as both flushed and applied.

Parameters:

  • lsn (String, Integer)

    WAL position acknowledged by downstream code

Returns:

  • (Integer)

    normalized acknowledged WAL position



118
119
120
121
# File 'lib/pgoutput/client/stream.rb', line 118

def ack(lsn)
  parsed = normalize_lsn_value(lsn)
  @acked_lsn = [@acked_lsn, parsed].max
end

#running?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Whether the stream loop is active.

Returns:

  • (Boolean)


105
106
107
# File 'lib/pgoutput/client/stream.rb', line 105

def running?
  @running
end

#start {|payload, metadata| ... } ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Start the stream loop.

The method blocks while the stream is running. For every XLogData envelope, it yields the raw pgoutput payload and the parsed envelope metadata. When no CopyData payload is currently available, the loop pauses briefly before checking again.

Yields:

  • (payload, metadata)

    called for each XLogData payload

Yield Parameters:

  • payload (String)

    frozen raw pgoutput payload bytes

  • metadata (XLogData)

    parsed WAL envelope metadata

Raises:

  • (ArgumentError)

    if no block is provided

  • (ProtocolError)

    if an unknown or malformed replication message is received

  • (ConnectionError)

    if standby feedback cannot be sent



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/pgoutput/client/stream.rb', line 74

def start(&block)
  raise ArgumentError, "block required" unless block_given?

  @running = true
  while @running
    copy_data = @connection.get_copy_data
    if copy_data.nil?
      sleep 0.01
      next
    end

    process_copy_data(copy_data, &block)
    send_periodic_feedback
  end
ensure
  send_feedback(reply_requested: false) if @stop_requested
  @running = false
end

#stopvoid

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Stop the stream loop after the current iteration.



96
97
98
99
100
# File 'lib/pgoutput/client/stream.rb', line 96

def stop
  @stop_requested = true
  @running = false
  nil
end