Module: Acta::Upcaster
- Defined in:
- lib/acta/upcaster.rb
Overview
Replay-time event transformation. Apps declare upcasters when an event type’s shape changes between schema versions; the pipeline transforms stored records on read so projections see them at the latest shape. See `docs/upcasters.md` for the end-to-end recipe.
module Scaff
  class WorkspaceMigrationUpcasters
    include Acta::Upcaster

    upcasts "Scaff::ItemCreated", from: 1, to: 2 do |event, context|
      payload = event.payload
      if payload["item_type"] == "goal"
        context[:goal_to_workspace][payload["item_id"]] = payload["item_id"]
        event.upcast_to(
          type: "Scaff::WorkspaceCreated",
          payload: { "workspace_id" => payload["item_id"], "title" => payload["title"] },
          schema_version: 2
        )
      else
        event.upcast_to(payload: payload.merge("workspace_id" => "..."), schema_version: 2)
      end
    end
  end
end

Acta.register_upcaster(Scaff::WorkspaceMigrationUpcasters)
Upcasters run pre-hydration during every read (`Acta.rebuild!`, `ReactorJob#perform`, the events admin, test fixtures), so apps can safely delete an old event class once a rename upcaster is in place. The live emit path is exempt: emitted events carry the current code’s `event_version` and are dispatched in-memory before any read happens.
Defined Under Namespace
Modules: ClassMethods
Classes: Context, Registry, View
Constant Summary
- NO_OP =
Identity sentinel: `upcasts "Foo", from: N, to: N, &Acta::Upcaster::NO_OP` declares the post-migration record at version N as a no-op pass-through (e.g. a `GoalPromotedToWorkspace` event whose effect is already produced by upcasting earlier events).
lambda { |event, _context| event }.freeze
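The following is a minimal sketch of why a frozen lambda works as a pass-through sentinel when handed to a block-taking DSL with `&`. `NO_OP_SKETCH`, `ToyUpcasters`, and its `REGISTRATIONS` hash are hypothetical stand-ins written for this illustration; they are not Acta's registry or its `upcasts` implementation.

```ruby
# Hypothetical stand-in with the same shape as the sentinel shown above.
NO_OP_SKETCH = lambda { |event, _context| event }.freeze

# Hypothetical toy DSL that just records the block it is given.
# This is NOT Acta's registry; it only shows how `&` passes the lambda through.
class ToyUpcasters
  REGISTRATIONS = {}

  def self.upcasts(type, from:, to:, &block)
    REGISTRATIONS[[type, from]] = { to: to, block: block }
  end

  # from == to: the record at version 2 is declared as a pass-through.
  upcasts "Foo", from: 2, to: 2, &NO_OP_SKETCH
end

entry = ToyUpcasters::REGISTRATIONS[["Foo", 2]]
event = { "type" => "Foo", "version" => 2 }

# `&` hands over the lambda object itself, so the stored block is the sentinel.
same_object = entry[:block].equal?(NO_OP_SKETCH)
# Calling it returns the very same event object, unchanged.
passthrough = entry[:block].call(event, {}).equal?(event)
```

Because the sentinel is a shared constant, a registry could in principle recognise it by identity, although the source does not say whether Acta does so.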
Class Method Summary
- .included(base) ⇒ Object
- .upcast(record, context, registry: Acta.upcaster_registry) ⇒ Object
Walk a record through every matching upcaster, returning 0..N upcasted records.
Class Method Details
.included(base) ⇒ Object
# File 'lib/acta/upcaster.rb', line 43
def self.included(base)
  base.extend(ClassMethods)
end
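The hook above is the common Ruby include-then-extend idiom: including the module makes the methods of `ClassMethods` available at class level, which is what lets a class body call `upcasts` directly. A minimal sketch of the idiom follows; `ToyUpcaster`, `RenameUpcasters`, and `declared_upcasters` are hypothetical names for illustration, not Acta's internals.

```ruby
# Hypothetical module illustrating the include-then-extend idiom.
module ToyUpcaster
  # Ruby calls this hook whenever the module is included into a class.
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    # Class-level DSL method: records one declaration per call.
    def upcasts(type, from:, to:, &block)
      declared_upcasters << { type: type, from: from, to: to, block: block }
    end

    # Per-class storage, created lazily on first use.
    def declared_upcasters
      @declared_upcasters ||= []
    end
  end
end

class RenameUpcasters
  include ToyUpcaster

  # Available here only because `included` extended the class.
  upcasts("Old::Thing", from: 1, to: 2) { |event, _context| event }
end

declared = RenameUpcasters.declared_upcasters.map { |d| d.values_at(:type, :from, :to) }
```

Each including class keeps its own list in this sketch, since the instance variable lives on the class object that was extended.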
.upcast(record, context, registry: Acta.upcaster_registry) ⇒ Object
Walk a record through every matching upcaster, returning 0..N upcasted records. Identity when no upcaster matches. Handles:
- chain: block returns a single record → loop continues at new (event_type, event_version)
- 1-to-many: block returns an array → each branch recurses (so chaining + fan-out compose)
- drop: block returns nil or [] → record produces no projection input
- fail: block calls `context.fail_replay!` → halts with `ReplayHaltedByUpcaster`
- future ver: stored event_version exceeds anything we can reach → `FutureSchemaVersion`
# File 'lib/acta/upcaster.rb', line 190
def self.upcast(record, context, registry: Acta.upcaster_registry)
  origin = record.respond_to?(:base) ? record.base : record
  current = record.is_a?(View) ? record : View.new(record)
  return [ current ] if registry.empty?

  loop do
    reg = registry.find(current.event_type, current.event_version)
    unless reg
      known_max = registry.latest_for(current.event_type)
      if known_max.positive? && current.event_version > known_max
        raise FutureSchemaVersion.new(record: origin, latest_known_version: known_max)
      end
      break
    end

    result = begin
      reg[:block].call(current, context)
    rescue Context::FailReplay => e
      raise ReplayHaltedByUpcaster.new(record: origin, reason: e.message)
    end

    return [] if result.nil? || (result.is_a?(Array) && result.empty?)

    if result.is_a?(Array)
      return result.flat_map { |branch| upcast(branch, context, registry: registry) }
    end

    unless result.is_a?(View)
      raise UpcasterRegistryError,
            "Upcaster #{reg[:owner].name} for #{current.event_type} v#{current.event_version} " \
            "returned #{result.class} — expected an Acta::Upcaster::View " \
            "(use `event.upcast_to(...)` to produce one)."
    end

    if result.event_version == current.event_version && result.event_type == current.event_type
      # Identity at the current version (e.g. NO_OP). Stop the loop —
      # otherwise we'd recurse forever on the same (type, version) key.
      current = result
      break
    end

    current = result
  end

  [ current ]
end
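To make the chain, fan-out, and drop cases concrete, here is a self-contained toy model of the same walk. It is a sketch under stated assumptions: `ToyEvent`, `REGISTRY`, and `toy_upcast` are hypothetical stand-ins (a plain `Struct` and a `Hash` keyed by type and version), not `Acta::Upcaster::View` or Acta's `Registry`, and the fail and future-version branches are left out.

```ruby
# Hypothetical stand-ins: only the control flow of the walk is modelled.
ToyEvent = Struct.new(:event_type, :event_version, :payload)

REGISTRY = {
  # chain: v1 becomes v2 of the same type
  ["Item", 1] => ->(e, _ctx) { ToyEvent.new("Item", 2, e.payload.merge("tagged" => true)) },
  # 1-to-many: one v2 record becomes two records
  ["Item", 2] => ->(e, _ctx) {
    [ToyEvent.new("Item", 3, e.payload), ToyEvent.new("Audit", 1, { "of" => e.payload["id"] })]
  },
  # drop: nil removes the record from the replay
  ["Noise", 1] => ->(_e, _ctx) { nil }
}.freeze

def toy_upcast(event, context, registry = REGISTRY)
  current = event
  loop do
    block = registry[[current.event_type, current.event_version]]
    break unless block # no matching upcaster: identity

    result = block.call(current, context)
    return [] if result.nil? || result == [] # drop

    if result.is_a?(Array) # fan-out: every branch is walked recursively
      return result.flat_map { |branch| toy_upcast(branch, context, registry) }
    end

    if [result.event_type, result.event_version] == [current.event_type, current.event_version]
      current = result # identity result: stop rather than loop forever
      break
    end

    current = result # chain: continue at the new (type, version) key
  end
  [current]
end

item    = toy_upcast(ToyEvent.new("Item", 1, { "id" => 7 }), {})
dropped = toy_upcast(ToyEvent.new("Noise", 1, {}), {})
unknown = toy_upcast(ToyEvent.new("Other", 1, {}), {})

# One stored record chained to v2, then fanned out into two records.
summary = item.map { |e| [e.event_type, e.event_version] }
```

In this toy run the stored `Item` v1 record ends up as an `Item` v3 and an `Audit` v1, the `Noise` record yields an empty array, and the unregistered `Other` record comes back unchanged in a one-element array.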