pgoutput-decoder
A high-level PostgreSQL pgoutput logical replication value decoder for Ruby.
pgoutput-decoder is the companion layer to pgoutput-parser. It accepts immutable protocol messages produced by pgoutput-parser and turns tuple payloads into application-friendly Ruby row-change events.
It does not parse PostgreSQL wire bytes, and it does not open replication connections. Those concerns belong to the lower-level parser and to future client layers.
Requirements
- Ruby 3.4+
- pgoutput-parser ~> 0.1
Architecture
pgoutput-parser
│
▼
Protocol messages
│
▼
pgoutput-decoder
│
▼
Decoded row-change events
What This Gem Does
- Decodes PostgreSQL OID-backed tuple values
- Builds Ruby hashes from relation columns and tuple values
- Tracks relation metadata from Relation messages
- Tracks active transaction context from Begin/Commit messages
- Attaches transaction_id to DML events
- Returns immutable, Ractor-shareable event objects
- Supports custom OID decoders
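The hash-building step listed above can be pictured roughly as follows. This is a minimal sketch, not the gem's implementation: build_row, the column names, and the already-decoded values are all hypothetical.

```ruby
# Hypothetical helper: pairs relation column names with decoded tuple values.
# Assumes both arrays are in column order, as announced by a Relation message.
def build_row(column_names, values)
  unless column_names.length == values.length
    raise ArgumentError, "column count and value count differ"
  end

  # zip pairs each name with its value; freeze so the row cannot be mutated.
  column_names.zip(values).to_h.freeze
end

row = build_row(%w[id name active], [7, "Alice", true])
p row
```

Freezing the hash here is only a nod to the immutable event objects described above; how the gem itself freezes rows is not shown in this README.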
What This Gem Does Not Do
This gem intentionally does not:
- Parse PostgreSQL CopyData bytes
- Manage replication slots
- Open replication connections
- Maintain WAL acknowledgements
- Reconnect to PostgreSQL
- Publish events to queues
- Integrate with ActiveRecord
Installation
gem "pgoutput-decoder"
Then:
bundle install
Require it with:
require "pgoutput/decoder"
Quick Start
require "pgoutput"
require "pgoutput/decoder"
stream = Pgoutput::RelationTracker.new
decoder = Pgoutput::Decoder.new
message = stream.process(payload)
event = decoder.decode(message)
A Relation message updates decoder metadata and returns nil:
decoder.decode(relation_message)
# => nil
An insert message returns a decoded event:
event = decoder.decode(insert_message)
event.transaction_id
# => 789
event.schema
# => "public"
event.table
# => "users"
event.values
# => { "id" => 7, "name" => "Alice", "active" => true }
Transaction Context
PostgreSQL pgoutput carries the transaction ID in the Begin (B) message, not on every row-change message.
The decoder remembers the active transaction and attaches it to decoded DML events:
decoder.decode(begin_message)
decoder.decode(relation_message)
insert = decoder.decode(insert_message)
insert.transaction_id
# => 789
The transaction ID is useful for grouping changes, debugging, and CDC processing. It should not be treated as a globally permanent identifier because PostgreSQL transaction IDs can wrap around.
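The bookkeeping described above can be sketched as a small state machine. Everything below is an assumption made for illustration: the Struct stand-ins and TransactionContext are hypothetical and are not the classes defined by pgoutput-parser or pgoutput-decoder. For brevity, Begin and Commit return nil here, whereas the gem lists Begin and Commit events of its own.

```ruby
# Stand-in message and event types; the real gems define their own classes.
BeginMessage  = Struct.new(:xid)
CommitMessage = Struct.new(:commit_lsn)
InsertMessage = Struct.new(:values)
InsertEvent   = Struct.new(:transaction_id, :values)

# Hypothetical tracker that remembers the xid between Begin and Commit.
class TransactionContext
  def initialize
    @current_xid = nil
  end

  def handle(message)
    case message
    when BeginMessage
      @current_xid = message.xid # transaction starts: remember its ID
      nil
    when CommitMessage
      @current_xid = nil         # transaction ends: forget it
      nil
    when InsertMessage
      # The row-change message carries no xid, so attach the remembered one.
      InsertEvent.new(@current_xid, message.values)
    end
  end
end

context = TransactionContext.new
context.handle(BeginMessage.new(789))
event = context.handle(InsertMessage.new({ "id" => 7 }))
puts event.transaction_id
```

Because the tracker holds mutable state, one instance per replication stream is the natural fit.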
Supported Events
Pgoutput::Decoder::Events::Begin
Pgoutput::Decoder::Events::Commit
Pgoutput::Decoder::Events::Insert
Pgoutput::Decoder::Events::Update
Pgoutput::Decoder::Events::Delete
Default Type Support
The default registry supports common scalar PostgreSQL OIDs:
| OID | Type |
|---|---|
| 16 | boolean |
| 20 | bigint |
| 21 | smallint |
| 23 | integer |
| 25 | text |
| 114 | json |
| 700 | real |
| 701 | double precision |
| 1043 | varchar |
| 1082 | date |
| 1114 | timestamp |
| 1184 | timestamptz |
| 1700 | numeric |
| 2950 | uuid |
| 3802 | jsonb |
Unsupported OIDs are returned as frozen raw strings.
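To illustrate the lookup-with-fallback behaviour, here is a minimal sketch of an OID-keyed table of text-format decoders. TEXT_DECODERS and decode_text are hypothetical names, and only a few OIDs from the table are covered; the gem's real registry may be structured quite differently.

```ruby
require "json"

# Hypothetical text-format decoders keyed by PostgreSQL type OID.
TEXT_DECODERS = {
  16   => ->(raw) { raw == "t" },       # boolean arrives as "t" or "f"
  20   => ->(raw) { Integer(raw, 10) }, # bigint
  21   => ->(raw) { Integer(raw, 10) }, # smallint
  23   => ->(raw) { Integer(raw, 10) }, # integer
  114  => ->(raw) { JSON.parse(raw) },  # json
  3802 => ->(raw) { JSON.parse(raw) }   # jsonb
}.freeze

def decode_text(oid, raw)
  return nil if raw.nil? # SQL NULL has no text representation

  decoder = TEXT_DECODERS[oid]
  # Unknown OIDs fall back to a frozen copy of the raw string.
  decoder ? decoder.call(raw) : raw.dup.freeze
end

p decode_text(23, "42")
p decode_text(16, "t")
p decode_text(999_999, "opaque")
```

Types such as real, double precision, date, and the timestamp family are left out of the sketch on purpose: PostgreSQL can emit special values like NaN and infinity for them, which need explicit handling.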
Binary Values
Binary decoding is intentionally conservative.
The decoder handles safe fixed-width binary scalar types such as:
- boolean
- int2
- int4
- int8
- float4
- float8
Unsupported binary values are preserved as frozen raw bytes.
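As a rough illustration of conservative fixed-width decoding, the sketch below uses String#unpack1 with big-endian directives, since PostgreSQL sends binary values in network byte order. FIXED_WIDTH and decode_binary are hypothetical names, not part of the gem's API.

```ruby
# Hypothetical table: OID => [unpack directive, expected width in bytes].
FIXED_WIDTH = {
  21  => ["s>", 2], # int2: signed 16-bit, big-endian
  23  => ["l>", 4], # int4: signed 32-bit, big-endian
  20  => ["q>", 8], # int8: signed 64-bit, big-endian
  700 => ["g", 4],  # float4: single precision, big-endian
  701 => ["G", 8]   # float8: double precision, big-endian
}.freeze

def decode_binary(oid, bytes)
  # boolean is a single byte: zero is false, anything else is true.
  return bytes.getbyte(0) != 0 if oid == 16 && bytes.bytesize == 1

  directive, width = FIXED_WIDTH[oid]
  # Unknown OID or unexpected width: keep a frozen binary-encoded copy.
  return bytes.b.freeze unless directive && bytes.bytesize == width

  bytes.unpack1(directive)
end

p decode_binary(23, [42].pack("l>"))
p decode_binary(701, [1.5].pack("G"))
```

Checking the byte width before unpacking keeps a malformed or unexpected payload from being silently misread as a number.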
Custom OID Decoders
registry =
  Pgoutput::Decoder::TypeRegistry.default.with_decoder(999_999) do |raw, format|
    format == :text ? "custom:#{raw}" : raw
  end
decoder = Pgoutput::Decoder.new(type_registry: registry)
Update Events
update = decoder.decode(update_message)
update.old_key
# => { "id" => 7 } or nil
update.old_values
# => { ... } or nil
update.new_values
# => { "id" => 7, "name" => "Bob" }
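Because old_values may be nil (PostgreSQL sends the full old row only when the table uses REPLICA IDENTITY FULL), consumers usually need to guard before diffing. The helper below is a hypothetical sketch that works on plain hashes shaped like the ones shown above; changed_columns is not part of the gem.

```ruby
# Hypothetical helper: returns { column => [old, new] } for changed columns,
# or nil when no old row image is available to compare against.
def changed_columns(old_values, new_values)
  return nil if old_values.nil?

  new_values.each_with_object({}) do |(column, new_value), changes|
    old_value = old_values[column]
    changes[column] = [old_value, new_value] unless old_value == new_value
  end
end

p changed_columns({ "id" => 7, "name" => "Alice" }, { "id" => 7, "name" => "Bob" })
p changed_columns(nil, { "id" => 7, "name" => "Bob" })
```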
Delete Events
delete = decoder.decode(delete_message)
delete.old_key
# => { "id" => 7 } or nil
delete.old_values
# => { ... } or nil
Ractor Safety
Decoded events are deeply shareable:
event = decoder.decode(insert_message)
Ractor.shareable?(event)
# => true
The decoder instance itself is stateful and should not be shared across Ractors.
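For readers unfamiliar with what "deeply shareable" means, the sketch below shows the general Ruby mechanism using a stand-in Struct. SketchEvent and shareable_event are hypothetical; the gem's own event classes and the way it freezes them are not shown here.

```ruby
# Stand-in event type; the gem's own event classes are not used here.
SketchEvent = Struct.new(:schema, :table, :values, keyword_init: true)

# Hypothetical constructor that deep-freezes the event before returning it.
def shareable_event(**attributes)
  # make_shareable freezes the struct and everything reachable from it.
  Ractor.make_shareable(SketchEvent.new(**attributes))
end

plain = SketchEvent.new(schema: "public", table: "users", values: { "id" => 7 })
puts Ractor.shareable?(plain) # an unfrozen struct is not shareable

event = shareable_event(schema: "public", table: "users", values: { "id" => 7 })
puts Ractor.shareable?(event)
puts event.values.frozen?
```

A deeply frozen event can be passed to another Ractor without copying, which is why the events are shareable while the stateful decoder is not.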
Testing
bundle exec rake test
With coverage:
COVERAGE=true bundle exec rake test
Type Checking
bundle exec steep check
License
MIT.