Class: Yes::Core::CommandHandling::CommandExecutor

Inherits:
Object
  • Object
show all
Defined in:
lib/yes/core/command_handling/command_executor.rb

Overview

Executes commands with retry logic and pending state management Handles the core command execution including guard evaluation, event publishing, and error handling with optimistic concurrency control

Examples:

executor = CommandExecutor.new(aggregate)
response = executor.call(command, guard_evaluator_class)

Constant Summary collapse

MAX_RETRIES =
10
INLINE_RECOVERY_RETRY_THRESHOLD =
5

Instance Method Summary collapse

Constructor Details

#initialize(aggregate) ⇒ CommandExecutor

Initializes a new CommandExecutor

Parameters:



64
65
66
67
# File 'lib/yes/core/command_handling/command_executor.rb', line 64

def initialize(aggregate)
  @aggregate = aggregate
  @read_model = aggregate.read_model if aggregate.class.read_model_enabled?
end

Instance Method Details

#call(cmd, command_name, guard_evaluator_class, skip_guards: false) ⇒ Yes::Core::Commands::Response

Executes a command with retry logic and error handling

Parameters:

  • cmd (Yes::Core::Command)

    The command to execute

  • command_name (Symbol)

    The name of the command being executed

  • guard_evaluator_class (Class)

    The guard evaluator class to process the command

  • skip_guards (Boolean) (defaults to: false)

    Whether to skip guard evaluation (default: false)

Returns:



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/yes/core/command_handling/command_executor.rb', line 76

def call(cmd, command_name, guard_evaluator_class, skip_guards: false)
  retries = 0

  begin
    evaluator = GuardRunner.new(aggregate).call(cmd, command_name, guard_evaluator_class, skip_guards:)

    set_pending_update_state if aggregate.class.read_model_enabled?

    begin
      event = EventPublisher.new(
        command: cmd,
        aggregate_data: EventPublisher::AggregateEventPublicationData.from_aggregate(aggregate),
        accessed_external_aggregates: evaluator&.accessed_external_aggregates || []
      ).call
    rescue StandardError => e
      clear_pending_update_state if aggregate.class.read_model_enabled?
      raise e
    end

    command_response_class(cmd).new(cmd:, event:)
  rescue PgEventstore::WrongExpectedRevisionError => e
    retries += 1
    clear_pending_update_state if aggregate.class.read_model_enabled?

    retries <= MAX_RETRIES ? retry : raise(e)
  rescue ConcurrentUpdateError => e
    retries += 1
    # Don't clear pending state - another process owns it
    # Sleep with exponential backoff to give the other process time to finish
    sleep([0.01 * (2**(retries - 1)), 1.0].min) if retries <= MAX_RETRIES

    # After several retries, check if pending state is stuck and attempt recovery
    # This prevents infinite retry loops when a process crashes leaving the flag set
    if aggregate.class.read_model_enabled? && retries >= INLINE_RECOVERY_RETRY_THRESHOLD
      ReadModelRecoveryService.attempt_inline_recovery(read_model, aggregate: aggregate)
      read_model.reload
    end

    retries <= MAX_RETRIES ? retry : raise(e)
  rescue GuardEvaluator::InvalidTransition,
         GuardEvaluator::NoChangeTransition,
         Yes::Core::Command::Invalid => e
    command_response_class(cmd).new(cmd: cmd, error: e, batch_id: cmd.batch_id)
  end
end