Class: Mammoth::Sources::Postgres
- Inherits:
-
Object
- Object
- Mammoth::Sources::Postgres
- Defined in:
- lib/mammoth/sources/postgres.rb
Overview
Concrete PostgreSQL CDC source for Mammoth.
Postgres realizes the CDC Ecosystem libraries for Mammoth’s product boundary. It composes pgoutput-client, pgoutput-parser, pgoutput-decoder, and pgoutput-source-adapter into a single source that yields CDC::Core-shaped work to the delivery runtime.
This class may mention pgoutput implementation details because it is the concrete PostgreSQL source adapter used by Mammoth. The rest of Mammoth should remain source-agnostic and consume only the work yielded here.
Instance Attribute Summary collapse
-
#adapter ⇒ Object?
readonly
Injected CDC source adapter.
-
#config ⇒ Mammoth::Configuration
readonly
Loaded Mammoth configuration.
-
#decoder ⇒ Object?
readonly
Injected pgoutput decoder.
-
#parser ⇒ Object?
readonly
Injected pgoutput protocol parser.
-
#runner ⇒ #start?
readonly
Injected pgoutput-client runner.
Instance Method Summary collapse
-
#each {|work| ... } ⇒ Enumerator?
Stream CDC::Core-shaped work from PostgreSQL logical replication.
-
#initialize(config, runner: nil, parser: nil, decoder: nil, adapter: nil) ⇒ Postgres
constructor
Build a PostgreSQL CDC source.
Constructor Details
#initialize(config, runner: nil, parser: nil, decoder: nil, adapter: nil) ⇒ Postgres
Build a PostgreSQL CDC source.
34 35 36 37 38 39 40 |
# File 'lib/mammoth/sources/postgres.rb', line 34 def initialize(config, runner: nil, parser: nil, decoder: nil, adapter: nil) @config = config @runner = runner @parser = parser @decoder = decoder @adapter = adapter end |
Instance Attribute Details
#adapter ⇒ Object? (readonly)
Returns injected CDC source adapter.
25 26 27 |
# File 'lib/mammoth/sources/postgres.rb', line 25 def adapter @adapter end |
#config ⇒ Mammoth::Configuration (readonly)
Returns loaded Mammoth configuration.
17 18 19 |
# File 'lib/mammoth/sources/postgres.rb', line 17 def config @config end |
#decoder ⇒ Object? (readonly)
Returns injected pgoutput decoder.
23 24 25 |
# File 'lib/mammoth/sources/postgres.rb', line 23 def decoder @decoder end |
#parser ⇒ Object? (readonly)
Returns injected pgoutput protocol parser.
21 22 23 |
# File 'lib/mammoth/sources/postgres.rb', line 21 def parser @parser end |
#runner ⇒ #start? (readonly)
Returns injected pgoutput-client runner.
19 20 21 |
# File 'lib/mammoth/sources/postgres.rb', line 19 def runner @runner end |
Instance Method Details
#each {|work| ... } ⇒ Enumerator?
Stream CDC::Core-shaped work from PostgreSQL logical replication.
Calling this method starts the injected or configured pgoutput-client runner. The runner owns the PostgreSQL replication connection and slot lifecycle; this class only composes the parser, decoder, and adapter libraries around the stream.
52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/mammoth/sources/postgres.rb', line 52 def each(&block) return enum_for(:each) unless block_given? effective_runner.start do |payload, = nil| process_payload(payload, , &block) end nil rescue StandardError => e raise e if e.is_a?(ReplicationError) raise ReplicationError, "PostgreSQL CDC source failed: #{e.}" end |