Class: Pgoutput::Client::XLogData

Inherits:
ReplicationXLogData show all
Defined in:
lib/pgoutput/client/xlog_data.rb

Overview

Immutable XLogData replication envelope.

PostgreSQL wraps logical decoding plugin output in an XLogData CopyData payload while streaming replication is active. The payload layout is:

“‘text Byte 0 : message tag `w` Bytes 1-8 : WAL start position, unsigned 64-bit big-endian Bytes 9-16 : WAL end position, unsigned 64-bit big-endian Bytes 17-24 : server clock, PostgreSQL timestamp format Bytes 25.. : logical decoding plugin payload “`

Instances are created through XLogData.parse and made shareable so they can cross Ractor boundaries with their frozen payload bytes.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#payloadString (readonly)

frozen raw logical decoding plugin payload

Returns:

  • (String)

    the current value of payload



31
32
33
# File 'lib/pgoutput/client/xlog_data.rb', line 31

def payload
  @payload
end

#server_clockInteger (readonly)

PostgreSQL server timestamp in microseconds since 2000-01-01 UTC

Returns:

  • (Integer)

    the current value of server_clock



31
32
33
# File 'lib/pgoutput/client/xlog_data.rb', line 31

def server_clock
  @server_clock
end

#wal_endInteger (readonly)

WAL position after this message

Returns:

  • (Integer)

    the current value of wal_end



31
32
33
# File 'lib/pgoutput/client/xlog_data.rb', line 31

def wal_end
  @wal_end
end

#wal_startInteger (readonly)

WAL position where this message begins

Returns:

  • (Integer)

    the current value of wal_start



31
32
33
# File 'lib/pgoutput/client/xlog_data.rb', line 31

def wal_start
  @wal_start
end

Class Method Details

.parse(bytes) ⇒ XLogData

Parse an XLogData CopyData payload.

Parameters:

  • bytes (String)

    raw CopyData payload beginning with ‘w`

Returns:

  • (XLogData)

    immutable parsed envelope

Raises:

  • (ProtocolError)

    if the payload is empty, has the wrong message tag, or is too short to contain the required fields



38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/pgoutput/client/xlog_data.rb', line 38

def self.parse(bytes)
  binary = bytes.b
  raise ProtocolError, "empty CopyData payload" if binary.empty?
  raise ProtocolError, "expected XLogData message" unless binary.getbyte(0) == "w".ord
  raise ProtocolError, "truncated XLogData message" if binary.bytesize < 25

  wal_start = unpack_u64(binary, 1)
  wal_end = unpack_u64(binary, 9)
  server_clock = unpack_u64(binary, 17)
  payload = binary.byteslice(25..)&.freeze || "".b.freeze

  Ractor.make_shareable(new(wal_start, wal_end, server_clock, payload))
end

Instance Method Details

#wal_end_lsnString

Ending WAL position formatted as a PostgreSQL LSN string.

Returns:

  • (String)


60
# File 'lib/pgoutput/client/xlog_data.rb', line 60

def wal_end_lsn = LSN.format(wal_end)

#wal_start_lsnString

Starting WAL position formatted as a PostgreSQL LSN string.

Returns:

  • (String)


55
# File 'lib/pgoutput/client/xlog_data.rb', line 55

def wal_start_lsn = LSN.format(wal_start)