Class: Pgoutput::SourceAdapter::Cdc

Inherits:
CDC::Core::SourceAdapter
  • Object
show all
Defined in:
lib/pgoutput/source_adapter/cdc.rb

Overview

Normalizes Pgoutput::Decoder::Events into CDC::Core primitives.

This adapter is intentionally located under the Pgoutput namespace because it adapts pgoutput decoded events. The target is CDC::Core, so this class depends on cdc-core while the lower-level pgoutput-client, pgoutput-parser, and pgoutput-decoder gems remain standalone.

Examples:

Normalize a decoded insert event

adapter = Pgoutput::SourceAdapter::Cdc.new
change_event = adapter.normalize(decoded_insert)

Normalize a decoded transaction event batch

envelope = adapter.normalize_many([begin_event, insert_event, commit_event]).first

Constant Summary collapse

SOURCE_NAME =
'pgoutput'

Instance Method Summary collapse

Constructor Details

#initialize(primary_key_resolver: nil, metadata_builder: nil) ⇒ void

Parameters:

  • primary_key_resolver (#call, nil) (defaults to: nil)

    optional callable used to infer primary keys from decoded row values when pgoutput does not provide an old-key tuple. The callable receives the decoded event and value hash.

  • metadata_builder (#call, nil) (defaults to: nil)

    optional callable that can return extra metadata for each decoded event. Returned keys are stringified.



35
36
37
38
39
# File 'lib/pgoutput/source_adapter/cdc.rb', line 35

def initialize(primary_key_resolver: nil, metadata_builder: nil)
  @primary_key_resolver = primary_key_resolver || method(:default_primary_key)
  @metadata_builder = 
  super()
end

Instance Method Details

#normalize(event) ⇒ CDC::Core::ChangeEvent?

Normalize one decoded pgoutput event.

Transaction boundary events return nil because they do not represent a row-level change by themselves. Use #normalize_many when transaction envelopes are desired.

rubocop:disable Metrics/MethodLength

Parameters:

  • event (Object)

    a Pgoutput::Decoder::Events object.

Returns:

  • (CDC::Core::ChangeEvent, nil)

    normalized row change event, or nil for transaction boundary events.

Raises:



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/pgoutput/source_adapter/cdc.rb', line 53

def normalize(event)
  case event_name(event)
  when 'Insert'
    change_event(
      event,
      operation: :insert,
      old_values: nil,
      new_values: event.values,
      primary_key: primary_key_for(event, event.values)
    )
  when 'Update'
    old_values = event.old_values || event.old_key

    change_event(
      event,
      operation: :update,
      old_values: old_values,
      new_values: event.new_values,
      primary_key: primary_key_for(event, event.new_values)
    )
  when 'Delete'
    old_values = event.old_values || event.old_key

    change_event(
      event,
      operation: :delete,
      old_values: old_values,
      new_values: nil,
      primary_key: primary_key_for(event, old_values)
    )
  when 'Begin', 'Commit'
    nil
  else
    raise Error, "unsupported pgoutput decoded event: #{event.class}"
  end
end

#normalize_many(events) ⇒ Array<CDC::Core::ChangeEvent, CDC::Core::TransactionEnvelope>

Normalize a sequence of decoded pgoutput events.

If the sequence contains transaction boundaries, row changes between a Begin and Commit are grouped into CDC::Core::TransactionEnvelope. If no transaction boundaries are present, row changes are returned individually.

rubocop:disable Metrics/AbcSize rubocop:disable Metrics/PerceivedComplexity rubocop:disable Metrics/CyclomaticComplexity rubocop:disable Metrics/MethodLength

Parameters:

  • events (Enumerable<Object>)

    decoded pgoutput events.

Returns:

  • (Array<CDC::Core::ChangeEvent, CDC::Core::TransactionEnvelope>)

    normalized row changes and transaction envelopes in input order.

Raises:



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/pgoutput/source_adapter/cdc.rb', line 106

def normalize_many(events)
  results = [] # : Array[CDC::Core::ChangeEvent | CDC::Core::TransactionEnvelope]
  transaction_id = nil
  transaction_events = [] # : Array[CDC::Core::ChangeEvent]
   = {} # : Hash[String, untyped]

  events.each do |event|
    case event_name(event)
    when 'Begin'
      transaction_id = event.transaction_id
      transaction_events = []
       = (event).merge(
        'begin_final_lsn' => lsn_string(event.final_lsn),
        'begin_commit_timestamp' => event.commit_timestamp
      )
    when 'Commit'
      if transaction_id || !transaction_events.empty?
        results << transaction_envelope(
          event,
          transaction_id: transaction_id || event.transaction_id,
          events: transaction_events,
          metadata: 
        )
      end

      transaction_id = nil
      transaction_events = []
       = {}
    else
      normalized = normalize(event)
      next if normalized.nil?

      if transaction_id
        transaction_events << normalized
      else
        results << normalized
      end
    end
  end

  results.concat(transaction_events) if transaction_id && !transaction_events.empty?
  share(results.freeze)
end