Class: Mammoth::ReplicationConsumer
- Inherits:
-
Object
- Object
- Mammoth::ReplicationConsumer
- Defined in:
- lib/mammoth/replication_consumer.rb
Overview
Consumes CDC-core work items from Mammoth’s configured replication source.
ReplicationConsumer is the boundary between upstream CDC ingestion and sink delivery. Live PostgreSQL ingestion is delegated to PgoutputSource; injected sources remain available for unit tests, demos, and e2e fixtures.
Instance Attribute Summary collapse
-
#adapter ⇒ Object
readonly
Returns the value of attribute adapter.
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#source ⇒ Object
readonly
Returns the value of attribute source.
Instance Method Summary collapse
-
#initialize(config, source: nil, adapter: nil) ⇒ ReplicationConsumer
constructor
A new instance of ReplicationConsumer.
-
#publication ⇒ String
Return the configured publication.
-
#slot ⇒ String
Return the configured replication slot.
-
#start {|event| ... } ⇒ Integer
Consume normalized CDC work from the configured source.
Constructor Details
#initialize(config, source: nil, adapter: nil) ⇒ ReplicationConsumer
Returns a new instance of ReplicationConsumer.
15 16 17 18 19 |
# File 'lib/mammoth/replication_consumer.rb', line 15 def initialize(config, source: nil, adapter: nil) @config = config @source = source @adapter = adapter end |
Instance Attribute Details
#adapter ⇒ Object (readonly)
Returns the value of attribute adapter.
10 11 12 |
# File 'lib/mammoth/replication_consumer.rb', line 10 def adapter @adapter end |
#config ⇒ Object (readonly)
Returns the value of attribute config.
10 11 12 |
# File 'lib/mammoth/replication_consumer.rb', line 10 def config @config end |
#source ⇒ Object (readonly)
Returns the value of attribute source.
10 11 12 |
# File 'lib/mammoth/replication_consumer.rb', line 10 def source @source end |
Instance Method Details
#publication ⇒ String
Return the configured publication.
31 32 33 |
# File 'lib/mammoth/replication_consumer.rb', line 31 def publication config.dig("replication", "publication") end |
#slot ⇒ String
Return the configured replication slot.
24 25 26 |
# File 'lib/mammoth/replication_consumer.rb', line 24 def slot config.dig("replication", "slot") end |
#start {|event| ... } ⇒ Integer
Consume normalized CDC work from the configured source.
39 40 41 42 43 44 45 46 47 48 |
# File 'lib/mammoth/replication_consumer.rb', line 39 def start return enum_for(:start) unless block_given? count = 0 each_event do |event| yield event count += 1 end count end |