Class: Yes::Core::CommandHandling::ReadModelRecoveryService

Inherits:
Object
Extended by:
OpenTelemetry::Trackable
Defined in:
lib/yes/core/command_handling/read_model_recovery_service.rb

Overview

Service for recovering read models that are stuck in a pending state. It handles cases where the process was interrupted after publishing an event but before updating the read model.

Defined Under Namespace

Classes: RecoveryResult

Constant Summary

DEFAULT_STUCK_TIMEOUT = 1.minute

Default timeout after which a read model is considered stuck

Class Method Details

.attempt_inline_recovery(read_model, aggregate:, threshold: 2.seconds) ⇒ Boolean

Attempts inline recovery during command execution retry loops. This method is designed to be called from CommandExecutor when it is stuck in a retry loop.

Parameters:

  • read_model (ActiveRecord::Base)

    The read model to check and recover

  • aggregate (Yes::Core::Aggregate)

    Aggregate instance to use for recovery

  • threshold (ActiveSupport::Duration) (defaults to: 2.seconds)

    Time threshold for considering state stuck (default: 2 seconds)

Returns:

  • (Boolean)

    True if recovery succeeded or not needed, false if recovery failed



# File 'lib/yes/core/command_handling/read_model_recovery_service.rb', line 136

def attempt_inline_recovery(read_model, aggregate:, threshold: 2.seconds)
  return true unless read_model.respond_to?(:pending_update_since)

  attempt_recovery_if_pending(read_model, threshold, aggregate: aggregate)
end
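
The guard on the first line means a read model without a pending_update_since attribute is reported as not needing recovery. The following is a minimal sketch of that guard combined with a threshold comparison, using plain Ruby stand-ins. The `stuck?` helper and both Structs are hypothetical, thresholds are plain seconds rather than ActiveSupport durations, and the private helper attempt_recovery_if_pending is not shown on this page, so its real logic may differ.

```ruby
# Hypothetical stand-ins for read models; only the attribute name
# pending_update_since comes from the documented source.
PendingModel = Struct.new(:pending_update_since)
PlainModel = Struct.new(:id)

# Assumed stuck check: the model has been pending for longer than
# `threshold` seconds. Models without the attribute are never stuck.
def stuck?(read_model, threshold:, now: Time.now)
  return false unless read_model.respond_to?(:pending_update_since)

  since = read_model.pending_update_since
  return false if since.nil?

  (now - since) > threshold
end

NOW = Time.at(1_000)
FRESH = PendingModel.new(Time.at(999)) # pending for 1 second
STALE = PendingModel.new(Time.at(990)) # pending for 10 seconds

STUCK_FLAGS = [
  stuck?(PlainModel.new(1), threshold: 2, now: NOW),
  stuck?(PendingModel.new(nil), threshold: 2, now: NOW),
  stuck?(FRESH, threshold: 2, now: NOW),
  stuck?(STALE, threshold: 2, now: NOW)
]
```

Only the last model, pending for longer than the 2 second threshold, would be a candidate for recovery under this assumption.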

.check_and_recover_with_retries(read_model, aggregate:, threshold: 2.seconds, max_retries: 10) ⇒ void

This method returns an undefined value.

Checks if a read model needs recovery and attempts it with retries

Parameters:

  • read_model (ActiveRecord::Base)

    The read model to check

  • aggregate (Yes::Core::Aggregate)

    Aggregate instance to use for recovery

  • threshold (ActiveSupport::Duration) (defaults to: 2.seconds)

    Time threshold for recovery (default: 2 seconds)

  • max_retries (Integer) (defaults to: 10)

    Maximum number of retry attempts (default: 10)

Raises:



# File 'lib/yes/core/command_handling/read_model_recovery_service.rb', line 104

def check_and_recover_with_retries(read_model, aggregate:, threshold: 2.seconds, max_retries: 10)
  return unless read_model.respond_to?(:pending_update_since)

  retrier = Yes::Core::Utils::ExponentialRetrier.new(
    max_retries: max_retries,
    base_sleep_time: 0.1,
    max_sleep_time: 1.0,
    timeout: 30,
    logger: Rails.logger
  )

  retrier.call(
    condition_check: -> { attempt_recovery_if_pending(read_model, threshold, aggregate:) },
    failure_message: "Could not recover pending read model #{read_model.class.name}##{read_model.id}",
    timeout_message: "Timeout waiting for read model recovery #{read_model.class.name}##{read_model.id}"
  ) do
    # Success - either not pending or recovered
    true
  end
end
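
To get a feel for the retry settings above, here is a rough sketch of the sleep schedule they could produce. It assumes the retrier doubles the delay on each attempt and caps it at max_sleep_time; the implementation of Yes::Core::Utils::ExponentialRetrier is not shown on this page, so the real schedule (for example with jitter) may differ. `backoff_schedule_ms` is a hypothetical helper, and delays are in milliseconds to keep the arithmetic exact.

```ruby
# Assumed backoff rule: double the delay on every attempt and cap it
# at the maximum. Not taken from the ExponentialRetrier source.
def backoff_schedule_ms(max_retries:, base_ms:, max_ms:)
  (0...max_retries).map { |attempt| [base_ms * (2**attempt), max_ms].min }
end

# base_sleep_time: 0.1 and max_sleep_time: 1.0, expressed in milliseconds
SCHEDULE_MS = backoff_schedule_ms(max_retries: 10, base_ms: 100, max_ms: 1_000)
TOTAL_SLEEP_MS = SCHEDULE_MS.sum
```

Under this assumption the delays are 100, 200, 400 and 800 ms followed by six capped delays of 1000 ms, for 7500 ms of sleep in total. If the timeout of 30 is measured in seconds, the retry budget would normally run out before the timeout does.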

.recover_all_stuck_read_models(stuck_timeout: DEFAULT_STUCK_TIMEOUT, batch_size: 100) ⇒ Array<RecoveryResult>

Finds and recovers all stuck read models

Parameters:

  • stuck_timeout (ActiveSupport::Duration) (defaults to: DEFAULT_STUCK_TIMEOUT)

    Time after which a model is considered stuck

  • batch_size (Integer) (defaults to: 100)

    Number of read models to process at once

Returns:

  • (Array<RecoveryResult>)

    One result per stuck read model that was processed



# File 'lib/yes/core/command_handling/read_model_recovery_service.rb', line 23

def recover_all_stuck_read_models(stuck_timeout: DEFAULT_STUCK_TIMEOUT, batch_size: 100)
  results = []

  find_stuck_read_models_with_aggregates(stuck_timeout:, batch_size:).each do |entry|
    result = recover_read_model(
      entry[:read_model],
      aggregate_class: entry[:aggregate_class],
      is_draft: entry[:is_draft]
    )
    results << result

    log_recovery_result(result)
  end

  results
end
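
Because the method returns every result rather than raising, a caller such as a scheduled job may want to summarize the outcome. The sketch below shows one way to do that. The Struct is a stand-in whose field names are taken from the keyword arguments used in the source (success:, read_model:, error_message:); the actual RecoveryResult class and its readers are not shown on this page, and `summarize` is a hypothetical helper.

```ruby
# Stand-in for RecoveryResult. Field names mirror the keyword arguments
# seen in the source; the Struct itself is an assumption.
RecoveryResult = Struct.new(:success, :read_model, :error_message, keyword_init: true)

# Hypothetical helper: count successes and collect failure messages.
def summarize(results)
  recovered, failed = results.partition(&:success)
  {
    recovered: recovered.size,
    failed: failed.size,
    errors: failed.map(&:error_message)
  }
end

SAMPLE_RESULTS = [
  RecoveryResult.new(success: true, read_model: :order_1),
  RecoveryResult.new(success: true, read_model: :order_2, error_message: 'Already recovered'),
  RecoveryResult.new(success: false, read_model: :order_3,
                     error_message: 'Database error during recovery: timeout')
]
SUMMARY = summarize(SAMPLE_RESULTS)
```

Note that a result can carry an error_message and still be successful, as in the 'Already recovered' case, so the summary keys off the success flag rather than the presence of a message.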

.recover_read_model(read_model, aggregate_class:, is_draft: false) ⇒ RecoveryResult

Recovers a single read model

Parameters:

  • read_model (ActiveRecord::Base)

    The read model to recover

  • aggregate_class (Class)

    The aggregate class to use for recovery

  • is_draft (Boolean) (defaults to: false)

    Flag indicating if this is a draft aggregate

Returns:

  • (RecoveryResult)

    Outcome of the recovery attempt



# File 'lib/yes/core/command_handling/read_model_recovery_service.rb', line 45

def recover_read_model(read_model, aggregate_class:, is_draft: false)
  # Use advisory lock to prevent concurrent recovery attempts
  lock_key = "read_model_recovery_#{read_model.class.name}_#{read_model.id}"

  with_advisory_lock(lock_key) do
    # Re-check if still stuck after acquiring lock
    read_model.reload

    return RecoveryResult.new(success: true, read_model:, error_message: 'Already recovered') unless read_model.pending_update_since?

    # Instantiate aggregate with proper parameters
    aggregate_id = determine_aggregate_id(read_model)

    aggregate = if is_draft
                  aggregate_class.new(aggregate_id, draft: true)
                else
                  aggregate_class.new(aggregate_id)
                end

    latest_event = aggregate.latest_event

    # Reapply the update using ReadModelUpdater
    updater = ReadModelUpdater.new(aggregate)
    updater.call(latest_event, latest_event.data)

    RecoveryResult.new(success: true, read_model:)
  end
rescue ActiveRecord::ActiveRecordError => e
  # Database errors during recovery
  RecoveryResult.new(
    success: false,
    read_model:,
    error_message: "Database error during recovery: #{e.message}"
  )
rescue PgEventstore::Error => e
  # Event store errors during recovery
  RecoveryResult.new(
    success: false,
    read_model:,
    error_message: "Event store error during recovery: #{e.message}"
  )
rescue StandardError => e
  # Unexpected errors - log them for debugging
  Rails.logger.error("Unexpected error recovering read model #{read_model.class.name}##{read_model.id}: #{e.class.name}")
  RecoveryResult.new(
    success: false,
    read_model:,
    error_message: "Unexpected error: #{e.class.name} - #{e.message}"
  )
end
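
The lock-then-recheck step in the listing above is what keeps concurrent workers from repairing the same read model twice. The sketch below illustrates that pattern in memory only: a per-key Mutex stands in for the advisory lock, and FakeReadModel stands in for the database row. with_advisory_lock is a private helper not shown on this page, so none of this reflects its real implementation; `recover_once`, LOCKS and LOCKS_GUARD are hypothetical names.

```ruby
# In-memory stand-ins: one Mutex per lock key plays the role of the
# advisory lock, and FakeReadModel plays the role of the read model row.
LOCKS = Hash.new { |hash, key| hash[key] = Mutex.new }
LOCKS_GUARD = Mutex.new

FakeReadModel = Struct.new(:id, :pending, :repairs)

def recover_once(model)
  key = "read_model_recovery_#{model.class.name}_#{model.id}"
  lock = LOCKS_GUARD.synchronize { LOCKS[key] }

  lock.synchronize do
    # Re-check under the lock: another worker may have finished already.
    return :already_recovered unless model.pending

    model.repairs += 1
    model.pending = false
    :recovered
  end
end

MODEL = FakeReadModel.new(42, true, 0)
# Four workers race to recover the same model.
OUTCOMES = Array.new(4) { Thread.new { recover_once(MODEL) } }.map(&:value)
```

Whichever thread takes the lock first performs the repair; the others see the cleared pending flag and return early, which mirrors the 'Already recovered' result in the source.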