Class: Yes::Core::CommandHandling::CommandGroupExecutor

Inherits:
Object
Defined in:
lib/yes/core/command_handling/command_group_executor.rb

Overview

Low-level executor for command groups.

Mirrors CommandExecutor, with two key differences:

  1. Sub-command events publish inside a single `PgEventstore.client.multiple` block, so either all events commit atomically or none do.

  2. Read-model updates run AFTER the eventstore commit succeeds, in declaration order, so a rolled-back transaction never leaves the read model ahead of the stream.

Only the group’s own guards run here — sub-command guards are bypassed by design.
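
The two differences listed above can be illustrated with a minimal, self-contained sketch. `FakeStore` and `execute_group` are hypothetical stand-ins invented for this illustration; they are not part of Yes::Core and do not reproduce the PgEventstore API. The sketch only mimics the "all or nothing" commit and the "read model after commit" ordering described here.

```ruby
# Hypothetical in-memory stand-in for an event store (not the PgEventstore API).
class FakeStore
  attr_reader :events

  def initialize
    @events = []
  end

  # Buffers everything appended inside the block and commits the buffer
  # only if the block finishes without raising.
  def multiple
    buffer = []
    yield buffer
    @events.concat(buffer)
    buffer
  end
end

# Hypothetical helper: publishes all sub-command events in one atomic block,
# then applies them to the read model (a plain Array in this sketch).
def execute_group(store, read_model, sub_commands)
  committed = store.multiple do |tx|
    sub_commands.each do |sub|
      raise ArgumentError, "invalid sub-command" if sub[:event].nil?

      tx << sub[:event]
    end
  end

  # Reached only when the commit above succeeded; runs in declaration order.
  committed.each { |event| read_model << event }
  committed
end
```

If any sub-command raises inside the block, nothing is committed and the read-model step is never reached, so the read model cannot end up ahead of the stream.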

Constant Summary

MAX_RETRIES = 10
INLINE_RECOVERY_RETRY_THRESHOLD = 5

Instance Method Summary

Constructor Details

#initialize(aggregate) ⇒ CommandGroupExecutor

Returns a new instance of CommandGroupExecutor.

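Parameters:

  • aggregate: the aggregate whose command group is executed. Its read model is used only when the aggregate class has read models enabled.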



# File 'lib/yes/core/command_handling/command_group_executor.rb', line 24

def initialize(aggregate)
  @aggregate = aggregate
  @read_model = aggregate.read_model if aggregate.class.read_model_enabled?
end

Instance Method Details

#call(cmd, group_name, guard_evaluator_class, skip_guards: false) ⇒ Yes::Core::Commands::CommandGroupResponse

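Parameters:

  • cmd: the command-group command to execute.
  • group_name: the name of the command group, passed to the guard runner.
  • guard_evaluator_class: the guard evaluator class used for the group's own guards.
  • skip_guards (defaults to: false): forwarded to the guard runner.

Returns:

  • (Yes::Core::Commands::CommandGroupResponse): carries the command and the published events on success, or the command and the error when an invalid transition, a no-change transition, or an invalid command is rescued.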



# File 'lib/yes/core/command_handling/command_group_executor.rb', line 34

def call(cmd, group_name, guard_evaluator_class, skip_guards: false)
  retries = 0

  begin
    evaluator = GuardRunner.new(aggregate).call(cmd, group_name, guard_evaluator_class, skip_guards:)
    external_aggregates = evaluator&.accessed_external_aggregates || []

    set_pending_update_state if aggregate.class.read_model_enabled?

    events = publish_events(cmd, external_aggregates)
    apply_read_model_updates(cmd, events) if aggregate.class.read_model_enabled?

    Yes::Core::Commands::CommandGroupResponse.new(cmd:, events:)
  rescue PgEventstore::WrongExpectedRevisionError => e
    retries += 1
    clear_pending_update_state if aggregate.class.read_model_enabled?
    retries <= MAX_RETRIES ? retry : raise(e)
  rescue ConcurrentUpdateError => e
    retries += 1
    sleep([0.01 * (2**(retries - 1)), 1.0].min) if retries <= MAX_RETRIES

    if aggregate.class.read_model_enabled? && retries >= INLINE_RECOVERY_RETRY_THRESHOLD
      ReadModelRecoveryService.attempt_inline_recovery(read_model, aggregate: aggregate)
      read_model.reload
    end

    retries <= MAX_RETRIES ? retry : raise(e)
  rescue GuardEvaluator::InvalidTransition,
         GuardEvaluator::NoChangeTransition,
         Yes::Core::Command::Invalid => e
    clear_pending_update_state if aggregate.class.read_model_enabled?
    Yes::Core::Commands::CommandGroupResponse.new(cmd:, error: e)
  end
end
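
The retry timing in the ConcurrentUpdateError branch above can be worked out in isolation. The sketch below copies the delay formula and the two constants from the listing; `backoff_delay` and `inline_recovery?` are hypothetical helper names introduced only for this illustration and do not exist in the class.

```ruby
MAX_RETRIES = 10
INLINE_RECOVERY_RETRY_THRESHOLD = 5

# Seconds slept before the given retry attempt (1-based): exponential
# backoff starting at 10 ms, capped at 1 second.
def backoff_delay(retries)
  [0.01 * (2**(retries - 1)), 1.0].min
end

# Whether inline read-model recovery would be attempted on this retry
# (assuming the aggregate class has read models enabled).
def inline_recovery?(retries)
  retries >= INLINE_RECOVERY_RETRY_THRESHOLD
end

(1..MAX_RETRIES).each do |n|
  puts format("retry %2d: sleep %.2fs, inline recovery: %s",
              n, backoff_delay(n), inline_recovery?(n))
end
```

Under this formula the delay doubles from 0.01 s up to 0.64 s on the seventh retry and is capped at 1.0 s from the eighth retry onward, which gives at most about 4.27 s of total sleep before the error is re-raised. The WrongExpectedRevisionError branch shares the same retries counter but retries without sleeping.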