Module: Acta::Upcaster

Defined in:
lib/acta/upcaster.rb

Overview

Replay-time event transformation. Apps declare upcasters when an event type’s shape changes between schema versions; the pipeline transforms stored records on read so projections see them at the latest shape. See `docs/upcasters.md` for the end-to-end recipe.

module Scaff
  class WorkspaceMigrationUpcasters
    include Acta::Upcaster

    upcasts "Scaff::ItemCreated", from: 1, to: 2 do |event, context|
      payload = event.payload
      if payload["item_type"] == "goal"
        context[:goal_to_workspace][payload["item_id"]] = payload["item_id"]
        event.upcast_to(
          type: "Scaff::WorkspaceCreated",
          payload: { "workspace_id" => payload["item_id"], "title" => payload["title"] },
          schema_version: 2
        )
      else
        event.upcast_to(payload: payload.merge("workspace_id" => "..."), schema_version: 2)
      end
    end
  end
end

Acta.register_upcaster(Scaff::WorkspaceMigrationUpcasters)

Upcasters run pre-hydration during every read (`Acta.rebuild!`, `ReactorJob#perform`, the events admin, test fixtures), so apps can safely delete an old event class once a rename upcaster is in place. The live emit path is exempt: emitted events carry the current code’s `event_version` and are dispatched in-memory before any read happens.

Defined Under Namespace

Modules: ClassMethods Classes: Context, Registry, View

Constant Summary

NO_OP =

Identity sentinel: `upcasts "Foo", from: N, to: N, &Acta::Upcaster::NO_OP` declares the post-migration record at version N as a no-op pass-through (e.g. a `GoalPromotedToWorkspace` event whose effect is already produced by upcasting earlier events).

lambda { |event, _context| event }.freeze
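
Because `NO_OP` is an ordinary lambda, `&` turns it into the block argument of whatever method receives it. The sketch below illustrates only that mechanic. `NoOpSketch` and its `capture` method are hypothetical stand-ins for a registration macro, not Acta's `upcasts`.

```ruby
# Hypothetical stand-in for a registration macro; it is NOT Acta's `upcasts`.
module NoOpSketch
  # Same definition as the constant documented above.
  NO_OP = lambda { |event, _context| event }.freeze

  # Captures the block it is given, the way a class-level macro could store it.
  def self.capture(&block)
    block
  end
end

stored = NoOpSketch.capture(&NoOpSketch::NO_OP)
event  = { "type" => "Scaff::GoalPromotedToWorkspace", "event_version" => 2 }

# The stored block hands back the very same object it received.
puts stored.call(event, {}).equal?(event)
```

Returning the same object at the same `(type, version)` key is what lets a pipeline recognize the identity case and stop.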

Class Method Summary

Class Method Details

.included(base) ⇒ Object



# File 'lib/acta/upcaster.rb', line 43

def self.included(base)
  base.extend(ClassMethods)
end
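
The hook above is the common Ruby idiom for giving an including class its class-level macros: `include` triggers `included`, which extends the class with `ClassMethods`. A minimal standalone sketch follows. `Declarable`, `declare`, and `Example` are hypothetical names, and only the shape of the hook mirrors the source.

```ruby
# Hypothetical names throughout; only the hook shape mirrors Acta::Upcaster.
module Declarable
  # Called by Ruby whenever a class does `include Declarable`.
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    # Class-level macro: records a declaration on the including class.
    def declare(name)
      declarations << name
    end

    # Per-class storage, created lazily on first use.
    def declarations
      @declarations ||= []
    end
  end
end

class Example
  include Declarable

  # Available in the class body because `included` extended Example.
  declare :first
  declare :second
end

p Example.declarations
```

This is presumably how a macro such as `upcasts` becomes callable inside a class body that includes `Acta::Upcaster`, although the contents of Acta's `ClassMethods` are not shown here.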

.upcast(record, context, registry: Acta.upcaster_registry) ⇒ Object

Walk a record through every matching upcaster, returning 0..N upcasted records. Identity when no upcaster matches. Handles:

- chain:        block returns a single record → loop continues at new (event_type, event_version)
- 1-to-many:    block returns an array       → each branch recurses (so chaining + fan-out compose)
- drop:         block returns nil or []      → record produces no projection input
- fail:         block calls `context.fail_replay!` → halts with `ReplayHaltedByUpcaster`
- future ver:   stored event_version exceeds anything we can reach → `FutureSchemaVersion`
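
The first three cases in the list can be modeled with a toy pipeline. This is a simplified sketch, not Acta's implementation: `ToyRecord`, `ToyPipeline`, the event names, and the payloads are all made up for illustration, and the `fail` and `future ver` cases are left out.

```ruby
# Toy model only: ToyRecord and ToyPipeline are hypothetical, not Acta classes.
ToyRecord = Struct.new(:event_type, :event_version, :payload)

module ToyPipeline
  # Registry keyed by [event_type, event_version]. Each block returns a record
  # (chain), an array of records (fan-out), or nil / [] (drop).
  REGISTRY = {
    ["ItemCreated", 1] => ->(r) { ToyRecord.new(r.event_type, 2, r.payload.merge("kind" => "item")) },
    ["ItemCreated", 2] => ->(r) { ToyRecord.new(r.event_type, 3, r.payload) },
    ["ItemsImported", 1] => lambda { |r|
      r.payload["ids"].map { |id| ToyRecord.new("ItemCreated", 2, { "id" => id }) }
    },
    ["ItemTouched", 1] => ->(_r) { nil }
  }.freeze

  def self.upcast(record)
    current = record
    loop do
      block = REGISTRY[[current.event_type, current.event_version]]
      break unless block # no upcaster for this key: identity

      result = block.call(current)
      return [] if result.nil? || (result.is_a?(Array) && result.empty?)
      # Fan-out: each branch goes through the pipeline again.
      return result.flat_map { |branch| upcast(branch) } if result.is_a?(Array)

      same_key = result.event_type == current.event_type &&
                 result.event_version == current.event_version
      current = result
      break if same_key # identity result: stop instead of looping forever
    end
    [current]
  end
end

chained = ToyPipeline.upcast(ToyRecord.new("ItemCreated", 1, { "id" => "a" }))
fanned  = ToyPipeline.upcast(ToyRecord.new("ItemsImported", 1, { "ids" => %w[a b] }))
dropped = ToyPipeline.upcast(ToyRecord.new("ItemTouched", 1, {}))

p chained.map(&:event_version) # [3]
p fanned.size                  # 2
p dropped                      # []
```

Every call returns an array, so callers can treat chain, fan-out, and drop uniformly as "zero or more records".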


# File 'lib/acta/upcaster.rb', line 190

def self.upcast(record, context, registry: Acta.upcaster_registry)
  origin = record.respond_to?(:base) ? record.base : record
  current = record.is_a?(View) ? record : View.new(record)
  return [ current ] if registry.empty?

  loop do
    reg = registry.find(current.event_type, current.event_version)

    unless reg
      known_max = registry.latest_for(current.event_type)
      if known_max.positive? && current.event_version > known_max
        raise FutureSchemaVersion.new(record: origin, latest_known_version: known_max)
      end

      break
    end

    result = begin
      reg[:block].call(current, context)
    rescue Context::FailReplay => e
      raise ReplayHaltedByUpcaster.new(record: origin, reason: e.message)
    end

    return [] if result.nil? || (result.is_a?(Array) && result.empty?)

    if result.is_a?(Array)
      return result.flat_map { |branch| upcast(branch, context, registry: registry) }
    end

    unless result.is_a?(View)
      raise UpcasterRegistryError,
            "Upcaster #{reg[:owner].name} for #{current.event_type} v#{current.event_version} " \
            "returned #{result.class} — expected an Acta::Upcaster::View " \
            "(use `event.upcast_to(...)` to produce one)."
    end

    if result.event_version == current.event_version && result.event_type == current.event_type
      # Identity at the current version (e.g. NO_OP). Stop the loop —
      # otherwise we'd recurse forever on the same (type, version) key.
      current = result
      break
    end

    current = result
  end

  [ current ]
end