Class: CDC::SolidQueue::EventSerializer

Inherits:
Object
  • Object
show all
Defined in:
lib/cdc/solid_queue/event_serializer.rb

Overview

Converts CDC events into Solid Queue-safe payloads.

Payloads are plain hashes so Active Job can serialize them without needing to load the original event object in the queue database.

Constant Summary collapse

INTERNAL_METADATA_KEY =

Reserved payload key for cdc-solid-queue enqueue metadata.

'_cdc_solid_queue'

Class Method Summary collapse

Class Method Details

.dump(event) ⇒ Hash

Serialize an event-like object.

Parameters:

  • event (Object)

    event object or Hash

Returns:

  • (Hash)

    serializable event payload

Raises:



18
19
20
21
22
23
24
25
26
27
28
# File 'lib/cdc/solid_queue/event_serializer.rb', line 18

def self.dump(event)
  payload = if event.is_a?(Hash)
              event
            elsif event.respond_to?(:to_h)
              event.to_h
            else
              raise SerializationError, 'event must respond to to_h or be a Hash'
            end

  normalize_hash(payload)
end

.enqueue_metadata(payload) ⇒ Hash

Return cdc-solid-queue metadata from an enqueued payload.

Parameters:

  • payload (Hash)

Returns:

  • (Hash)


66
67
68
69
70
# File 'lib/cdc/solid_queue/event_serializer.rb', line 66

def self.(payload)
  normalized = normalize_hash(payload)
   = normalized[INTERNAL_METADATA_KEY]
  .is_a?(Hash) ?  : {}
end

.load(payload) ⇒ Hash

Load a serialized event payload.

Parameters:

  • payload (Hash)

Returns:

  • (Hash)

Raises:



35
36
37
38
39
# File 'lib/cdc/solid_queue/event_serializer.rb', line 35

def self.load(payload)
  raise SerializationError, 'payload must be a Hash' unless payload.is_a?(Hash)

  (normalize_hash(payload))
end

.load_event(payload) ⇒ CDC::Core::ChangeEvent, Hash

Load a serialized event payload into a CDC event when possible.

Parameters:

  • payload (Hash)

Returns:

  • (CDC::Core::ChangeEvent, Hash)


45
46
47
48
49
50
# File 'lib/cdc/solid_queue/event_serializer.rb', line 45

def self.load_event(payload)
  normalized = load(payload)
  return normalized unless change_event_payload?(normalized)

  build_change_event(normalized)
end

.ordering_value(payload, key) ⇒ Object?

Return the ordering value for a serialized event.

Parameters:

  • payload (Hash)
  • key (Symbol)

Returns:

  • (Object, nil)


77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/cdc/solid_queue/event_serializer.rb', line 77

def self.ordering_value(payload, key)
  normalized = load(payload)
  case key
  when :identity, :primary_key
    normalized['identity'] || normalized['primary_key']
  when :relation
    [normalized['namespace'] || normalized['schema'], normalized['entity'] || normalized['table']]
  when :transaction
    normalized['transaction_id']
  when :global
    normalized['source_position'] || normalized['commit_lsn']
  when :none
    nil
  end
end

.with_enqueue_metadata(payload, metadata) ⇒ Hash

Attach enqueue metadata without changing the event representation.

Parameters:

  • payload (Hash)
  • metadata (Hash)

Returns:

  • (Hash)


57
58
59
60
# File 'lib/cdc/solid_queue/event_serializer.rb', line 57

def self.(payload, )
  normalized = normalize_hash(payload)
  normalized.merge(INTERNAL_METADATA_KEY => normalize_hash())
end