Class: Yes::Core::CommandHandling::ReadModelUpdater

Inherits:
Object
  • Object
show all
Includes:
OpenTelemetry::Trackable
Defined in:
lib/yes/core/command_handling/read_model_updater.rb

Overview

Updates read models with revision guard to ensure consistency Handles state updater instantiation and execution within a revision-protected context

Examples:

updater = ReadModelUpdater.new(aggregate)
updater.call(event, command_payload, :approve_documents)

Instance Method Summary collapse

Constructor Details

#initialize(aggregate) ⇒ ReadModelUpdater

Initializes a new ReadModelUpdater

Parameters:



19
20
21
22
23
24
# File 'lib/yes/core/command_handling/read_model_updater.rb', line 19

def initialize(aggregate)
  @aggregate = aggregate
  @read_model = aggregate.read_model if aggregate.class.read_model_enabled?
  @command_utilities = aggregate.send(:command_utilities)
  @revision_column = aggregate.send(:revision_column) if aggregate.class.read_model_enabled?
end

Instance Method Details

#call(event, command_payload = nil, command_name = nil, resolve_payload: false) ⇒ void

This method returns an undefined value.

Updates the read model with revision guard protection

Parameters:

  • event (Yes::Core::Event)

    The event that was published

  • command_payload (Hash) (defaults to: nil)

    The command payload

  • command_name (Symbol, String) (defaults to: nil)

    The command name (optional, will be derived from event if not provided)



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/yes/core/command_handling/read_model_updater.rb', line 32

def call(event, command_payload = nil, command_name = nil, resolve_payload: false)
  return unless aggregate.class.read_model_enabled?

  begin
    command_name ||= command_utilities.command_name_from_event(event, aggregate.class)
  rescue Yes::Core::Utils::CommandUtils::CommandNotFoundError => e
    Rails.logger.warn("Command not found for event #{event.type}: #{e.message}")

    # update revision only in case event is unknown to aggregate
    return update_revision(event.stream_revision)
  end

  payload = command_payload || payload_from_event(event, resolve_payload)

  locale = payload[:locale]

  state_updater_class = command_utilities.fetch_state_updater_class(command_name)

  ReadModelRevisionGuard.call(
    read_model,
    event.stream_revision,
    revision_column:
  ) do
    state_changes = state_updater_class.new(
      payload: payload.except(*Yes::Core::Command::RESERVED_KEYS),
      aggregate:,
      event:
    ).call
    aggregate.update_read_model(
      state_changes.merge(
        revision_column => event.stream_revision,
        locale:,
        pending_update_since: nil
      )
    )
  end
rescue ReadModelRevisionGuard::RevisionAlreadyAppliedError => e
  Rails.logger.warn("Read model revision already applied: #{e.message}")
end