Class: Pcrd::Replication::Pgoutput::Parser

Inherits:
Object
Defined in:
lib/pcrd/replication/pgoutput/parser.rb

Overview

Decodes raw pgoutput binary messages into Messages::* structs.

The parser is stateful: it maintains a relation cache so that Insert/Update/Delete messages (which carry only a relation OID) can be enriched with column names from the most recently seen Relation message for that OID. Feed messages in the order they arrive on the stream (LSN order): pgoutput sends a Relation message before the first DML message for a table, so in-order processing ensures the cache is populated before it is needed.
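The caching idea can be illustrated in isolation. The following is a minimal sketch, not the library's implementation: the Relation and Insert structs and the RelationCache class are hypothetical stand-ins for Messages::* and the parser's internal @relations hash, and the real field names may differ.

```ruby
# Hypothetical stand-ins for the Messages::* structs; the real definitions
# live elsewhere in the library and may have different fields.
Relation = Struct.new(:oid, :name, :columns)
Insert   = Struct.new(:relation_oid, :values)

# Hypothetical class mirroring the role of the parser's relation cache.
class RelationCache
  def initialize
    @relations = {} # OID => Relation
  end

  # Remember the latest Relation message seen for an OID, replacing any
  # earlier entry for the same OID.
  def store(relation)
    @relations[relation.oid] = relation
  end

  # Pair the positional values of a DML message with the cached column names.
  # Raises KeyError if no Relation message has been stored for the OID yet.
  def enrich(insert)
    relation = @relations.fetch(insert.relation_oid)
    relation.columns.zip(insert.values).to_h
  end
end

cache = RelationCache.new
cache.store(Relation.new(16_384, "users", %w[id email]))
row = cache.enrich(Insert.new(16_384, [1, "a@example.com"]))
# row is {"id" => 1, "email" => "a@example.com"}
```

Using fetch rather than [] makes an out-of-order stream fail loudly instead of yielding rows without column names; whether the real parser behaves this way is not stated in this page.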

Input: raw bytes starting at the pgoutput type tag (i.e. after stripping the 25-byte XLogData header from the replication stream wrapper).
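A sketch of how a caller might strip that header before calling #parse. The helper name split_xlogdata and the returned hash keys are hypothetical and not part of this library. The layout assumed here is the XLogData message of the streaming replication protocol: a 'w' tag byte followed by three 8-byte big-endian integers (WAL start, WAL end, server send time).

```ruby
# Hypothetical helper: splits an XLogData frame into its header fields and
# the pgoutput message that follows the 25-byte header.
def split_xlogdata(frame)
  header_size = 25 # 1 tag byte ('w') + 3 * 8-byte big-endian integers
  raise ArgumentError, "frame too short" if frame.bytesize < header_size

  tag, wal_start, wal_end, send_time = frame.unpack("a1Q>Q>q>")
  raise ArgumentError, "not an XLogData frame: #{tag.inspect}" unless tag == "w"

  payload = frame.byteslice(header_size, frame.bytesize - header_size)
  { wal_start: wal_start, wal_end: wal_end, send_time: send_time, payload: payload }
end

# Build a synthetic frame whose payload is just a "B" type tag.
frame = ["w", 0x16B3748, 0x16B3748, 0].pack("a1Q>Q>q>") + "B".b
parts = split_xlogdata(frame)
# parts[:payload] is the byte string to hand to Parser#parse
```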

PostgreSQL epoch reference: timestamps in pgoutput are microseconds since 2000-01-01 00:00:00 UTC (not the Unix epoch).
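As an illustration of the epoch shift, a pgoutput timestamp can be converted to a Ruby Time roughly as follows. The helper name pg_timestamp_to_time is hypothetical; how the parser itself performs the conversion is not shown on this page.

```ruby
PG_EPOCH_OFFSET = 946_684_800 # seconds from 1970-01-01 to 2000-01-01 (UTC)

# Hypothetical helper: converts a pgoutput timestamp (microseconds since the
# PostgreSQL epoch) to a UTC Time.
def pg_timestamp_to_time(micros)
  # divmod floors, so timestamps before 2000-01-01 (negative values) still
  # produce a non-negative microsecond remainder.
  seconds, usec = micros.divmod(1_000_000)
  Time.at(PG_EPOCH_OFFSET + seconds, usec, :usec).utc
end

pg_timestamp_to_time(0)         # 2000-01-01 00:00:00 UTC
pg_timestamp_to_time(1_500_000) # one and a half seconds after the PG epoch
```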

Constant Summary

PG_EPOCH_OFFSET =

Offset in seconds from Unix epoch (1970-01-01) to PG epoch (2000-01-01).

946_684_800
HANDLERS =

pgoutput message type tags → handler method names.

{
  "B" => :decode_begin,
  "C" => :decode_commit,
  "R" => :decode_relation,
  "I" => :decode_insert,
  "U" => :decode_update,
  "D" => :decode_delete,
  "Y" => :decode_type,
  "O" => :decode_origin,
  "T" => :decode_truncate,
  "M" => :decode_logical_message
}.freeze

Instance Method Summary

Constructor Details

#initialize ⇒ Parser

Returns a new instance of Parser.



# File 'lib/pcrd/replication/pgoutput/parser.rb', line 41

def initialize
  @relations = {}  # OID → Messages::Relation
end

Instance Method Details

#parse(data) ⇒ Object

Parse one raw pgoutput message payload. Returns the appropriate Messages::* struct.

Raises:

  • (UnknownMessage) if the message's type tag has no entry in HANDLERS



# File 'lib/pcrd/replication/pgoutput/parser.rb', line 47

def parse(data)
  cur = Cursor.new(data)
  tag = cur.read_char

  handler = HANDLERS[tag]
  raise UnknownMessage, "Unknown pgoutput tag: #{tag.inspect} (0x#{tag.ord.to_s(16)})" unless handler

  send(handler, cur)
end

#relation(oid) ⇒ Object

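Returns the cached Messages::Relation for the given OID, or nil if no Relation message has been seen for it. Exposes the relation cache for testing and for the WAL consumer.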



# File 'lib/pcrd/replication/pgoutput/parser.rb', line 58

def relation(oid)
  @relations[oid]
end