Class: Yes::Core::CommandHandling::CommandGroupExecutor
- Inherits:
-
Object
- Object
- Yes::Core::CommandHandling::CommandGroupExecutor
- Defined in:
- lib/yes/core/command_handling/command_group_executor.rb
Overview
Low-level executor for command groups.
Mirrors CommandExecutor, with two key differences:
-
Sub-command events publish inside a single ‘PgEventstore.client.multiple` block, so either all events commit atomically or none do.
-
Read-model updates run AFTER the eventstore commit succeeds, in declaration order, so a rolled-back transaction never leaves the read model ahead of the stream.
Only the group’s own guards run here — sub-command guards are bypassed by design.
Constant Summary collapse
- MAX_RETRIES =
10- INLINE_RECOVERY_RETRY_THRESHOLD =
5
Instance Method Summary collapse
- #call(cmd, group_name, guard_evaluator_class, skip_guards: false) ⇒ Yes::Core::Commands::CommandGroupResponse
-
#initialize(aggregate) ⇒ CommandGroupExecutor
constructor
A new instance of CommandGroupExecutor.
Constructor Details
#initialize(aggregate) ⇒ CommandGroupExecutor
Returns a new instance of CommandGroupExecutor.
24 25 26 27 |
# File 'lib/yes/core/command_handling/command_group_executor.rb', line 24 def initialize(aggregate) @aggregate = aggregate @read_model = aggregate.read_model if aggregate.class.read_model_enabled? end |
Instance Method Details
#call(cmd, group_name, guard_evaluator_class, skip_guards: false) ⇒ Yes::Core::Commands::CommandGroupResponse
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/yes/core/command_handling/command_group_executor.rb', line 34 def call(cmd, group_name, guard_evaluator_class, skip_guards: false) retries = 0 begin evaluator = GuardRunner.new(aggregate).call(cmd, group_name, guard_evaluator_class, skip_guards:) external_aggregates = evaluator&.accessed_external_aggregates || [] set_pending_update_state if aggregate.class.read_model_enabled? events = publish_events(cmd, external_aggregates) apply_read_model_updates(cmd, events) if aggregate.class.read_model_enabled? Yes::Core::Commands::CommandGroupResponse.new(cmd:, events:) rescue PgEventstore::WrongExpectedRevisionError => e retries += 1 clear_pending_update_state if aggregate.class.read_model_enabled? retries <= MAX_RETRIES ? retry : raise(e) rescue ConcurrentUpdateError => e retries += 1 sleep([0.01 * (2**(retries - 1)), 1.0].min) if retries <= MAX_RETRIES if aggregate.class.read_model_enabled? && retries >= INLINE_RECOVERY_RETRY_THRESHOLD ReadModelRecoveryService.attempt_inline_recovery(read_model, aggregate: aggregate) read_model.reload end retries <= MAX_RETRIES ? retry : raise(e) rescue GuardEvaluator::InvalidTransition, GuardEvaluator::NoChangeTransition, Yes::Core::Command::Invalid => e clear_pending_update_state if aggregate.class.read_model_enabled? Yes::Core::Commands::CommandGroupResponse.new(cmd:, error: e) end end |