Class: Pgoutput::RelationTracker

Inherits:
Object
  • Object
show all
Defined in:
lib/pgoutput/relation_tracker.rb

Overview

Stateful relation tracker for pgoutput message sequences.

The relation tracker remembers Relation (‘R`) messages so DML tuple values can be annotated with PostgreSQL type OIDs. It does not decode or convert values. It only adds protocol metadata to tuple values while keeping returned objects deeply shareable.

pgoutput DML messages carry a relation id and tuple values, but they do not repeat column names or type OIDs. PostgreSQL sends that metadata separately in Relation (‘R`) messages. Call #process with payloads in stream order so Relation messages are cached before the Insert, Update, or Delete messages that reference them.

The relation cache is injectable. The default cache is a plain Hash, which is appropriate when one stream owner processes payloads sequentially. Callers with an explicit Ractor-oriented design can supply a compatible cache object, such as ‘Ratomic::Map`, through the `relation_cache:` keyword.

A custom relation cache must implement ‘#[]=` and `#fetch`. The tracker stores cached Relation messages by relation id and uses `#fetch` with a block so unknown relation ids still raise UnknownRelationError.

‘RelationTracker` does not reorder messages, buffer DML until metadata arrives, enforce per-record lifecycle ordering, or coordinate sink retries. Those guarantees belong to higher CDC pipeline layers. This class only preserves parser-layer stream semantics and validates tuple arity against cached Relation metadata.

Returned message objects are Ractor-safe.

Examples:

Default Hash-backed relation cache

stream = Pgoutput::RelationTracker.new
stream.process(relation_payload)
insert = stream.process(insert_payload)
insert.tuple.map(&:oid)

Ractor-safe relation cache with Ratomic::Map

require "ratomic"

relation_cache = Ratomic::Map.new
stream = Pgoutput::RelationTracker.new(relation_cache: relation_cache)
stream.process(relation_payload)
update = stream.process(update_payload)
update.new_tuple.map(&:oid)

Instance Method Summary collapse

Constructor Details

#initialize(relation_cache: {}) ⇒ void

Create a tracker with an optional relation cache.

Parameters:

  • relation_cache (Hash, #fetch, #[]=) (defaults to: {})

    cache for relation metadata, keyed by pgoutput relation id. The default Hash is suitable for one stream owner; callers may inject ‘Ratomic::Map` or another compatible cache for explicit Ractor-safe relation metadata sharing.



58
59
60
# File 'lib/pgoutput/relation_tracker.rb', line 58

def initialize(relation_cache: {})
  @relations = relation_cache
end

Instance Method Details

#decode(payload) ⇒ Pgoutput::Messages::Begin, ...

Backwards-compatible alias for callers migrating to ‘process`.

Parameters:

  • payload (String)

    one pgoutput logical replication message payload.

Returns:



100
101
102
# File 'lib/pgoutput/relation_tracker.rb', line 100

def decode(payload)
  process(payload)
end

#process(payload) ⇒ Pgoutput::Messages::Begin, ...

Process one pgoutput payload in stream order.

Parameters:

  • payload (String)

    one pgoutput logical replication message payload.

Returns:

Raises:



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/pgoutput/relation_tracker.rb', line 73

def process(payload)
  message = BinaryParser.new(payload).parse

  case message
  when Messages::Relation
    @relations[message.relation_id] = message
    message
  when Messages::Insert
    annotate_insert(message)
  when Messages::Update
    annotate_update(message)
  when Messages::Delete
    annotate_delete(message)
  else
    message
  end
end