Class: RubyReactor::Reactor

Inherits:
Object
  • Object
show all
Includes:
Dsl::Lockable, Dsl::Reactor
Defined in:
lib/ruby_reactor/reactor.rb

Overview

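A reactor wraps a context and can be run, continued, undone, or cancelled. Class-level helpers load a persisted reactor by context ID or correlation ID and forward to the matching instance method.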

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Dsl::Lockable

included

Methods included from Dsl::Reactor

included

Constructor Details

#initialize(context = {}) ⇒ Reactor

Returns a new instance of Reactor.



66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/ruby_reactor/reactor.rb', line 66

# Builds a reactor around +context+.
#
# When +context+ is a Context, the execution trace, undo trace and result are
# rebuilt from it; for anything else the reactor starts with empty traces and
# an +:unexecuted+ result.
#
# @param context [Context, Object] a Context to restore from, or any other
#   value (an empty Hash by default) for a fresh reactor
def initialize(context = {})
  @context = context
  @result = :unexecuted

  unless @context.is_a?(Context)
    @undo_trace = []
    @execution_trace = []
    return
  end

  @execution_trace = @context.execution_trace || []
  # NOTE(review): assumes trace entries are indexed by the Symbol key :type and
  # carry the Symbol :undo — confirm this holds for deserialized contexts.
  @undo_trace = @execution_trace.select { |entry| entry[:type] == :undo }
  @result = reconstruct_result
end

Instance Attribute Details

#context ⇒ Object (readonly)

Returns the value of attribute context.



9
10
11
# File 'lib/ruby_reactor/reactor.rb', line 9

# Reader for the value held in +@context+.
#
# @return [Object] the current +@context+ (nil when it has not been set)
def context = @context

#execution_trace ⇒ Object (readonly)

Returns the value of attribute execution_trace.



9
10
11
# File 'lib/ruby_reactor/reactor.rb', line 9

# Reader for the value held in +@execution_trace+.
#
# @return [Object] the current +@execution_trace+ (nil when it has not been set)
def execution_trace = @execution_trace

#result ⇒ Object (readonly)

Returns the value of attribute result.



9
10
11
# File 'lib/ruby_reactor/reactor.rb', line 9

# Reader for the value held in +@result+.
#
# @return [Object] the current +@result+ (nil when it has not been set)
def result = @result

#undo_trace ⇒ Object (readonly)

Returns the value of attribute undo_trace.



9
10
11
# File 'lib/ruby_reactor/reactor.rb', line 9

# Reader for the value held in +@undo_trace+.
#
# @return [Object] the current +@undo_trace+ (nil when it has not been set)
def undo_trace = @undo_trace

Class Method Details

.cancel(id:, reason:) ⇒ Object



51
52
53
54
# File 'lib/ruby_reactor/reactor.rb', line 51

# Loads the reactor stored under +id+ and cancels it.
#
# @param id [Object] identifier passed to +find+
# @param reason [Object] forwarded to the instance-level +cancel+
# @return [Object] whatever the instance-level +cancel+ returns
def self.cancel(id:, reason:)
  find(id).cancel(reason)
end

.configuration ⇒ Object



62
63
64
# File 'lib/ruby_reactor/reactor.rb', line 62

# Shortcut to the configuration object exposed by
# +RubyReactor::Configuration.instance+.
#
# @return [Object] the value returned by +RubyReactor::Configuration.instance+
def self.configuration = RubyReactor::Configuration.instance

.continue(id:, payload:, step_name:, idempotency_key: nil) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/ruby_reactor/reactor.rb', line 31

# Loads the reactor stored under +id+ and resumes it via the instance-level
# +continue+.
#
# This class-level entry point is strict: when the instance returns a
# RubyReactor::Failure that reports +invalid_payload?+, the failure is turned
# into an exception. The reactor is deliberately not cancelled in that case, so
# the caller can retry with a valid payload.
#
# @param id [Object] identifier passed to +find+
# @param payload [Object] forwarded to the instance-level +continue+
# @param step_name [Object] forwarded to the instance-level +continue+
# @param idempotency_key [Object, nil] forwarded to the instance-level +continue+
# @return [Object] the result of the instance-level +continue+
# @raise [Error::InputValidationError] when the result is an invalid-payload failure
def self.continue(id:, payload:, step_name:, idempotency_key: nil)
  outcome = find(id).continue(payload: payload, step_name: step_name, idempotency_key: idempotency_key)

  payload_rejected = outcome.is_a?(RubyReactor::Failure) &&
                     outcome.respond_to?(:invalid_payload?) &&
                     outcome.invalid_payload?
  return outcome unless payload_rejected

  raise Error::InputValidationError, outcome.error
end

.continue_by_correlation_id(correlation_id:, payload:, step_name:, idempotency_key: nil) ⇒ Object



44
45
46
47
48
49
# File 'lib/ruby_reactor/reactor.rb', line 44

# Resumes the reactor registered under +correlation_id+.
#
# The reactor is looked up only to obtain its context ID; the actual resume is
# routed through the class-level +continue+ so that its handling applies here
# as well.
#
# @param correlation_id [Object] passed to +find_by_correlation_id+
# @param payload [Object] forwarded to +continue+
# @param step_name [Object] forwarded to +continue+
# @param idempotency_key [Object, nil] forwarded to +continue+
# @return [Object] whatever the class-level +continue+ returns
def self.continue_by_correlation_id(correlation_id:, payload:, step_name:, idempotency_key: nil)
  context_id = find_by_correlation_id(correlation_id).context.context_id
  continue(id: context_id, payload: payload, step_name: step_name, idempotency_key: idempotency_key)
end

.find(id) ⇒ Object



11
12
13
14
15
16
17
18
# File 'lib/ruby_reactor/reactor.rb', line 11

# Loads a reactor from storage.
#
# Asks the configured storage adapter for the serialized context stored under
# +id+ for this class name, deserializes it and wraps it in a new instance.
#
# @param id [Object] context identifier handed to the storage adapter
# @return [Object] a new instance built from the deserialized context
# @raise [Error::ValidationError] when the adapter returns nothing for +id+
def self.find(id)
  stored = configuration.storage_adapter.retrieve_context(id, name)
  raise Error::ValidationError, "Context '#{id}' not found" unless stored

  new(Context.deserialize_from_retry(stored))
end

.find_by_correlation_id(correlation_id) ⇒ Object



20
21
22
23
24
25
26
27
28
29
# File 'lib/ruby_reactor/reactor.rb', line 20

# Loads a reactor by correlation ID.
#
# Resolves +correlation_id+ to a context ID through the configured storage
# adapter (scoped to this class name), then loads that context with +find+.
#
# @param correlation_id [Object] correlation identifier handed to the adapter
# @return [Object] whatever +find+ returns for the resolved context ID
# @raise [Error::ValidationError] when the adapter resolves no context ID
def self.find_by_correlation_id(correlation_id)
  adapter = configuration.storage_adapter
  context_id = adapter.retrieve_context_id_by_correlation_id(correlation_id, name)
  raise Error::ValidationError, "Correlation ID '#{correlation_id}' not found" unless context_id

  find(context_id)
end

.undo(id) ⇒ Object



56
57
58
59
60
# File 'lib/ruby_reactor/reactor.rb', line 56

# Undoes the reactor stored under +id+, then marks it as cancelled.
#
# The cancellation goes through the class-level +cancel+, which loads the
# reactor again by +id+ rather than reusing the instance that ran the undo.
#
# @param id [Object] identifier passed to +find+ and +cancel+
# @return [Object] whatever the class-level +cancel+ returns
def self.undo(id)
  find(id).undo
  cancel(id: id, reason: "Undo triggered")
end

Instance Method Details

#cancel(reason) ⇒ Object



177
178
179
180
181
182
# File 'lib/ruby_reactor/reactor.rb', line 177

# Flags the context as cancelled and persists it.
#
# Sets +cancelled+, +cancellation_reason+ and +status+ ("cancelled") on the
# context before calling +save_context+.
#
# @param reason [Object] stored as the context's cancellation reason
# @return [Object] whatever +save_context+ returns
def cancel(reason)
  @context.tap do |ctx|
    ctx.cancelled = true
    ctx.cancellation_reason = reason
    ctx.status = "cancelled"
  end
  save_context
end

#continue(payload:, step_name:, idempotency_key: nil) ⇒ Object

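Resumes execution for a context that has a current step: stores the payload as the result of the given step and hands the context to an Executor. Returns the execution result, or a failure when the payload is rejected.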



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/ruby_reactor/reactor.rb', line 135

# Resumes this reactor with +payload+ as the result of +step_name+.
#
# Refuses to resume when the context has no current step or has been
# cancelled. After the step name and payload pass validation, the payload is
# stored on the context and execution is resumed through an Executor, whose
# context and traces replace the ones held by this instance.
#
# @param payload [Object] value recorded as the result of +step_name+
# @param step_name [Object] step being resumed
# @param idempotency_key [Object, nil] accepted but not used by this method
# @return [Object] the resumed execution result, the failure returned by
#   +validate_continue_payload+, or an invalid-payload failure built by
#   +RubyReactor::Failure+ when an Error::InputValidationError is raised
# @raise [Error::ValidationError] when there is no current step or the
#   reactor has been cancelled
def continue(payload:, step_name:, idempotency_key: nil)
  _ = idempotency_key

  no_step_message = "Cannot resume: context does not have a current step (was it interrupted?)"
  raise Error::ValidationError, no_step_message unless @context.current_step

  if @context.cancelled
    cancelled_message = "Cannot resume: reactor has been cancelled (Reason: #{@context.cancellation_reason})"
    raise Error::ValidationError, cancelled_message
  end

  validate_continue_step!(step_name)

  rejection = validate_continue_payload(payload, step_name)
  return rejection if rejection

  @context.set_result(step_name, payload)

  # Hand the updated context to an executor and adopt its state afterwards.
  resumed = Executor.new(self.class, {}, @context)
  @result = resumed.resume_execution

  @context = resumed.context
  @undo_trace = resumed.undo_trace
  @execution_trace = resumed.execution_trace

  @result
rescue Error::InputValidationError => e
  # Payload validation is handled explicitly above; this covers any
  # InputValidationError raised elsewhere in the method.
  RubyReactor::Failure(e.message, invalid_payload: true)
end

#run(inputs = {}) ⇒ Object

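Runs the reactor with the given inputs: validates them, then either enqueues the whole reactor through the async router or executes it through an Executor, and returns the result.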



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/ruby_reactor/reactor.rb', line 81

# Runs the reactor with +inputs+ and returns the result.
#
# Flow, as implemented below:
# 1. Make sure +@context+ is a Context (a new one is built from +inputs+ and
#    the reactor class when it is not).
# 2. Validate +inputs+ via the class-level +validate_inputs+; on failure the
#    context is marked "failed", the failure reason is recorded, the context
#    is saved and the validation result is returned.
# 3. If the class is async and the context is not flagged for inline async
#    execution, the context is saved with status +:running+ and the whole
#    reactor is enqueued through +configuration.async_router+. Storage is then
#    checked once in case the job already finished or paused.
# 4. Otherwise the reactor is executed through an Executor and this instance
#    adopts the executor's context and traces.
#
# @param inputs [Hash] inputs passed to validation, the new Context and the
#   Executor (defaults to an empty Hash)
# @return [Object] the validation failure, the value returned by
#   +perform_async+, the reloaded reactor's result, or the Executor's result
def run(inputs = {})
  # For all reactors, initialize context first to capture execution ID
  @context = @context.is_a?(Context) ? @context : Context.new(inputs, self.class)

  # Validate inputs
  validation_result = self.class.validate_inputs(inputs)
  if validation_result.failure?
    @result = validation_result
    # NOTE(review): status is a String here but a Symbol (:running) in the
    # async branch below; the reload check compares via to_s — confirm that
    # every reader tolerates both forms.
    @context.status = "failed"
    @context.failure_reason = {
      message: validation_result.error.message,
      validation_errors: validation_result.error.field_errors
    }
    save_context
    return validation_result
  end

  if self.class.async? && !@context.inline_async_execution
    # For async reactors, queue a job for the whole reactor
    @context.status = :running
    # Persist before enqueueing; presumably so the job can load the context
    # from storage — confirm against the async router.
    save_context

    serialized_context = ContextSerializer.serialize(@context)
    @result = configuration.async_router.perform_async(serialized_context, self.class.name,
                                                       intermediate_results: @context.intermediate_results)

    # Even if it's an AsyncResult, it might have finished inline (e.g. Sidekiq::Testing.inline!)
    # Check storage to see if it's already finished or paused (interrupted).
    begin
      reloaded = self.class.find(@context.context_id)
      if reloaded.finished? || reloaded.context.status.to_s == "paused"
        # Adopt the persisted state wholesale and return its result instead of
        # the value obtained from perform_async.
        @context = reloaded.context
        @result = reloaded.result
        @execution_trace = reloaded.execution_trace
        @undo_trace = reloaded.undo_trace
        return @result
      end
    rescue StandardError
      # Ignore if not found or other errors during reload check
      # (deliberate best-effort: the perform_async result is returned below).
    end

  else
    # For sync reactors (potentially with async steps), execute normally
    # NOTE(review): @context was already coerced to a Context at the top of
    # this method, so this check looks redundant — confirm before removing.
    context = @context.is_a?(Context) ? @context : nil
    executor = Executor.new(self.class, inputs, context)
    @result = executor.execute
    @context = executor.context
    @execution_trace = executor.execution_trace
    @undo_trace = executor.undo_trace
  end
  @result
end

#undo ⇒ Object



171
172
173
174
175
# File 'lib/ruby_reactor/reactor.rb', line 171

# Undoes this reactor's work through an Executor built on the current context,
# then has that executor save the context.
#
# @return [Object] whatever the executor's +save_context+ returns
def undo
  Executor.new(self.class, {}, @context).tap(&:undo_all).save_context
end

#validate! ⇒ Object



184
185
186
187
188
189
# File 'lib/ruby_reactor/reactor.rb', line 184

# Validates the reactor configuration by running the step, return-step and
# dependency checks in that order. An exception raised by one check stops the
# remaining ones.
#
# @return [Object] whatever +validate_dependencies!+ returns
def validate!
  checks = %i[validate_steps! validate_return_step! validate_dependencies!]
  checks.map { |check| send(check) }.last
end