Class: CDC::SolidQueue::EventSerializer

Inherits:
Object
  • Object
show all
Defined in:
lib/cdc/solid_queue/event_serializer.rb

Overview

Converts CDC events into Solid Queue-safe payloads.

Payloads are plain hashes so Active Job can serialize them without needing to load the original event object in the queue database. rubocop:disable Metrics/ClassLength

Constant Summary collapse

INTERNAL_METADATA_KEY =

Reserved payload key for cdc-solid-queue enqueue metadata.

'_cdc_solid_queue'
ORDERING_VALUE_FETCHERS =

Lookup table for ordering value extraction by ordering key.

{
  identity: ->(payload) { payload['identity'] || payload['primary_key'] },
  primary_key: ->(payload) { payload['identity'] || payload['primary_key'] },
  relation: ->(payload) { [payload['namespace'] || payload['schema'], payload['entity'] || payload['table']] },
  transaction: ->(payload) { payload['transaction_id'] },
  global: ->(payload) { payload['source_position'] || payload['commit_lsn'] }
}.freeze

Class Method Summary collapse

Class Method Details

.dump(event) ⇒ Hash

Serialize an event-like object.

Parameters:

  • event (Object)

    event object or Hash

Returns:

  • (Hash)

    serializable event payload

Raises:



27
28
29
30
31
32
33
34
35
36
37
# File 'lib/cdc/solid_queue/event_serializer.rb', line 27

def self.dump(event)
  payload = if event.is_a?(Hash)
              event
            elsif event.respond_to?(:to_h)
              event.to_h
            else
              raise SerializationError, 'event must respond to to_h or be a Hash'
            end

  normalize_hash(payload)
end

.dump_batch(events) ⇒ Array<Hash>

Serialize a batch of event-like objects.

Parameters:

  • events (Array<Object>)

Returns:

  • (Array<Hash>)

Raises:



43
44
45
46
47
# File 'lib/cdc/solid_queue/event_serializer.rb', line 43

def self.dump_batch(events)
  raise SerializationError, 'events must be an Array' unless events.is_a?(Array)

  events.map { |event| dump(event) }
end

.enqueue_metadata(payload) ⇒ Hash

Return cdc-solid-queue metadata from an enqueued payload.

Parameters:

  • payload (Hash)

Returns:

  • (Hash)


103
104
105
106
107
108
109
# File 'lib/cdc/solid_queue/event_serializer.rb', line 103

def self.(payload)
  return (payload) if payload.is_a?(Array)

  normalized = normalize_hash(payload)
   = normalized[INTERNAL_METADATA_KEY]
  .is_a?(Hash) ?  : {}
end

.load(payload) ⇒ Hash

Load a serialized event payload.

Parameters:

  • payload (Hash)

Returns:

  • (Hash)

Raises:



54
55
56
57
58
# File 'lib/cdc/solid_queue/event_serializer.rb', line 54

def self.load(payload)
  raise SerializationError, 'payload must be a Hash' unless payload.is_a?(Hash)

  (normalize_hash(payload))
end

.load_batch(payloads) ⇒ Array<Hash>

Load a batch of serialized event payloads.

Parameters:

  • payloads (Array<Hash>)

Returns:

  • (Array<Hash>)

Raises:



64
65
66
67
68
# File 'lib/cdc/solid_queue/event_serializer.rb', line 64

def self.load_batch(payloads)
  raise SerializationError, 'payloads must be an Array' unless payloads.is_a?(Array)

  payloads.map { |payload| load(payload) }
end

.load_event(payload) ⇒ CDC::Core::ChangeEvent, Hash

Load a serialized event payload into a CDC event when possible.

Parameters:

  • payload (Hash)

Returns:

  • (CDC::Core::ChangeEvent, Hash)


74
75
76
77
78
79
80
81
# File 'lib/cdc/solid_queue/event_serializer.rb', line 74

def self.load_event(payload)
  return load_batch(payload).map { |item| load_event(item) } if payload.is_a?(Array)

  normalized = load(payload)
  return normalized unless change_event_payload?(normalized)

  build_change_event(normalized)
end

.ordering_value(payload, key) ⇒ Object?

Return the ordering value for a serialized event.

Parameters:

  • payload (Hash)
  • key (Symbol)

Returns:

  • (Object, nil)


116
117
118
119
120
121
122
123
124
# File 'lib/cdc/solid_queue/event_serializer.rb', line 116

def self.ordering_value(payload, key)
  return payload.map { |item| ordering_value(item, key) } if payload.is_a?(Array)
  return nil if key == :none

  fetcher = ORDERING_VALUE_FETCHERS[key]
  return nil unless fetcher

  fetcher.call(load(payload))
end

.with_enqueue_metadata(payload, metadata) ⇒ Hash

Attach enqueue metadata without changing the event representation.

Parameters:

  • payload (Hash)
  • metadata (Hash)

Returns:

  • (Hash)


88
89
90
91
92
93
94
95
96
97
# File 'lib/cdc/solid_queue/event_serializer.rb', line 88

def self.(payload, )
  if payload.is_a?(Array)
    return payload.each_with_index.map do |child, index|
      (child, (, index))
    end
  end

  normalized = normalize_hash(payload)
  normalized.merge(INTERNAL_METADATA_KEY => normalize_hash())
end