Class: Mammoth::PgoutputSource

Inherits:
Object
Defined in:
lib/mammoth/pgoutput_source.rb

Overview

Streams PostgreSQL logical replication through the CDC Ecosystem boundary.

PgoutputSource is Mammoth’s upstream integration point. It composes the standalone pgoutput transport, parser, decoder, and source-adapter gems so that the rest of Mammoth receives only CDC-core domain objects. Transport resiliency remains owned by pgoutput-client; Mammoth owns delivery.
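
The composition described above can be pictured as a chain of stages, each consuming the previous stage's output. The sketch below is only an illustration of that shape: all four lambdas, the stage order, and the hash layouts are assumptions made for this example and are not the APIs of the real gems.

```ruby
# All four stages are hypothetical lambdas standing in for the real gems.
transport      = ->(raw)     { raw }                                   # hands over raw bytes
parser         = ->(bytes)   { { type: bytes[0], body: bytes[1..] } }  # splits tag from body
decoder        = ->(message) { message.merge(decoded: true) }          # marks message as decoded
source_adapter = ->(decoded) { { kind: :change_event, data: decoded } } # wraps as a domain object

# Feed one illustrative payload through every stage in order.
stages = [transport, parser, decoder, source_adapter]
work = stages.reduce("Ipayload") { |value, stage| stage.call(value) }
```

Only the final value, `work`, would be visible to the rest of the application in this picture; the intermediate shapes stay inside the source.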

Constructor Details

#initialize(config, runner: nil, parser: nil, decoder: nil, source_adapter: nil) ⇒ PgoutputSource

Build the pgoutput integration source.

Parameters:

  • config (Mammoth::Configuration)

    loaded configuration

  • runner (Object, nil) (defaults to: nil)

    injectable pgoutput-client runner

  • parser (Object, nil) (defaults to: nil)

    injectable pgoutput parser

  • decoder (Object, nil) (defaults to: nil)

    injectable pgoutput decoder

  • source_adapter (Object, nil) (defaults to: nil)

    injectable CDC source adapter



# File 'lib/mammoth/pgoutput_source.rb', line 29

def initialize(config, runner: nil, parser: nil, decoder: nil, source_adapter: nil)
  @config = config
  @runner = runner
  @parser = parser
  @decoder = decoder
  @source_adapter = source_adapter
end
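
Because every collaborator keyword defaults to nil, each one is an optional injection point, which is convenient in tests. The following is a minimal sketch under stated assumptions: `SketchSource` is a stand-in that mirrors the constructor shown above so the example runs without Mammoth loaded, `FakeRunner` is a hypothetical test double, and the hash is a placeholder for a real Mammoth::Configuration.

```ruby
# Stand-in mirroring the documented constructor; not Mammoth's actual class.
class SketchSource
  attr_reader :config, :runner, :parser, :decoder, :source_adapter

  def initialize(config, runner: nil, parser: nil, decoder: nil, source_adapter: nil)
    @config = config
    @runner = runner
    @parser = parser
    @decoder = decoder
    @source_adapter = source_adapter
  end
end

# Hypothetical test double; a real runner would come from pgoutput-client.
FakeRunner = Struct.new(:payloads)

config = { slot: "example_slot" } # placeholder for a Mammoth::Configuration
source = SketchSource.new(config, runner: FakeRunner.new([]))

injected  = source.runner.class # the injected double's class
defaulted = source.parser       # nil, because no parser was injected
```

Collaborators that are not injected stay nil on the readers, which matches the `Object, nil` return types documented for the attributes.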

Instance Attribute Details

#config ⇒ Mammoth::Configuration (readonly)

Returns loaded Mammoth configuration.

Returns:

  • (Mammoth::Configuration)

    loaded Mammoth configuration

# File 'lib/mammoth/pgoutput_source.rb', line 12

def config
  @config
end

#decoder ⇒ Object? (readonly)

Returns pgoutput-decoder compatible decoder.

Returns:

  • (Object, nil)

    pgoutput-decoder compatible decoder



# File 'lib/mammoth/pgoutput_source.rb', line 18

def decoder
  @decoder
end

#parser ⇒ Object? (readonly)

Returns pgoutput-parser compatible parser.

Returns:

  • (Object, nil)

    pgoutput-parser compatible parser



# File 'lib/mammoth/pgoutput_source.rb', line 16

def parser
  @parser
end

#runner ⇒ Object? (readonly)

Returns pgoutput-client compatible runner.

Returns:

  • (Object, nil)

    pgoutput-client compatible runner



# File 'lib/mammoth/pgoutput_source.rb', line 14

def runner
  @runner
end

#source_adapter ⇒ Object? (readonly)

Returns CDC source adapter.

Returns:

  • (Object, nil)

    CDC source adapter



# File 'lib/mammoth/pgoutput_source.rb', line 20

def source_adapter
  @source_adapter
end

Instance Method Details

#each {|work| ... } ⇒ void

This method returns an undefined value.

Stream CDC-core objects from PostgreSQL.

Yield Parameters:

  • work (Object)

    CDC::Core::ChangeEvent or TransactionEnvelope

Raises:



# File 'lib/mammoth/pgoutput_source.rb', line 42

def each
  return enum_for(:each) unless block_given?

  effective_runner.start do |payload, |
    normalized_items(payload, ).each { |item| yield item }
  end
end
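
The method above supports two calling styles: with a block it streams each work item, and without one it returns an Enumerator. The sketch below illustrates both under stated assumptions: `SketchSource` and `FakeRunner` are hypothetical stand-ins, `_metadata` is a placeholder name for the second value yielded by the runner, and `normalized_items` is reduced to a simple array wrap in place of the real parsing and decoding.

```ruby
# Hypothetical runner double: #start yields each payload plus a second value.
class FakeRunner
  def initialize(payloads)
    @payloads = payloads
  end

  def start
    @payloads.each { |payload| yield payload, nil }
  end
end

# Stand-in for the documented #each; not Mammoth's actual class.
class SketchSource
  def initialize(runner)
    @runner = runner
  end

  def each
    return enum_for(:each) unless block_given?

    @runner.start do |payload, _metadata|
      normalized_items(payload).each { |item| yield item }
    end
  end

  private

  # Assumption: one payload may normalize to several work items.
  def normalized_items(payload)
    Array(payload)
  end
end

source = SketchSource.new(FakeRunner.new([[:insert, :update], :commit]))

collected = []
source.each { |work| collected << work } # block form: streams every item

first_two = source.each.first(2)         # enumerator form: no block given
```

The enumerator form lets a caller take a bounded number of items without writing its own break logic, at the cost of the stream being restarted each time a new enumerator is consumed.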