Class: Mammoth::Sources::Postgres

Inherits:
Object
Defined in:
lib/mammoth/sources/postgres.rb

Overview

Concrete PostgreSQL CDC source for Mammoth.

Postgres is the concrete realization of the CDC Ecosystem libraries at Mammoth’s product boundary. It composes pgoutput-client, pgoutput-parser, pgoutput-decoder, and pgoutput-source-adapter into a single source that yields CDC::Core-shaped work to the delivery runtime.

This class may mention pgoutput implementation details because it is the concrete PostgreSQL source adapter used by Mammoth. The rest of Mammoth should remain source-agnostic and consume only the work yielded here.
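The composition described above can be pictured as a chain of stages, each consuming the previous stage's output. The sketch below only illustrates that shape: the three lambdas are hypothetical stand-ins for the parser, decoder, and adapter, and their inputs and outputs are invented for the example rather than taken from the pgoutput libraries.

```ruby
# All three stages are hypothetical lambdas standing in for the parser,
# decoder, and adapter; the real libraries' APIs are not shown here.
parse  = ->(bytes)   { { kind: bytes[0], body: bytes[1..] } }
decode = ->(message) { message.merge(body: message[:body].upcase) }
adapt  = ->(decoded) { "#{decoded[:kind]}:#{decoded[:body]}" }

# Compose the stages left to right into a single callable (Proc#>>).
pipeline = parse >> decode >> adapt

# Each raw payload flows through every stage and comes out as one unit of work.
work = ["Iusers", "Dorders"].map(&pipeline)
```

Downstream code sees only the values in work, which mirrors the point that the rest of Mammoth stays source-agnostic.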

Constructor Details

#initialize(config, runner: nil, parser: nil, decoder: nil, adapter: nil) ⇒ Postgres

Build a PostgreSQL CDC source.

Parameters:

  • config (Mammoth::Configuration)

    loaded configuration

  • runner (#start, nil) (defaults to: nil)

    injected pgoutput-client runner

  • parser (Object, nil) (defaults to: nil)

    injected pgoutput parser or relation tracker

  • decoder (Object, nil) (defaults to: nil)

    injected pgoutput decoder

  • adapter (Object, nil) (defaults to: nil)

    injected source adapter



# File 'lib/mammoth/sources/postgres.rb', line 34

def initialize(config, runner: nil, parser: nil, decoder: nil, adapter: nil)
  @config = config
  @runner = runner
  @parser = parser
  @decoder = decoder
  @adapter = adapter
end
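The keyword arguments allow collaborators to be swapped out, for example in tests. The following is a minimal sketch of that injection pattern, assuming only what the signature documents: the runner responds to #start and yields payloads to a block. FakeRunner and SketchSource are hypothetical stand-ins, not part of Mammoth, and the hash passed as config is a placeholder for a Mammoth::Configuration. SketchSource mirrors the constructor above so the example runs without the gem.

```ruby
# Hypothetical stand-in for a pgoutput-client runner. The only contract
# assumed here is the documented one: it responds to #start.
class FakeRunner
  def initialize(payloads)
    @payloads = payloads
  end

  # Yields each canned payload to the caller's block.
  def start
    @payloads.each { |payload| yield payload }
  end
end

# Hypothetical stand-in mirroring the constructor signature shown above,
# so the sketch runs without Mammoth installed.
class SketchSource
  attr_reader :config, :runner, :parser, :decoder, :adapter

  def initialize(config, runner: nil, parser: nil, decoder: nil, adapter: nil)
    @config = config
    @runner = runner
    @parser = parser
    @decoder = decoder
    @adapter = adapter
  end
end

# Inject only the runner; the other collaborators keep their nil defaults.
source = SketchSource.new({ slot: "example_slot" }, runner: FakeRunner.new(%w[a b]))

received = []
source.runner.start { |payload| received << payload }
```

Collaborators that are not injected stay nil, matching the documented defaults.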

Instance Attribute Details

#adapter ⇒ Object? (readonly)

Returns injected CDC source adapter.

Returns:

  • (Object, nil)

    injected CDC source adapter



# File 'lib/mammoth/sources/postgres.rb', line 25

def adapter
  @adapter
end

#config ⇒ Mammoth::Configuration (readonly)

Returns loaded Mammoth configuration.

Returns:

  • (Mammoth::Configuration)

    loaded Mammoth configuration

# File 'lib/mammoth/sources/postgres.rb', line 17

def config
  @config
end

#decoder ⇒ Object? (readonly)

Returns injected pgoutput decoder.

Returns:

  • (Object, nil)

    injected pgoutput decoder



# File 'lib/mammoth/sources/postgres.rb', line 23

def decoder
  @decoder
end

#parser ⇒ Object? (readonly)

Returns injected pgoutput protocol parser.

Returns:

  • (Object, nil)

    injected pgoutput protocol parser



# File 'lib/mammoth/sources/postgres.rb', line 21

def parser
  @parser
end

#runner ⇒ #start? (readonly)

Returns injected pgoutput-client runner.

Returns:

  • (#start, nil)

    injected pgoutput-client runner



# File 'lib/mammoth/sources/postgres.rb', line 19

def runner
  @runner
end

Instance Method Details

#each {|work| ... } ⇒ Enumerator?

Stream CDC::Core-shaped work from PostgreSQL logical replication.

Calling this method starts the injected or configured pgoutput-client runner. The runner owns the PostgreSQL replication connection and slot lifecycle; this class only composes the parser, decoder, and adapter libraries around the stream.

Yield Parameters:

  • work (Object)

    CDC::Core::ChangeEvent or TransactionEnvelope

Returns:

  • (Enumerator, nil)

Raises:

  • (ReplicationError)

    if the runner or any composed library fails while streaming


# File 'lib/mammoth/sources/postgres.rb', line 52

def each(&block)
  return enum_for(:each) unless block_given?

  effective_runner.start do |payload,  = nil|
    process_payload(payload, , &block)
  end
  nil
rescue StandardError => e
  raise e if e.is_a?(ReplicationError)

  raise ReplicationError, "PostgreSQL CDC source failed: #{e.message}"
end
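The control flow of #each can be sketched in isolation: return an Enumerator when no block is given, stream payloads otherwise, and wrap any unexpected StandardError in the domain error. Everything below is a hypothetical stand-in (SketchStream, the local ReplicationError class, and the two runner doubles); it skips payload processing entirely. It uses two rescue clauses instead of the is_a? check, which should behave the same way provided ReplicationError descends from StandardError.

```ruby
# Hypothetical error class; the real ReplicationError is defined by Mammoth.
class ReplicationError < StandardError; end

# Hypothetical stand-in reproducing only the control flow of #each.
class SketchStream
  def initialize(runner)
    @runner = runner
  end

  def each(&block)
    return enum_for(:each) unless block_given?

    @runner.start { |payload| block.call(payload) }
    nil
  rescue ReplicationError
    raise # already the domain error; pass it through unchanged
  rescue StandardError => e
    raise ReplicationError, "PostgreSQL CDC source failed: #{e.message}"
  end
end

# Runner doubles: one yields two payloads, one fails immediately.
healthy = Object.new
def healthy.start
  yield :insert
  yield :commit
end

broken = Object.new
def broken.start
  raise IOError, "connection reset"
end

# Without a block, #each returns an Enumerator that can be drained lazily.
collected = SketchStream.new(healthy).each.to_a

# A non-domain error surfaces as ReplicationError with the original message.
wrapped_message =
  begin
    SketchStream.new(broken).each { |_work| }
  rescue ReplicationError => e
    e.message
  end
```

Callers therefore only need to rescue one error class, whichever library underneath actually failed.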