Class: Yes::Core::CommandHandling::ReadModelRecoveryService
- Inherits: Object
  - Object
  - Yes::Core::CommandHandling::ReadModelRecoveryService
- Extended by: OpenTelemetry::Trackable
- Defined in: lib/yes/core/command_handling/read_model_recovery_service.rb
Overview
Service for recovering read models that are stuck in a pending state. It handles cases where the process was interrupted after publishing an event but before updating the read model.
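The failure window described above can be illustrated with a minimal plain-Ruby sketch. It assumes, which this page does not state, that the write path sets pending_update_since before publishing the event and clears it after the read model is updated; ReadModel and handle_command are hypothetical names, and the "event store" is just an array.

```ruby
# Hypothetical in-memory read model; only the pending_update_since
# attribute name comes from the documented service.
ReadModel = Struct.new(:id, :value, :pending_update_since)

# Sketch of a write path (assumed ordering, not taken from the library):
# if the process dies between publishing and updating, the marker stays set.
def handle_command(read_model, new_value, events, crash_after_publish: false, now: Time.now)
  read_model.pending_update_since = now              # mark the read model as pending
  events << { id: read_model.id, value: new_value }  # "publish" the event
  raise "process interrupted" if crash_after_publish # simulated crash

  read_model.value = new_value                       # update the read model
  read_model.pending_update_since = nil              # clear the pending marker
end
```

After a simulated crash the event exists but the read model is stale and still carries its marker, which is exactly the state this service looks for.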
Defined Under Namespace
Classes: RecoveryResult
Constant Summary
- DEFAULT_STUCK_TIMEOUT = 1.minute
  Default timeout after which a read model is considered stuck.
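A minimal sketch of the timeout rule, assuming "stuck" means the pending marker is older than the timeout. The stuck? helper is hypothetical (the real selection happens in find_stuck_read_models_with_aggregates, whose query is not shown), and plain seconds stand in for ActiveSupport's 1.minute so the snippet runs without Rails.

```ruby
# Plain-Ruby stand-in for 1.minute (ActiveSupport is not loaded here).
DEFAULT_STUCK_TIMEOUT_SECONDS = 60

# Hypothetical helper: a read model counts as stuck when it is pending and
# its pending marker is older than the timeout.
def stuck?(pending_update_since, now: Time.now, timeout: DEFAULT_STUCK_TIMEOUT_SECONDS)
  return false if pending_update_since.nil? # not pending at all

  (now - pending_update_since) > timeout    # Time - Time yields seconds (Float)
end
```

With now = Time.at(1_000), a marker at Time.at(900) is stuck (100 s old) while one at Time.at(970) is not (30 s old).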
Class Method Summary
- .attempt_inline_recovery(read_model, aggregate:, threshold: 2.seconds) ⇒ Boolean
  Attempts inline recovery during command execution retry loops. This method is designed to be called from CommandExecutor when it is stuck in a retry loop.
- .check_and_recover_with_retries(read_model, aggregate:, threshold: 2.seconds, max_retries: 10) ⇒ void
  Checks if a read model needs recovery and attempts it with retries.
- .recover_all_stuck_read_models(stuck_timeout: DEFAULT_STUCK_TIMEOUT, batch_size: 100) ⇒ Array<RecoveryResult>
  Finds and recovers all stuck read models.
- .recover_read_model(read_model, aggregate_class:, is_draft: false) ⇒ RecoveryResult
  Recovers a single read model.
Class Method Details
.attempt_inline_recovery(read_model, aggregate:, threshold: 2.seconds) ⇒ Boolean
Attempts inline recovery during command execution retry loops. This method is designed to be called from CommandExecutor when it is stuck in a retry loop. It returns true immediately for read models that do not respond to pending_update_since.
# File 'lib/yes/core/command_handling/read_model_recovery_service.rb', line 136
def attempt_inline_recovery(read_model, aggregate:, threshold: 2.seconds)
  return true unless read_model.respond_to?(:pending_update_since)

  attempt_recovery_if_pending(read_model, threshold, aggregate: aggregate)
end
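The private helper attempt_recovery_if_pending is not documented on this page. The sketch below is a hypothetical reading of how the threshold might be used: not pending means success, pending for less than threshold seconds means "keep waiting" (false), and pending for longer triggers recovery. The names inline_recovery and recover_if_pending, the now: keyword, and the recovery block are all assumptions made so the example is self-contained.

```ruby
# Hypothetical sketch; mirrors only the respond_to? guard shown above.
def inline_recovery(read_model, threshold: 2.0, now: Time.now, &recover)
  # Read models without pending tracking never need recovery.
  return true unless read_model.respond_to?(:pending_update_since)

  recover_if_pending(read_model, threshold, now: now, &recover)
end

# Assumed semantics for the undocumented helper.
def recover_if_pending(read_model, threshold, now:, &recover)
  since = read_model.pending_update_since
  return true if since.nil?               # not pending: nothing to do
  return false if now - since < threshold # pending, but the update may still be in flight

  recover.call(read_model)                # pending too long: attempt recovery
end
```

Returning false for a fresh marker lets the caller's retry loop wait a little longer instead of racing a write that is still in progress.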
.check_and_recover_with_retries(read_model, aggregate:, threshold: 2.seconds, max_retries: 10) ⇒ void
This method returns an undefined value.
Checks if a read model needs recovery and attempts it with retries. Returns immediately if the read model does not respond to pending_update_since.
# File 'lib/yes/core/command_handling/read_model_recovery_service.rb', line 104
def check_and_recover_with_retries(read_model, aggregate:, threshold: 2.seconds, max_retries: 10)
  return unless read_model.respond_to?(:pending_update_since)

  retrier = Yes::Core::Utils::ExponentialRetrier.new(
    max_retries: max_retries,
    base_sleep_time: 0.1,
    max_sleep_time: 1.0,
    timeout: 30,
    logger: Rails.logger
  )

  retrier.call(
    condition_check: -> { attempt_recovery_if_pending(read_model, threshold, aggregate:) },
    failure_message: "Could not recover pending read model #{read_model.class.name}##{read_model.id}",
    timeout_message: "Timeout waiting for read model recovery #{read_model.class.name}##{read_model.id}"
  ) do
    # Success - either not pending or recovered
    true
  end
end
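The internals of Yes::Core::Utils::ExponentialRetrier are not shown here. As a rough illustration of what base_sleep_time: 0.1, max_sleep_time: 1.0 and timeout: 30 plausibly mean, here is a hypothetical MiniRetrier that doubles the delay on each attempt, caps it, and gives up on timeout or after max_retries. The injectable sleeper and clock exist only to make the sketch testable; this sketch raises on failure, whereas the real retrier may log through its logger instead.

```ruby
# Hypothetical stand-in for Yes::Core::Utils::ExponentialRetrier.
class MiniRetrier
  def initialize(max_retries:, base_sleep_time:, max_sleep_time:, timeout:,
                 sleeper: ->(s) { sleep(s) },
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @max_retries = max_retries
    @base = base_sleep_time
    @max = max_sleep_time
    @timeout = timeout
    @sleeper = sleeper
    @clock = clock
  end

  # Delay before retry number +attempt+ (0-based): doubles, then caps at @max.
  def delay_for(attempt)
    [@base * (2**attempt), @max].min
  end

  # Re-evaluates condition_check until it is truthy, then yields.
  def call(condition_check:, failure_message:, timeout_message:)
    started = @clock.call
    (0..@max_retries).each do |attempt|
      return yield if condition_check.call
      raise timeout_message if @clock.call - started >= @timeout

      @sleeper.call(delay_for(attempt)) if attempt < @max_retries
    end
    raise failure_message
  end
end
```

With the documented settings the waits would be 0.1, 0.2, 0.4, 0.8 and then 1.0 seconds per retry, so ten retries fit comfortably inside the 30-second timeout.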
.recover_all_stuck_read_models(stuck_timeout: DEFAULT_STUCK_TIMEOUT, batch_size: 100) ⇒ Array<RecoveryResult>
Finds and recovers all stuck read models.
# File 'lib/yes/core/command_handling/read_model_recovery_service.rb', line 23
def recover_all_stuck_read_models(stuck_timeout: DEFAULT_STUCK_TIMEOUT, batch_size: 100)
  results = []

  find_stuck_read_models_with_aggregates(stuck_timeout:, batch_size:).each do |entry|
    result = recover_read_model(
      entry[:read_model],
      aggregate_class: entry[:aggregate_class],
      is_draft: entry[:is_draft]
    )
    results << result
    log_recovery_result(result)
  end

  results
end
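Because recover_read_model converts its errors into failed RecoveryResult values, one bad read model does not abort the whole sweep. The sketch below shows that "collect every result" loop and how a caller, for example a periodic job, might summarize the returned array. RecoveryResult here is a hypothetical Struct whose fields match the keyword arguments used in recover_read_model; recover_all and summarize are illustrative names, and batch_size is not modeled.

```ruby
# Hypothetical stand-in for RecoveryResult (fields match recover_read_model's usage).
RecoveryResult = Struct.new(:success, :read_model, :error_message, keyword_init: true)

# Recover each entry and keep every result, failures included.
def recover_all(entries, &recover_one)
  entries.map { |entry| recover_one.call(entry) }
end

# Illustrative caller-side summary of the returned results.
def summarize(results)
  failed = results.reject(&:success)
  { recovered: results.size - failed.size, failed: failed.map(&:error_message) }
end
```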
.recover_read_model(read_model, aggregate_class:, is_draft: false) ⇒ RecoveryResult
Recovers a single read model.
# File 'lib/yes/core/command_handling/read_model_recovery_service.rb', line 45
def recover_read_model(read_model, aggregate_class:, is_draft: false)
  # Use advisory lock to prevent concurrent recovery attempts
  lock_key = "read_model_recovery_#{read_model.class.name}_#{read_model.id}"

  with_advisory_lock(lock_key) do
    # Re-check if still stuck after acquiring lock
    read_model.reload
    return RecoveryResult.new(success: true, read_model:, error_message: 'Already recovered') unless read_model.pending_update_since?

    # Instantiate aggregate with proper parameters
    aggregate_id = determine_aggregate_id(read_model)
    aggregate = if is_draft
                  aggregate_class.new(aggregate_id, draft: true)
                else
                  aggregate_class.new(aggregate_id)
                end
    latest_event = aggregate.latest_event

    # Reapply the update using ReadModelUpdater
    updater = ReadModelUpdater.new(aggregate)
    updater.call(latest_event, latest_event.data)

    RecoveryResult.new(success: true, read_model:)
  end
rescue ActiveRecord::ActiveRecordError => e
  # Database errors during recovery
  RecoveryResult.new(
    success: false,
    read_model:,
    error_message: "Database error during recovery: #{e.message}"
  )
rescue PgEventstore::Error => e
  # Event store errors during recovery
  RecoveryResult.new(
    success: false,
    read_model:,
    error_message: "Event store error during recovery: #{e.message}"
  )
rescue StandardError => e
  # Unexpected errors - log them for debugging
  Rails.logger.error("Unexpected error recovering read model #{read_model.class.name}##{read_model.id}: #{e.class.name}")
  RecoveryResult.new(
    success: false,
    read_model:,
    error_message: "Unexpected error: #{e.class.name} - #{e.message}"
  )
end
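The lock-then-recheck pattern above (acquire a per-read-model lock, reload, and bail out if someone else already finished) can be sketched without a database. This version swaps the advisory lock for one in-process Mutex per key, so it only coordinates threads; a database advisory lock such as PostgreSQL's pg_advisory_lock also coordinates across processes. KeyedLocks, Model and recover are hypothetical, and incrementing applied stands in for ReadModelUpdater#call. How with_advisory_lock is implemented in this library is not shown on this page.

```ruby
# In-process stand-in for an advisory lock: one Mutex per lock key.
class KeyedLocks
  def initialize
    @guard = Mutex.new
    @locks = Hash.new { |h, k| h[k] = Mutex.new }
  end

  def with_lock(key, &block)
    mutex = @guard.synchronize { @locks[key] } # create/fetch the key's mutex safely
    mutex.synchronize(&block)
  end
end

Model = Struct.new(:id, :pending, :applied)

def recover(model, locks)
  locks.with_lock("read_model_recovery_#{model.class.name}_#{model.id}") do
    # Double-check after acquiring the lock: another worker may have finished.
    next :already_recovered unless model.pending

    model.applied += 1 # stand-in for reapplying the latest event
    model.pending = false
    :recovered
  end
end
```

If several workers race on the same stuck read model, exactly one reapplies the event and the others see it is no longer pending.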