Class: Pcrd::Replication::Pgoutput::Parser
- Inherits:
-
Object
- Object
- Pcrd::Replication::Pgoutput::Parser
- Defined in:
- lib/pcrd/replication/pgoutput/parser.rb
Overview
Decodes raw pgoutput binary messages into Messages::* structs.
The parser is stateful: it maintains a relation cache so that Insert/Update/Delete messages (which only carry a relation OID) can be enriched with column names from the most recently seen Relation message for that OID. Always feed the stream in LSN order; Relation messages always arrive before the first DML for a table.
Input: raw bytes starting at the pgoutput type tag (i.e. after stripping the 25-byte XLogData header from the replication stream wrapper).
PostgreSQL epoch reference: timestamps in pgoutput are microseconds since 2000-01-01 00:00:00 UTC (not the Unix epoch).
Constant Summary collapse
- PG_EPOCH_OFFSET =
Offset in seconds from Unix epoch (1970-01-01) to PG epoch (2000-01-01).
946_684_800- HANDLERS =
pgoutput message type tags → handler method names.
{ "B" => :decode_begin, "C" => :decode_commit, "R" => :decode_relation, "I" => :decode_insert, "U" => :decode_update, "D" => :decode_delete, "T" => :decode_type, "O" => :decode_origin, "A" => :decode_truncate, "M" => :decode_logical_message }.freeze
Instance Method Summary collapse
-
#initialize ⇒ Parser
constructor
A new instance of Parser.
-
#parse(data) ⇒ Object
Parse one raw pgoutput message payload.
-
#relation(oid) ⇒ Object
Expose the relation cache for testing and for the WAL consumer.
Constructor Details
#initialize ⇒ Parser
Returns a new instance of Parser.
41 42 43 |
# File 'lib/pcrd/replication/pgoutput/parser.rb', line 41 def initialize @relations = {} # OID → Messages::Relation end |
Instance Method Details
#parse(data) ⇒ Object
Parse one raw pgoutput message payload. Returns the appropriate Messages::* struct.
47 48 49 50 51 52 53 54 55 |
# File 'lib/pcrd/replication/pgoutput/parser.rb', line 47 def parse(data) cur = Cursor.new(data) tag = cur.read_char handler = HANDLERS[tag] raise UnknownMessage, "Unknown pgoutput tag: #{tag.inspect} (0x#{tag.ord.to_s(16)})" unless handler send(handler, cur) end |
#relation(oid) ⇒ Object
Expose the relation cache for testing and for the WAL consumer.
58 59 60 |
# File 'lib/pcrd/replication/pgoutput/parser.rb', line 58 def relation(oid) @relations[oid] end |