Class: CDC::Core::ChangeEvent
- Inherits:
- Object
- Object
- CDC::Core::ChangeEvent
- Defined in:
- lib/cdc/core/change_event.rb
Overview
Immutable representation of one logical database change.
ChangeEvent is the core data structure passed through filters, pipelines, and processors. It is database-agnostic but carries common CDC fields such as operation, schema, table, before/after values, primary key, LSN, and metadata.
Instance Attribute Summary
- #commit_lsn ⇒ Symbol, ... readonly
- #metadata ⇒ Symbol, ... readonly
- #new_values ⇒ Symbol, ... readonly
- #occurred_at ⇒ Symbol, ... readonly
- #old_values ⇒ Symbol, ... readonly
- #operation ⇒ Symbol, ... readonly
- #primary_key ⇒ Symbol, ... readonly
- #schema ⇒ Symbol, ... readonly
- #sequence_number ⇒ Symbol, ... readonly
- #table ⇒ Symbol, ... readonly
- #transaction_id ⇒ Symbol, ... readonly
Instance Method Summary
- #changes ⇒ Array<ColumnChange>
  Compute changed columns by comparing old and new values.
- #delete? ⇒ Boolean
  True for delete events.
- #initialize(operation:, schema:, table:, old_values: nil, new_values: nil, primary_key: nil, transaction_id: nil, commit_lsn: nil, sequence_number: nil, occurred_at: nil, metadata: {}) ⇒ ChangeEvent (constructor)
  Build a change event.
- #insert? ⇒ Boolean
  True for insert events.
- #qualified_table_name ⇒ String
  Fully qualified table name in schema.table form.
- #to_h ⇒ Hash{String=>Object,nil}
  Convert the event into a Ractor-shareable hash.
- #update? ⇒ Boolean
  True for update events.
Constructor Details
#initialize(operation:, schema:, table:, old_values: nil, new_values: nil, primary_key: nil, transaction_id: nil, commit_lsn: nil, sequence_number: nil, occurred_at: nil, metadata: {}) ⇒ ChangeEvent
Build a change event.
# File 'lib/cdc/core/change_event.rb', line 39
def initialize(operation:, schema:, table:, old_values: nil, new_values: nil, primary_key: nil,
               transaction_id: nil, commit_lsn: nil, sequence_number: nil, occurred_at: nil, metadata: {})
  @operation = Operation.normalize(operation)
  @schema = String(schema).freeze
  @table = String(table).freeze
  @old_values = freeze_hash_or_nil(old_values)
  @new_values = freeze_hash_or_nil(new_values)
  @primary_key = freeze_hash_or_nil(primary_key)
  @transaction_id = transaction_id
  @commit_lsn = commit_lsn&.to_s&.freeze
  @sequence_number = sequence_number
  @occurred_at = occurred_at
  # Accept either a ready-made EventMetadata or raw input to wrap in one.
  @metadata = metadata.is_a?(EventMetadata) ? metadata : EventMetadata.new(metadata)
  # Deep-freeze the event so it can be passed between Ractors.
  Ractor.make_shareable(self)
end
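The constructor normalizes and freezes its inputs, then calls Ractor.make_shareable(self), which is what makes the event immutable. The following is a minimal sketch of that pattern only, assuming Ruby 3.0 or later. MiniEvent is a hypothetical stand-in written for illustration; it is not the library's ChangeEvent and omits most fields.

```ruby
# MiniEvent is a hypothetical stand-in, not the library's ChangeEvent.
class MiniEvent
  attr_reader :schema, :table, :new_values

  def initialize(schema:, table:, new_values: nil)
    @schema = String(schema).freeze
    @table = String(table).freeze
    # Shallow copy so the caller's top-level hash is not frozen.
    @new_values = new_values&.dup
    # Deep-freezes self and every object reachable from it.
    Ractor.make_shareable(self)
  end
end

event = MiniEvent.new(schema: :public, table: 'users', new_values: { 'id' => 1 })

event.frozen?            # => true
Ractor.shareable?(event) # => true

begin
  event.new_values['id'] = 2
rescue FrozenError => e
  # Any attempt to mutate the event or its values is rejected.
  puts "rejected: #{e.class}"
end
```

Because the object is frozen at the end of the constructor, any derived state has to be computed on demand rather than cached in instance variables after construction.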
Instance Attribute Details
#commit_lsn ⇒ Symbol, ... (readonly)
# File 'lib/cdc/core/change_event.rb', line 23
def commit_lsn
  @commit_lsn
end
#metadata ⇒ Symbol, ... (readonly)
# File 'lib/cdc/core/change_event.rb', line 23
def metadata
  @metadata
end
#new_values ⇒ Symbol, ... (readonly)
# File 'lib/cdc/core/change_event.rb', line 23
def new_values
  @new_values
end
#occurred_at ⇒ Symbol, ... (readonly)
# File 'lib/cdc/core/change_event.rb', line 23
def occurred_at
  @occurred_at
end
#old_values ⇒ Symbol, ... (readonly)
# File 'lib/cdc/core/change_event.rb', line 23
def old_values
  @old_values
end
#operation ⇒ Symbol, ... (readonly)
# File 'lib/cdc/core/change_event.rb', line 23
def operation
  @operation
end
#primary_key ⇒ Symbol, ... (readonly)
# File 'lib/cdc/core/change_event.rb', line 23
def primary_key
  @primary_key
end
#schema ⇒ Symbol, ... (readonly)
# File 'lib/cdc/core/change_event.rb', line 23
def schema
  @schema
end
#sequence_number ⇒ Symbol, ... (readonly)
# File 'lib/cdc/core/change_event.rb', line 23
def sequence_number
  @sequence_number
end
#table ⇒ Symbol, ... (readonly)
# File 'lib/cdc/core/change_event.rb', line 23
def table
  @table
end
#transaction_id ⇒ Symbol, ... (readonly)
# File 'lib/cdc/core/change_event.rb', line 23
def transaction_id
  @transaction_id
end
Instance Method Details
#changes ⇒ Array<ColumnChange>
Compute changed columns by comparing old and new values.
Columns with equal old and new values are omitted. Insert and delete events can pass nil for one side; missing values are represented as nil.
# File 'lib/cdc/core/change_event.rb', line 76
def changes
  keys = ((old_values || {}).keys | (new_values || {}).keys)
  keys.filter_map do |key|
    change = ColumnChange.new(name: key, old_value: old_values&.[](key), new_value: new_values&.[](key))
    change if change.changed?
  end.then { |items| Ractor.make_shareable(items.freeze) }
end
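To make the comparison rule concrete, here is a self-contained sketch of the same diff logic, assuming Ruby 3.0 or later. The ColumnChange struct and the diff_columns helper below are hypothetical stand-ins written for this example; the library's own ColumnChange may differ, and the final Ractor.make_shareable step is left out for brevity.

```ruby
# Hypothetical stand-in for the library's ColumnChange class.
ColumnChange = Struct.new(:name, :old_value, :new_value, keyword_init: true) do
  def changed? = old_value != new_value
end

# Hypothetical helper mirroring the body of #changes.
def diff_columns(old_values, new_values)
  # Union of keys from both sides; a nil side contributes no keys.
  keys = (old_values || {}).keys | (new_values || {}).keys
  keys.filter_map do |key|
    change = ColumnChange.new(name: key,
                              old_value: old_values&.[](key),
                              new_value: new_values&.[](key))
    # Columns whose value did not change are dropped.
    change if change.changed?
  end
end

# Update: only the column that differs is reported.
update = diff_columns({ 'id' => 1, 'email' => 'a@example.com' },
                      { 'id' => 1, 'email' => 'b@example.com' })
update.map(&:name) # => ["email"]

# Insert: there is no old side, so old_value is nil.
insert = diff_columns(nil, { 'id' => 7 })
insert.first.old_value # => nil
```

One consequence of this rule is that a column explicitly set to nil on one side and absent on the other compares as equal and is omitted.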
#delete? ⇒ Boolean
Returns true for delete events.
# File 'lib/cdc/core/change_event.rb', line 63
def delete? = operation == Operation::DELETE
#insert? ⇒ Boolean
Returns true for insert events.
# File 'lib/cdc/core/change_event.rb', line 57
def insert? = operation == Operation::INSERT
#qualified_table_name ⇒ String
Fully qualified table name in schema.table form.
# File 'lib/cdc/core/change_event.rb', line 68
def qualified_table_name = "#{schema}.#{table}".freeze
#to_h ⇒ Hash{String=>Object,nil}
Convert the event into a Ractor-shareable hash.
# File 'lib/cdc/core/change_event.rb', line 87
def to_h
  Ractor.make_shareable({
    'operation' => operation,
    'schema' => schema,
    'table' => table,
    'old_values' => old_values,
    'new_values' => new_values,
    'primary_key' => primary_key,
    'transaction_id' => transaction_id,
    'commit_lsn' => commit_lsn,
    'sequence_number' => sequence_number,
    'occurred_at' => occurred_at,
    'metadata' => metadata.to_h
  }.freeze)
end
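A Ractor-shareable hash is deeply frozen, so code that receives the result of to_h cannot modify it in place. The sketch below illustrates what that means for a consumer, assuming Ruby 3.0 or later. The hash contents are made-up sample data shaped like the output above (including the assumption that the operation is a Symbol), not output captured from the library.

```ruby
# Sample data shaped like a #to_h result; the values are illustrative only.
row = Ractor.make_shareable({
  'operation' => :insert,
  'table' => 'users',
  'new_values' => { 'id' => 1, 'tags' => ['a', 'b'] }
})

Ractor.shareable?(row)            # => true
row.frozen?                       # => true
# Freezing is deep: nested hashes and arrays are frozen as well.
row['new_values']['tags'].frozen? # => true

# To change a field, build a new hash instead of mutating the shared one.
copy = row.merge('table' => 'accounts')
copy.frozen? # => false
```

Hash#merge returns a new, unfrozen top-level hash, but the nested values it carries over are still the frozen originals.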