Class: Yes::Core::CommandHandling::CommandExecutor
- Inherits:
-
Object
- Object
- Yes::Core::CommandHandling::CommandExecutor
- Defined in:
- lib/yes/core/command_handling/command_executor.rb
Overview
Executes commands with retry logic and pending state management Handles the core command execution including guard evaluation, event publishing, and error handling with optimistic concurrency control
Constant Summary collapse
- MAX_RETRIES =
10- INLINE_RECOVERY_RETRY_THRESHOLD =
5
Instance Method Summary collapse
-
#call(cmd, command_name, guard_evaluator_class, skip_guards: false) ⇒ Yes::Core::Commands::Response
Executes a command with retry logic and error handling.
-
#initialize(aggregate) ⇒ CommandExecutor
constructor
Initializes a new CommandExecutor.
Constructor Details
#initialize(aggregate) ⇒ CommandExecutor
Initializes a new CommandExecutor
64 65 66 67 |
# File 'lib/yes/core/command_handling/command_executor.rb', line 64 def initialize(aggregate) @aggregate = aggregate @read_model = aggregate.read_model if aggregate.class.read_model_enabled? end |
Instance Method Details
#call(cmd, command_name, guard_evaluator_class, skip_guards: false) ⇒ Yes::Core::Commands::Response
Executes a command with retry logic and error handling
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/yes/core/command_handling/command_executor.rb', line 76 def call(cmd, command_name, guard_evaluator_class, skip_guards: false) retries = 0 begin evaluator = GuardRunner.new(aggregate).call(cmd, command_name, guard_evaluator_class, skip_guards:) set_pending_update_state if aggregate.class.read_model_enabled? begin event = EventPublisher.new( command: cmd, aggregate_data: EventPublisher::AggregateEventPublicationData.from_aggregate(aggregate), accessed_external_aggregates: evaluator&.accessed_external_aggregates || [] ).call rescue StandardError => e clear_pending_update_state if aggregate.class.read_model_enabled? raise e end command_response_class(cmd).new(cmd:, event:) rescue PgEventstore::WrongExpectedRevisionError => e retries += 1 clear_pending_update_state if aggregate.class.read_model_enabled? retries <= MAX_RETRIES ? retry : raise(e) rescue ConcurrentUpdateError => e retries += 1 # Don't clear pending state - another process owns it # Sleep with exponential backoff to give the other process time to finish sleep([0.01 * (2**(retries - 1)), 1.0].min) if retries <= MAX_RETRIES # After several retries, check if pending state is stuck and attempt recovery # This prevents infinite retry loops when a process crashes leaving the flag set if aggregate.class.read_model_enabled? && retries >= INLINE_RECOVERY_RETRY_THRESHOLD ReadModelRecoveryService.attempt_inline_recovery(read_model, aggregate: aggregate) read_model.reload end retries <= MAX_RETRIES ? retry : raise(e) rescue GuardEvaluator::InvalidTransition, GuardEvaluator::NoChangeTransition, Yes::Core::Command::Invalid => e command_response_class(cmd).new(cmd: cmd, error: e, batch_id: cmd.batch_id) end end |