Class: Pgoutput::SourceAdapter::Cdc
- Inherits:
  CDC::Core::SourceAdapter
  - Object
  - CDC::Core::SourceAdapter
  - Pgoutput::SourceAdapter::Cdc
- Defined in:
  lib/pgoutput/source_adapter/cdc.rb
Overview
Normalizes Pgoutput::Decoder::Events into CDC::Core primitives.
This adapter is intentionally located under the Pgoutput namespace because it adapts pgoutput decoded events. The target is CDC::Core, so this class depends on cdc-core while the lower-level pgoutput-client, pgoutput-parser, and pgoutput-decoder gems remain standalone.
Constant Summary
- SOURCE_NAME = 'pgoutput'
Instance Method Summary
- #initialize(primary_key_resolver: nil, metadata_builder: nil) ⇒ void (constructor)
- #normalize(event) ⇒ CDC::Core::ChangeEvent?
  Normalize one decoded pgoutput event.
- #normalize_many(events) ⇒ Array<CDC::Core::ChangeEvent, CDC::Core::TransactionEnvelope>
  Normalize a sequence of decoded pgoutput events.
Constructor Details
#initialize(primary_key_resolver: nil, metadata_builder: nil) ⇒ void
# File 'lib/pgoutput/source_adapter/cdc.rb', line 35

def initialize(primary_key_resolver: nil, metadata_builder: nil)
  @primary_key_resolver = primary_key_resolver || method(:default_primary_key)
  @metadata_builder = …
  super()
end
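The constructor accepts an optional callable and otherwise falls back to a bound method, so either path can be invoked with #call. The following is a minimal sketch of that pattern, not the adapter itself: the ResolverSketch class, the (event, values) resolver signature, and the "id"-column default are all assumptions made for illustration.

```ruby
# Hypothetical stand-in; not the real Pgoutput::SourceAdapter::Cdc.
class ResolverSketch
  def initialize(primary_key_resolver: nil)
    # A bound Method object responds to #call just like a lambda does.
    @primary_key_resolver = primary_key_resolver || method(:default_primary_key)
  end

  def primary_key_for(event, values)
    @primary_key_resolver.call(event, values)
  end

  private

  # Assumed default for this sketch only: the "id" column is the key.
  def default_primary_key(_event, values)
    { 'id' => values['id'] }
  end
end

row = { 'id' => 7, 'tenant_id' => 3, 'name' => 'Ada' }

# No resolver given: the private default method is used.
default_key = ResolverSketch.new.primary_key_for(:event, row)

# Custom resolver: a lambda that picks a composite key.
composite_resolver = ->(_event, values) { values.slice('tenant_id', 'id') }
composite_key = ResolverSketch.new(primary_key_resolver: composite_resolver)
                              .primary_key_for(:event, row)
```

Passing a lambda this way would let a caller supply composite or non-standard keys without subclassing, provided the real resolver is invoked with compatible arguments.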
Instance Method Details
#normalize(event) ⇒ CDC::Core::ChangeEvent?
Normalize one decoded pgoutput event.
For transaction boundary events (Begin and Commit) this method returns nil, because they do not represent a row-level change by themselves. Use #normalize_many when transaction envelopes are desired. Any other unrecognized event type raises Error.
# File 'lib/pgoutput/source_adapter/cdc.rb', line 53

def normalize(event)
  case event_name(event)
  when 'Insert'
    change_event(
      event,
      operation: :insert,
      old_values: nil,
      new_values: event.values,
      primary_key: primary_key_for(event, event.values)
    )
  when 'Update'
    old_values = event.old_values || event.old_key
    change_event(
      event,
      operation: :update,
      old_values: old_values,
      new_values: event.new_values,
      primary_key: primary_key_for(event, event.new_values)
    )
  when 'Delete'
    old_values = event.old_values || event.old_key
    change_event(
      event,
      operation: :delete,
      old_values: old_values,
      new_values: nil,
      primary_key: primary_key_for(event, old_values)
    )
  when 'Begin', 'Commit'
    nil
  else
    raise Error, "unsupported pgoutput decoded event: #{event.class}"
  end
end
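The dispatch above can be tried in isolation with stand-in event types. This is a sketch under stated assumptions: the Struct classes below are hypothetical replacements for the decoder's events, the result is a plain Hash rather than a CDC::Core::ChangeEvent, and the event name is assumed to be the last segment of the class name (the real event_name helper is not shown in the source).

```ruby
module NormalizeSketch
  # Hypothetical stand-ins for decoded pgoutput events.
  Insert = Struct.new(:values, keyword_init: true)
  Update = Struct.new(:old_values, :old_key, :new_values, keyword_init: true)
  Delete = Struct.new(:old_values, :old_key, keyword_init: true)
  Begin  = Struct.new(:transaction_id, keyword_init: true)

  # Returns a Hash describing the row change, or nil for boundaries.
  def self.normalize(event)
    case event.class.name.split('::').last
    when 'Insert'
      { operation: :insert, old_values: nil, new_values: event.values }
    when 'Update'
      # Fall back to the old key when the full old row is unavailable.
      { operation: :update,
        old_values: event.old_values || event.old_key,
        new_values: event.new_values }
    when 'Delete'
      { operation: :delete,
        old_values: event.old_values || event.old_key,
        new_values: nil }
    when 'Begin', 'Commit'
      nil
    else
      raise ArgumentError, "unsupported event: #{event.class}"
    end
  end
end

update = NormalizeSketch::Update.new(
  old_values: nil,
  old_key: { 'id' => 1 },
  new_values: { 'id' => 1, 'name' => 'Ada' }
)
change   = NormalizeSketch.normalize(update)
boundary = NormalizeSketch.normalize(NormalizeSketch::Begin.new(transaction_id: 42))
```

Here the update carries no full old row, so old_values falls back to the old key, and the Begin event yields nil.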
#normalize_many(events) ⇒ Array<CDC::Core::ChangeEvent, CDC::Core::TransactionEnvelope>
Normalize a sequence of decoded pgoutput events.
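If the sequence contains transaction boundaries, the row changes between a Begin and its Commit are grouped into a CDC::Core::TransactionEnvelope. Row changes outside any transaction are returned individually, as are the changes of a trailing transaction whose Commit is missing from the sequence. The returned array is frozen.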
# File 'lib/pgoutput/source_adapter/cdc.rb', line 106

def normalize_many(events)
  results = [] # : Array[CDC::Core::ChangeEvent | CDC::Core::TransactionEnvelope]
  transaction_id = nil
  transaction_events = [] # : Array[CDC::Core::ChangeEvent]
  … = {} # : Hash[String, untyped]

  events.each do |event|
    case event_name(event)
    when 'Begin'
      transaction_id = event.transaction_id
      transaction_events = []
      … = …(event).merge(
        'begin_final_lsn' => lsn_string(event.final_lsn),
        'begin_commit_timestamp' => event.…
      )
    when 'Commit'
      if transaction_id || !transaction_events.empty?
        results << transaction_envelope(
          event,
          transaction_id: transaction_id || event.transaction_id,
          events: transaction_events,
          metadata: …
        )
      end

      transaction_id = nil
      transaction_events = []
      … = {}
    else
      normalized = normalize(event)
      next if normalized.nil?

      if transaction_id
        transaction_events << normalized
      else
        results << normalized
      end
    end
  end

  results.concat(transaction_events) if transaction_id && !transaction_events.empty?
  share(results.freeze)
end
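The grouping behavior can be illustrated with a reduced sketch that leaves out metadata handling and normalization. Everything in it is hypothetical: the Begin, Commit, Change, and Envelope structs stand in for the decoder events and the CDC::Core types, and Change represents a row change that has already been normalized.

```ruby
module EnvelopeSketch
  # Hypothetical stand-ins; not the real decoder or CDC::Core classes.
  Begin    = Struct.new(:transaction_id)
  Commit   = Struct.new(:transaction_id)
  Change   = Struct.new(:row)
  Envelope = Struct.new(:transaction_id, :events)

  def self.group(events)
    results = []
    transaction_id = nil
    pending = []

    events.each do |event|
      case event
      when Begin
        transaction_id = event.transaction_id
        pending = []
      when Commit
        if transaction_id || !pending.empty?
          results << Envelope.new(transaction_id || event.transaction_id, pending)
        end
        transaction_id = nil
        pending = []
      else
        # Inside a transaction, buffer the change; otherwise emit it directly.
        if transaction_id
          pending << event
        else
          results << event
        end
      end
    end

    # A Begin without a matching Commit: emit its changes individually.
    results.concat(pending) if transaction_id && !pending.empty?
    results.freeze
  end
end

stream = [
  EnvelopeSketch::Change.new('a'),
  EnvelopeSketch::Begin.new(10),
  EnvelopeSketch::Change.new('b'),
  EnvelopeSketch::Change.new('c'),
  EnvelopeSketch::Commit.new(10),
  EnvelopeSketch::Begin.new(11),
  EnvelopeSketch::Change.new('d')
]
grouped = EnvelopeSketch.group(stream)
```

With this input the result holds three entries: the standalone change 'a', one envelope for transaction 10 containing 'b' and 'c', and the change 'd' from the uncommitted transaction 11.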