Class: Mammoth::ReplicationConsumer

Inherits:
Object
  • Object
show all
Defined in:
lib/mammoth/replication_consumer.rb

Overview

Consumes CDC-core work items from Mammoth’s configured replication source.

ReplicationConsumer is the boundary between upstream CDC ingestion and sink delivery. Live PostgreSQL ingestion is delegated to PgoutputSource; injected sources remain available for unit tests, demos, and e2e fixtures.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config, source: nil, adapter: nil) ⇒ ReplicationConsumer

Returns a new instance of ReplicationConsumer.

Parameters:

  • config (Mammoth::Configuration)

    loaded configuration

  • source (#each, nil) (defaults to: nil)

    injectable CDC work stream

  • adapter (#call, nil) (defaults to: nil)

    optional adapter for injected raw events



15
16
17
18
19
# File 'lib/mammoth/replication_consumer.rb', line 15

def initialize(config, source: nil, adapter: nil)
  @config = config
  @source = source
  @adapter = adapter
end

Instance Attribute Details

#adapterObject (readonly)

Returns the value of attribute adapter.



10
11
12
# File 'lib/mammoth/replication_consumer.rb', line 10

def adapter
  @adapter
end

#configObject (readonly)

Returns the value of attribute config.



10
11
12
# File 'lib/mammoth/replication_consumer.rb', line 10

def config
  @config
end

#sourceObject (readonly)

Returns the value of attribute source.



10
11
12
# File 'lib/mammoth/replication_consumer.rb', line 10

def source
  @source
end

Instance Method Details

#publicationString

Return the configured publication.

Returns:

  • (String)


31
32
33
# File 'lib/mammoth/replication_consumer.rb', line 31

def publication
  config.dig("replication", "publication")
end

#slotString

Return the configured replication slot.

Returns:

  • (String)


24
25
26
# File 'lib/mammoth/replication_consumer.rb', line 24

def slot
  config.dig("replication", "slot")
end

#start {|event| ... } ⇒ Integer

Consume normalized CDC work from the configured source.

Yield Parameters:

  • event (Object)

    CDC::Core::ChangeEvent-compatible event

Returns:

  • (Integer)

    number of consumed events



39
40
41
42
43
44
45
46
47
48
# File 'lib/mammoth/replication_consumer.rb', line 39

def start
  return enum_for(:start) unless block_given?

  count = 0
  each_event do |event|
    yield event
    count += 1
  end
  count
end