Class: Pgoutput::RelationTracker
- Inherits:
-
Object
- Object
- Pgoutput::RelationTracker
- Defined in:
- lib/pgoutput/relation_tracker.rb
Overview
Stateful relation tracker for pgoutput message sequences.
The relation tracker remembers Relation (‘R`) messages so DML tuple values can be annotated with PostgreSQL type OIDs. It does not decode or convert values. It only adds protocol metadata to tuple values while keeping returned objects deeply shareable.
pgoutput DML messages carry a relation id and tuple values, but they do not repeat column names or type OIDs. PostgreSQL sends that metadata separately in Relation (‘R`) messages. Call #process with payloads in stream order so Relation messages are cached before the Insert, Update, or Delete messages that reference them.
The relation cache is injectable. The default cache is a plain Hash, which is appropriate when one stream owner processes payloads sequentially. Callers with an explicit Ractor-oriented design can supply a compatible cache object, such as ‘Ratomic::Map`, through the `relation_cache:` keyword.
A custom relation cache must implement ‘#[]=` and `#fetch`. The tracker stores cached Relation messages by relation id and uses `#fetch` with a block so unknown relation ids still raise UnknownRelationError.
‘RelationTracker` does not reorder messages, buffer DML until metadata arrives, enforce per-record lifecycle ordering, or coordinate sink retries. Those guarantees belong to higher CDC pipeline layers. This class only preserves parser-layer stream semantics and validates tuple arity against cached Relation metadata.
Returned message objects are Ractor-safe.
Instance Method Summary collapse
-
#decode(payload) ⇒ Pgoutput::Messages::Begin, ...
Backwards-compatible alias for callers migrating to ‘process`.
-
#initialize(relation_cache: {}) ⇒ void
constructor
Create a tracker with an optional relation cache.
-
#process(payload) ⇒ Pgoutput::Messages::Begin, ...
Process one pgoutput payload in stream order.
Constructor Details
#initialize(relation_cache: {}) ⇒ void
Create a tracker with an optional relation cache.
58 59 60 |
# File 'lib/pgoutput/relation_tracker.rb', line 58 def initialize(relation_cache: {}) @relations = relation_cache end |
Instance Method Details
#decode(payload) ⇒ Pgoutput::Messages::Begin, ...
Backwards-compatible alias for callers migrating to ‘process`.
100 101 102 |
# File 'lib/pgoutput/relation_tracker.rb', line 100 def decode(payload) process(payload) end |
#process(payload) ⇒ Pgoutput::Messages::Begin, ...
Process one pgoutput payload in stream order.
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/pgoutput/relation_tracker.rb', line 73 def process(payload) = BinaryParser.new(payload).parse case when Messages::Relation @relations[.relation_id] = when Messages::Insert annotate_insert() when Messages::Update annotate_update() when Messages::Delete annotate_delete() else end end |