Class: CDC::Core::ChangeEvent

Inherits:
Object
Defined in:
lib/cdc/core/change_event.rb

Overview

Immutable representation of one logical database change.

ChangeEvent is the core data structure passed through filters, pipelines, and processors. It is database-agnostic but carries common CDC fields such as operation, schema, table, before/after values, primary key, LSN, and metadata.

Constructor Details

#initialize(operation:, schema:, table:, old_values: nil, new_values: nil, primary_key: nil, transaction_id: nil, commit_lsn: nil, sequence_number: nil, occurred_at: nil, metadata: {}) ⇒ ChangeEvent

Build a change event.

Parameters:

  • operation (#to_sym)

    CDC operation

  • schema (#to_s)

    schema name

  • table (#to_s)

    table name

  • old_values (Hash, nil) (defaults to: nil)

    values before the change

  • new_values (Hash, nil) (defaults to: nil)

    values after the change

  • primary_key (Hash, nil) (defaults to: nil)

    primary key values

  • transaction_id (Object, nil) (defaults to: nil)

    source transaction identifier

  • commit_lsn (#to_s, nil) (defaults to: nil)

    commit log sequence number

  • sequence_number (Integer, nil) (defaults to: nil)

    event sequence number

  • occurred_at (Time, nil) (defaults to: nil)

    event timestamp

  • metadata (Hash, EventMetadata) (defaults to: {})

    additional event metadata



# File 'lib/cdc/core/change_event.rb', line 39

def initialize(operation:, schema:, table:, old_values: nil, new_values: nil, primary_key: nil,
               transaction_id: nil, commit_lsn: nil, sequence_number: nil, occurred_at: nil,
               metadata: {})
  @operation = Operation.normalize(operation)
  @schema = String(schema).freeze
  @table = String(table).freeze
  @old_values = freeze_hash_or_nil(old_values)
  @new_values = freeze_hash_or_nil(new_values)
  @primary_key = freeze_hash_or_nil(primary_key)
  @transaction_id = transaction_id
  @commit_lsn = commit_lsn&.to_s&.freeze
  @sequence_number = sequence_number
  @occurred_at = occurred_at
  @metadata = metadata.is_a?(EventMetadata) ? metadata : EventMetadata.new(metadata)
  Ractor.make_shareable(self)
end
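
The constructor coerces each field, freezes it, and then makes the whole event Ractor-shareable. The following is a minimal sketch of that pattern, not the library's implementation: `MiniEvent` is a hypothetical stand-in, and because `Operation.normalize`, `freeze_hash_or_nil`, and `EventMetadata` are not shown on this page, the sketch assumes normalization is a plain `to_sym` and hash freezing is a shallow copy followed by `freeze`.

```ruby
# Hypothetical stand-in for illustration only; not the library's ChangeEvent.
class MiniEvent
  attr_reader :operation, :schema, :table, :new_values, :commit_lsn

  def initialize(operation:, schema:, table:, new_values: nil, commit_lsn: nil)
    @operation = operation.to_sym          # assumption: normalization is a plain to_sym
    @schema = String(schema).freeze        # accepts anything String() can convert
    @table = String(table).freeze
    @new_values = new_values&.dup&.freeze  # assumption: shallow copy, then freeze
    @commit_lsn = commit_lsn&.to_s&.freeze # stored as a String whatever the input type
    Ractor.make_shareable(self)            # deep-freezes self and every object it references
  end
end

event = MiniEvent.new(operation: 'insert', schema: :public, table: :users,
                      new_values: { 'id' => 1, 'email' => 'a@example.com' },
                      commit_lsn: 12_345)

p event.operation           # => :insert
p event.schema              # => "public"
p event.commit_lsn          # => "12345"
p Ractor.shareable?(event)  # => true
```

Because `Ractor.make_shareable` freezes recursively, any object reachable from the event (including values inside a shallow-copied hash) ends up frozen, so callers should not expect to mutate values they passed in.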

Instance Attribute Details

#commit_lsn ⇒ String, nil (readonly)

Returns:

  • (String, nil)

    commit log sequence number

# File 'lib/cdc/core/change_event.rb', line 23

def commit_lsn
  @commit_lsn
end

#metadata ⇒ EventMetadata (readonly)

Returns:

  • (EventMetadata)

    additional normalized metadata

# File 'lib/cdc/core/change_event.rb', line 23

def metadata
  @metadata
end

#new_values ⇒ Hash, nil (readonly)

Returns:

  • (Hash, nil)

    values after the change

# File 'lib/cdc/core/change_event.rb', line 23

def new_values
  @new_values
end

#occurred_at ⇒ Time, nil (readonly)

Returns:

  • (Time, nil)

    timestamp associated with the event

# File 'lib/cdc/core/change_event.rb', line 23

def occurred_at
  @occurred_at
end

#old_values ⇒ Hash, nil (readonly)

Returns:

  • (Hash, nil)

    values before the change

# File 'lib/cdc/core/change_event.rb', line 23

def old_values
  @old_values
end

#operation ⇒ Symbol (readonly)

Returns:

  • (Symbol)

    normalized CDC operation

# File 'lib/cdc/core/change_event.rb', line 23

def operation
  @operation
end

#primary_key ⇒ Hash, nil (readonly)

Returns:

  • (Hash, nil)

    primary key values for the changed row

# File 'lib/cdc/core/change_event.rb', line 23

def primary_key
  @primary_key
end

#schema ⇒ String (readonly)

Returns:

  • (String)

    database schema name

# File 'lib/cdc/core/change_event.rb', line 23

def schema
  @schema
end

#sequence_number ⇒ Integer, nil (readonly)

Returns:

  • (Integer, nil)

    event sequence within a transaction or stream

# File 'lib/cdc/core/change_event.rb', line 23

def sequence_number
  @sequence_number
end

#table ⇒ String (readonly)

Returns:

  • (String)

    database table name

# File 'lib/cdc/core/change_event.rb', line 23

def table
  @table
end

#transaction_id ⇒ Object, nil (readonly)

Returns:

  • (Object, nil)

    transaction identifier from the upstream source

# File 'lib/cdc/core/change_event.rb', line 23

def transaction_id
  @transaction_id
end

Instance Method Details

#changes ⇒ Array<ColumnChange>

Compute changed columns by comparing old and new values.

Columns with equal old and new values are omitted. Insert and delete events can pass nil for one side; missing values are represented as nil.

Returns:

  • (Array<ColumnChange>)

    Ractor-shareable changed columns



# File 'lib/cdc/core/change_event.rb', line 76

def changes
  keys = ((old_values || {}).keys | (new_values || {}).keys)
  keys.filter_map do |key|
    change = ColumnChange.new(name: key, old_value: old_values&.[](key), new_value: new_values&.[](key))
    change if change.changed?
  end.then { |items| Ractor.make_shareable(items.freeze) }
end
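
The comparison in #changes can be tried against plain hashes. The sketch below is illustrative only: `ColumnChange` here is a stand-in Struct (the real class is not shown on this page and is assumed to expose `changed?` as an inequality check), and `column_changes` is a hypothetical free function that mirrors the method body without the Ractor step.

```ruby
# Stand-in for the library's ColumnChange; assumed to compare old and new with !=.
ColumnChange = Struct.new(:name, :old_value, :new_value, keyword_init: true) do
  def changed? = old_value != new_value
end

# Hypothetical helper mirroring #changes: union the keys, keep only differing columns.
def column_changes(old_values, new_values)
  keys = (old_values || {}).keys | (new_values || {}).keys
  keys.filter_map do |key|
    change = ColumnChange.new(name: key,
                              old_value: old_values&.[](key),
                              new_value: new_values&.[](key))
    change if change.changed?
  end
end

# Update: only the column whose value differs is reported.
update = column_changes({ 'id' => 1, 'name' => 'Ada' }, { 'id' => 1, 'name' => 'Grace' })
# Insert: the old side is nil, so every non-nil new value counts as a change.
insert = column_changes(nil, { 'id' => 2, 'name' => nil })

p update.map(&:name)  # => ["name"]
p insert.map(&:name)  # => ["id"]
```

Under this assumption, a column inserted with a nil value is omitted, because its missing old value is also represented as nil and the two sides compare equal.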

#delete? ⇒ Boolean

Returns true for delete events.

Returns:

  • (Boolean)

    true for delete events



# File 'lib/cdc/core/change_event.rb', line 63

def delete? = operation == Operation::DELETE

#insert? ⇒ Boolean

Returns true for insert events.

Returns:

  • (Boolean)

    true for insert events



# File 'lib/cdc/core/change_event.rb', line 57

def insert? = operation == Operation::INSERT

#qualified_table_name ⇒ String

Fully qualified table name in schema.table form.

Returns:

  • (String)


# File 'lib/cdc/core/change_event.rb', line 68

def qualified_table_name = "#{schema}.#{table}".freeze

#to_h ⇒ Hash{String=>Object,nil}

Convert the event into a Ractor-shareable hash.

Returns:

  • (Hash{String=>Object,nil})


# File 'lib/cdc/core/change_event.rb', line 87

def to_h
  Ractor.make_shareable({
    'operation' => operation,
    'schema' => schema,
    'table' => table,
    'old_values' => old_values,
    'new_values' => new_values,
    'primary_key' => primary_key,
    'transaction_id' => transaction_id,
    'commit_lsn' => commit_lsn,
    'sequence_number' => sequence_number,
    'occurred_at' => occurred_at,
    'metadata' => metadata.to_h
  }.freeze)
end
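
To see what "Ractor-shareable" means for the returned hash, the sketch below contrasts a plain `freeze` with `Ractor.make_shareable` on a literal hash. The field values are made up for illustration and the hash is much smaller than the one #to_h builds.

```ruby
# A top-level freeze is shallow: nested objects stay mutable.
shallow = { 'new_values' => { 'name' => +'Grace' } }.freeze
p shallow['new_values'].frozen?            # => false

# make_shareable freezes the hash and everything reachable from it.
payload = Ractor.make_shareable({
  'operation' => :update,
  'schema' => 'public',
  'new_values' => { 'name' => +'Grace' }   # unary plus yields an unfrozen String
})

p payload.frozen?                          # => true
p payload['new_values'].frozen?            # => true
p payload['new_values']['name'].frozen?    # => true
p Ractor.shareable?(payload)               # => true

# Any attempt to mutate a nested value now raises FrozenError.
begin
  payload['new_values']['name'] << '!'
rescue FrozenError => e
  puts "rejected: #{e.class}"
end
```

A consumer that needs to modify the data has to build its own copy from the hash rather than mutating it in place.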

#update? ⇒ Boolean

Returns true for update events.

Returns:

  • (Boolean)

    true for update events



# File 'lib/cdc/core/change_event.rb', line 60

def update? = operation == Operation::UPDATE