pgoutput-client


A transport-only PostgreSQL logical replication client for receiving raw pgoutput payloads in Ruby.

pgoutput-client connects to PostgreSQL using logical replication, starts a pgoutput replication stream, receives CopyData messages, handles keepalives, sends standby feedback, and yields raw pgoutput payload bytes to downstream gems such as pgoutput-parser and pgoutput-decoder.

It intentionally does not parse row-change messages or decode PostgreSQL values.


Requirements

  • Ruby 3.4+
  • PostgreSQL 10+
  • pg gem
  • PostgreSQL publication and logical replication slot

Ecosystem Position

PostgreSQL logical replication
        │
        ▼
pgoutput-client
        │
        ▼
CopyData / pgoutput payloads
        │
        ▼
pgoutput-parser
        │
        ▼
Protocol messages
        │
        ▼
pgoutput-decoder
        │
        ▼
Decoded row events

pgoutput-client is the transport layer only.


Features

  • Opens PostgreSQL logical replication connections
  • Builds replication commands
  • Supports CREATE_REPLICATION_SLOT
  • Supports DROP_REPLICATION_SLOT
  • Supports START_REPLICATION SLOT ... LOGICAL ...
  • Parses XLogData envelopes
  • Parses primary keepalive messages
  • Builds standby feedback messages
  • Provides LSN parse/format helpers
  • Yields raw pgoutput payload bytes
  • Includes RBS signatures
  • Includes Minitest coverage
  • No audit, parser, or decoder concerns
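
The replication commands listed above are plain text commands sent over a replication connection. As a rough illustration of their shape, here is a minimal sketch of a command builder. The module `ReplicationCommandSketch` and its method names are hypothetical and are not this gem's API; the command syntax follows PostgreSQL's streaming replication protocol, and `proto_version` 1 is assumed.

```ruby
# Hypothetical helper for illustration only; not part of pgoutput-client's API.
module ReplicationCommandSketch
  module_function

  # Double-quote an identifier such as a slot or publication name.
  def quote_ident(name)
    "\"#{name.to_s.gsub('"', '""')}\""
  end

  # Single-quote an option value.
  def quote_literal(value)
    "'#{value.to_s.gsub("'", "''")}'"
  end

  def create_slot(slot_name)
    "CREATE_REPLICATION_SLOT #{quote_ident(slot_name)} LOGICAL pgoutput"
  end

  def drop_slot(slot_name)
    "DROP_REPLICATION_SLOT #{quote_ident(slot_name)}"
  end

  # start_lsn "0/0" asks the server to resume from the slot's confirmed position.
  def start_replication(slot_name, publication_names, start_lsn: "0/0", proto_version: 1)
    publications = publication_names.map { |name| quote_ident(name) }.join(",")
    options = "proto_version #{quote_literal(proto_version)}, " \
              "publication_names #{quote_literal(publications)}"
    "START_REPLICATION SLOT #{quote_ident(slot_name)} LOGICAL #{start_lsn} (#{options})"
  end
end

puts ReplicationCommandSketch.start_replication("my_slot", ["my_publication"])
```

Quoting the slot and publication names keeps the command well-formed even if a name contains unusual characters; the real gem may build these strings differently.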

Installation

gem "pgoutput-client"

Then:

bundle install

Require:

require "pgoutput-client"

Quick Start

require "pgoutput-client"

client =
  Pgoutput::Client::Runner.new(
    database_url: ENV.fetch("DATABASE_URL"),
    slot_name: "my_slot",
    publication_names: ["my_publication"],
    auto_create_slot: true
  )

client.start do |payload, metadata|
  puts "WAL end: #{metadata.wal_end_lsn}"
  puts "Raw pgoutput payload bytes: #{payload.bytesize}"
end

Using With pgoutput-parser

require "pgoutput-client"
require "pgoutput"

client = Pgoutput::Client::Runner.new(
  database_url: ENV.fetch("DATABASE_URL"),
  slot_name: "my_slot",
  publication_names: ["my_publication"]
)

tracker = Pgoutput::RelationTracker.new

client.start do |payload, metadata|
  message = tracker.process(payload)
  p [metadata.wal_end_lsn, message]
end

Using With pgoutput-decoder

require "pgoutput-client"
require "pgoutput"
require "pgoutput/decoder"

client = Pgoutput::Client::Runner.new(
  database_url: ENV.fetch("DATABASE_URL"),
  slot_name: "my_slot",
  publication_names: ["my_publication"]
)

tracker = Pgoutput::RelationTracker.new
decoder = Pgoutput::Decoder.new

client.start do |payload, metadata|
  protocol_message = tracker.process(payload)
  event = decoder.decode(protocol_message)
  p [metadata.wal_end_lsn, event]
end

What This Gem Does

PostgreSQL replication connection
        │
        ▼
CopyData stream
        │
        ▼
XLogData / Keepalive handling
        │
        ▼
Raw pgoutput payloads

It owns:

  • Replication connection setup
  • Replication command generation
  • CopyData reading
  • XLogData envelope parsing
  • Keepalive handling
  • Standby status feedback
  • LSN conversion

What This Gem Does Not Do

It does not:

  • Parse pgoutput row messages
  • Decode PostgreSQL OIDs
  • Build application events
  • Group transactions
  • Run processor pipelines
  • Manage Ractor worker pools
  • Store audit records

Those responsibilities belong to higher layers.


Logical Replication Setup

Example PostgreSQL setup (a change to wal_level takes effect only after a server restart):

ALTER SYSTEM SET wal_level = logical;

CREATE PUBLICATION my_publication FOR TABLE users, posts;

Create a slot automatically:

Pgoutput::Client::Runner.new(
  database_url: ENV.fetch("DATABASE_URL"),
  slot_name: "my_slot",
  publication_names: ["my_publication"],
  auto_create_slot: true
)

Or create the slot yourself:

SELECT * FROM pg_create_logical_replication_slot('my_slot', 'pgoutput');

Public API

Pgoutput::Client::Runner

High-level facade.

client = Pgoutput::Client::Runner.new(...)
client.start { |payload, metadata| ... }

Pgoutput::Client::Configuration

Immutable configuration object.

Pgoutput::Client::Connection

Thin wrapper around PG::Connection for replication commands.

Pgoutput::Client::Stream

Consumes CopyData messages and yields pgoutput payloads.

Pgoutput::Client::LSN

Pgoutput::Client::LSN.parse("0/16B6C50")
Pgoutput::Client::LSN.format(23_817_296)
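
An LSN is a 64-bit WAL position that PostgreSQL prints as two hexadecimal 32-bit halves separated by a slash. The conversion shown above can be sketched as follows; `LsnSketch` is a hypothetical stand-in written for illustration and may differ from the gem's own implementation.

```ruby
# Hypothetical stand-in for illustration; not the gem's implementation.
module LsnSketch
  module_function

  # "0/16B6C50" -> Integer (high 32 bits before the slash, low 32 bits after).
  def parse(text)
    high, low = text.split("/", 2)
    raise ArgumentError, "invalid LSN: #{text.inspect}" unless high && low

    (Integer(high, 16) << 32) | Integer(low, 16)
  end

  # Integer -> "HIGH/LOW" in upper-case hexadecimal.
  def format(value)
    Kernel.format("%X/%X", value >> 32, value & 0xFFFF_FFFF)
  end
end

p LsnSketch.parse("0/16B6C50")
p LsnSketch.format(23_817_296)
```

The two values in the example above are the same position: 0x16B6C50 is 23,817,296 in decimal.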

Pgoutput::Client::XLogData

Represents a WAL data envelope.
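
In PostgreSQL's streaming replication protocol, an XLogData message is the byte 'w', followed by three 64-bit big-endian integers (WAL start, current WAL end, and server clock in microseconds since 2000-01-01), followed by the payload. A minimal sketch of parsing that envelope is below; `XLogDataSketch` and `parse_xlog_data` are hypothetical names, and the gem's own class may expose different attributes.

```ruby
# Hypothetical struct and parser for illustration; not the gem's API.
XLogDataSketch = Struct.new(:wal_start, :wal_end, :server_clock, :payload)

XLOG_DATA_HEADER_SIZE = 25 # 1 tag byte + 3 * 8-byte integers

def parse_xlog_data(bytes)
  raise ArgumentError, "not an XLogData message" unless bytes.getbyte(0) == "w".ord
  raise ArgumentError, "truncated XLogData header" if bytes.bytesize < XLOG_DATA_HEADER_SIZE

  # x skips the tag byte; Q> is unsigned 64-bit big-endian, q> is signed.
  wal_start, wal_end, server_clock = bytes.unpack("xQ>Q>q>")
  payload = bytes.byteslice(XLOG_DATA_HEADER_SIZE, bytes.bytesize - XLOG_DATA_HEADER_SIZE)
  XLogDataSketch.new(wal_start, wal_end, server_clock, payload)
end

sample = ["w", 23_817_296, 23_817_400, 0].pack("aQ>Q>q>") + "raw pgoutput bytes"
message = parse_xlog_data(sample)
puts "payload size: #{message.payload.bytesize}"
```

Everything after the 25-byte header is the raw pgoutput payload that would be handed to a parser unchanged.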

Pgoutput::Client::Keepalive

Represents a primary keepalive message.
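
A primary keepalive message is the byte 'k', a 64-bit WAL end position, a 64-bit server clock (microseconds since 2000-01-01), and one byte that is 1 when the server wants an immediate reply. The sketch below shows one way to parse it; `KeepaliveSketch` and `parse_keepalive` are hypothetical names, not the gem's API.

```ruby
# Hypothetical struct and parser for illustration; not the gem's API.
KeepaliveSketch = Struct.new(:wal_end, :server_clock, :reply_requested)

KEEPALIVE_SIZE = 18 # 1 tag byte + 8 + 8 + 1 flag byte

def parse_keepalive(bytes)
  raise ArgumentError, "not a keepalive message" unless bytes.getbyte(0) == "k".ord
  raise ArgumentError, "truncated keepalive" if bytes.bytesize < KEEPALIVE_SIZE

  # x skips the tag byte; C reads the reply-requested flag as an unsigned byte.
  wal_end, server_clock, reply = bytes.unpack("xQ>q>C")
  KeepaliveSketch.new(wal_end, server_clock, reply == 1)
end

keepalive = parse_keepalive(["k", 23_817_296, 0, 1].pack("aQ>q>C"))
puts "reply requested: #{keepalive.reply_requested}"
```

When the flag is set, a client is expected to send a standby status update promptly so the server does not time out the connection.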

Pgoutput::Client::Feedback

Builds standby status update payloads.
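
A standby status update is the byte 'r', three 64-bit LSNs (written, flushed, applied), the client clock in microseconds since 2000-01-01, and a one-byte reply-request flag, for 34 bytes in total. The following is a sketch of building such a message; `build_standby_status` and `postgres_clock` are hypothetical helpers, and the gem's Feedback class may take different arguments.

```ruby
# Hypothetical helpers for illustration; not the gem's API.
POSTGRES_EPOCH_SECONDS = Time.utc(2000, 1, 1).to_i

# Microseconds since 2000-01-01 00:00:00 UTC, the clock used by the protocol.
def postgres_clock(time = Time.now)
  (time.to_i - POSTGRES_EPOCH_SECONDS) * 1_000_000 + time.usec
end

# flushed and applied default to the written position in this sketch.
def build_standby_status(written:, flushed: written, applied: flushed,
                         now: Time.now, reply_requested: false)
  fields = ["r", written, flushed, applied, postgres_clock(now), reply_requested ? 1 : 0]
  fields.pack("aQ>Q>Q>q>C")
end

message = build_standby_status(written: 23_817_296)
puts "feedback size: #{message.bytesize} bytes"
```

With the pg gem, a message like this would typically be written to the connection with PG::Connection#put_copy_data and then flushed. Reporting a flushed LSN lets the server discard WAL up to that point, so a consumer should only report positions it has durably processed.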


Ractor Position

The replication connection itself is stateful and ordered. It should normally run as a single reader.

Downstream parsing, decoding, and processing can be parallelized with Ractors:

pgoutput-client reader
        │
        ▼
Ractor-safe queue
        │
        ▼
parser / decoder / processor pools

Rake Tasks

Default

Run all tasks:

bundle exec rake

Code Linting and Formatting

bundle exec rake rubocop

Testing

bundle exec rake test

With coverage:

COVERAGE=true bundle exec rake test

Type Checking

bundle exec rake rbs:validate

Documentation

bundle exec rake yard

License

MIT.