Class: CDC::SolidQueue::EventSerializer
- Inherits: Object
- Defined in: lib/cdc/solid_queue/event_serializer.rb
Overview
Converts CDC events into Solid Queue-safe payloads.
Payloads are plain hashes so Active Job can serialize them without needing to load the original event object in the queue database.
Constant Summary
- INTERNAL_METADATA_KEY = '_cdc_solid_queue'
  Reserved payload key for cdc-solid-queue enqueue metadata.
Class Method Summary
- .dump(event) ⇒ Hash
  Serialize an event-like object.
- .enqueue_metadata(payload) ⇒ Hash
  Return cdc-solid-queue metadata from an enqueued payload.
- .load(payload) ⇒ Hash
  Load a serialized event payload.
- .load_event(payload) ⇒ CDC::Core::ChangeEvent, Hash
  Load a serialized event payload into a CDC event when possible.
- .ordering_value(payload, key) ⇒ Object?
  Return the ordering value for a serialized event.
- .with_enqueue_metadata(payload, metadata) ⇒ Hash
  Attach enqueue metadata without changing the event representation.
Class Method Details
.dump(event) ⇒ Hash
Serialize an event-like object.
    # File 'lib/cdc/solid_queue/event_serializer.rb', line 18
    def self.dump(event)
      payload = if event.is_a?(Hash)
                  event
                elsif event.respond_to?(:to_h)
                  event.to_h
                else
                  raise SerializationError, 'event must respond to to_h or be a Hash'
                end

      normalize_hash(payload)
    end
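The branching in `.dump` can be tried in isolation with the sketch below. It is not the library's code: `DumpSketch`, `SketchEvent`, and the sample field names are hypothetical. The `normalize_hash` stand-in assumes the private helper recursively converts keys to strings, which this page does not show.

```ruby
# Illustrative re-implementation of the .dump branching shown above.
module DumpSketch
  class SerializationError < StandardError; end

  # Assumption: the real private helper is not documented on this page.
  # This stand-in recursively converts hash keys to strings.
  def self.normalize_hash(value)
    case value
    when Hash
      value.each_with_object({}) { |(key, item), out| out[key.to_s] = normalize_hash(item) }
    when Array
      value.map { |item| normalize_hash(item) }
    else
      value
    end
  end

  def self.dump(event)
    payload =
      if event.is_a?(Hash)
        event
      elsif event.respond_to?(:to_h)
        event.to_h
      else
        raise SerializationError, 'event must respond to to_h or be a Hash'
      end

    normalize_hash(payload)
  end
end

# Hypothetical event type, used only for this illustration.
SketchEvent = Struct.new(:entity, :operation, keyword_init: true)

from_hash   = DumpSketch.dump({ entity: 'users', operation: 'insert' })
from_struct = DumpSketch.dump(SketchEvent.new(entity: 'users', operation: 'insert'))
```

Under these assumptions both calls produce the same string-keyed hash, while an object without `to_h` (an Integer, for example) raises `SerializationError`.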
.enqueue_metadata(payload) ⇒ Hash
Return cdc-solid-queue metadata from an enqueued payload.
    # File 'lib/cdc/solid_queue/event_serializer.rb', line 66
    def self.enqueue_metadata(payload)
      normalized = normalize_hash(payload)
      metadata = normalized[INTERNAL_METADATA_KEY]
      metadata.is_a?(Hash) ? metadata : {}
    end
.load(payload) ⇒ Hash
Load a serialized event payload.
    # File 'lib/cdc/solid_queue/event_serializer.rb', line 35
    def self.load(payload)
      raise SerializationError, 'payload must be a Hash' unless payload.is_a?(Hash)

      (normalize_hash(payload))
    end
.load_event(payload) ⇒ CDC::Core::ChangeEvent, Hash
Load a serialized event payload into a CDC event when possible.
    # File 'lib/cdc/solid_queue/event_serializer.rb', line 45
    def self.load_event(payload)
      normalized = load(payload)
      return normalized unless change_event_payload?(normalized)

      build_change_event(normalized)
    end
.ordering_value(payload, key) ⇒ Object?
Return the ordering value for a serialized event.
    # File 'lib/cdc/solid_queue/event_serializer.rb', line 77
    def self.ordering_value(payload, key)
      normalized = load(payload)
      case key
      when :identity, :primary_key
        normalized['identity'] || normalized['primary_key']
      when :relation
        [normalized['namespace'] || normalized['schema'], normalized['entity'] || normalized['table']]
      when :transaction
        normalized['transaction_id']
      when :global
        normalized['source_position'] || normalized['commit_lsn']
      when :none
        nil
      end
    end
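To make the key-to-field mapping in `.ordering_value` concrete, here is a minimal standalone sketch of the same `case` expression. `OrderingSketch` and every value in the sample payload are hypothetical, and the sketch takes an already string-keyed hash instead of calling `.load`.

```ruby
# Illustrative copy of the case expression from .ordering_value.
# It expects a hash that already has string keys.
module OrderingSketch
  def self.ordering_value(normalized, key)
    case key
    when :identity, :primary_key
      normalized['identity'] || normalized['primary_key']
    when :relation
      [normalized['namespace'] || normalized['schema'],
       normalized['entity'] || normalized['table']]
    when :transaction
      normalized['transaction_id']
    when :global
      normalized['source_position'] || normalized['commit_lsn']
    when :none
      nil
    end
  end
end

# Hypothetical payload: only the fallback field names are present.
payload = {
  'schema' => 'public',
  'table' => 'users',
  'primary_key' => { 'id' => 7 },
  'transaction_id' => 991,
  'commit_lsn' => '0/16B3748'
}

identity    = OrderingSketch.ordering_value(payload, :identity)    # falls back to 'primary_key'
relation    = OrderingSketch.ordering_value(payload, :relation)    # [schema, table] pair
transaction = OrderingSketch.ordering_value(payload, :transaction)
global      = OrderingSketch.ordering_value(payload, :global)      # falls back to 'commit_lsn'
unknown     = OrderingSketch.ordering_value(payload, :tenant)      # no matching branch
```

Because the `case` has no `else` branch, a key outside the listed symbols yields `nil`, the same result as `:none`. A caller that needs to tell the two apart would have to validate the key first.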
.with_enqueue_metadata(payload, metadata) ⇒ Hash
Attach enqueue metadata without changing the event representation.
    # File 'lib/cdc/solid_queue/event_serializer.rb', line 57
    def self.with_enqueue_metadata(payload, metadata)
      normalized = normalize_hash(payload)
      normalized.merge(INTERNAL_METADATA_KEY => normalize_hash(metadata))
    end
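The pairing of `.with_enqueue_metadata` and `.enqueue_metadata` amounts to a round trip through the reserved `'_cdc_solid_queue'` key. The sketch below illustrates that round trip under stated assumptions: `MetadataSketch` and the `ordering` metadata field are hypothetical, and `normalize_hash` is a stand-in that only converts top-level keys to strings.

```ruby
# Illustrative round trip through the reserved metadata key.
module MetadataSketch
  INTERNAL_METADATA_KEY = '_cdc_solid_queue'

  # Assumption: stand-in for the private helper, which this page does not
  # show. It converts top-level keys to strings and nothing more.
  def self.normalize_hash(hash)
    hash.to_h { |key, value| [key.to_s, value] }
  end

  # Hash#merge returns a new hash, so the caller's payload is not mutated.
  def self.with_enqueue_metadata(payload, metadata)
    normalize_hash(payload).merge(INTERNAL_METADATA_KEY => normalize_hash(metadata))
  end

  # Anything other than a Hash under the reserved key is treated as absent.
  def self.enqueue_metadata(payload)
    metadata = normalize_hash(payload)[INTERNAL_METADATA_KEY]
    metadata.is_a?(Hash) ? metadata : {}
  end
end

event    = { 'entity' => 'users', 'operation' => 'update' }
enqueued = MetadataSketch.with_enqueue_metadata(event, { ordering: 'identity' })

attached = MetadataSketch.enqueue_metadata(enqueued) # metadata read back
missing  = MetadataSketch.enqueue_metadata(event)    # no reserved key present
```

Keeping the metadata under a single reserved key leaves the event's own fields untouched, which is consistent with the "without changing the event representation" wording above.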