Class: RubyReactor::Executor

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_reactor/executor.rb,
lib/ruby_reactor/executor/graph_manager.rb,
lib/ruby_reactor/executor/retry_manager.rb,
lib/ruby_reactor/executor/step_executor.rb,
lib/ruby_reactor/executor/result_handler.rb,
lib/ruby_reactor/executor/input_validator.rb,
lib/ruby_reactor/executor/compensation_manager.rb

Defined Under Namespace

Classes: CompensationManager, GraphManager, InputValidator, ResultHandler, RetryManager, StepExecutor

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(reactor_class, inputs = {}, context = nil) ⇒ Executor

Returns a new instance of Executor.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/ruby_reactor/executor.rb', line 15

# Wires up an executor and all of its collaborators for one reactor run.
#
# Collaborators are created in dependency order: the context first, then the
# managers that receive it, then the result handler and step executor that
# receive those managers.
#
# @param reactor_class [Object] the reactor to run; stored and handed to the
#   context and the step executor
# @param inputs [Hash] inputs used to build a new Context; only consulted
#   when +context+ is nil or false
# @param context [Object, nil] an existing context to reuse instead of
#   building a fresh one
def initialize(reactor_class, inputs = {}, context = nil)
  @reactor_class = reactor_class

  # Reuse the caller's context when given, otherwise start from the inputs.
  @context = context
  @context ||= Context.new(inputs, reactor_class)

  @dependency_graph = DependencyGraph.new
  @compensation_manager = CompensationManager.new(@context)
  @retry_manager = RetryManager.new(@context)

  handler_options = {
    context: @context,
    compensation_manager: @compensation_manager,
    dependency_graph: @dependency_graph
  }
  @result_handler = ResultHandler.new(**handler_options)

  # The step executor receives every manager bundled under a single key.
  collaborators = {
    retry_manager: @retry_manager,
    result_handler: @result_handler,
    compensation_manager: @compensation_manager
  }
  @step_executor = StepExecutor.new(
    context: @context,
    dependency_graph: @dependency_graph,
    reactor_class: @reactor_class,
    managers: collaborators
  )

  # Nothing has run and nothing is held yet.
  @result = nil
  @acquired_lock = nil
  @acquired_semaphore = nil
end

Instance Attribute Details

#compensation_manager ⇒ Object (readonly)

Returns the value of attribute compensation_manager.



12
13
14
# File 'lib/ruby_reactor/executor.rb', line 12

# Reader for the compensation manager this executor was constructed with.
#
# @return [CompensationManager] the instance the constructor created around
#   the executor's context
def compensation_manager
  @compensation_manager
end

#context ⇒ Object (readonly)

Returns the value of attribute context.



12
13
14
# File 'lib/ruby_reactor/executor.rb', line 12

# Reader for the execution context.
#
# @return [Object] the context passed to the constructor, or the Context the
#   constructor built from the inputs when none was passed
def context
  @context
end

#dependency_graph ⇒ Object (readonly)

Returns the value of attribute dependency_graph.



12
13
14
# File 'lib/ruby_reactor/executor.rb', line 12

# Reader for the dependency graph shared with the result handler and the
# step executor.
#
# @return [DependencyGraph] the graph instance created by the constructor
def dependency_graph
  @dependency_graph
end

#reactor_class ⇒ Object (readonly)

Returns the value of attribute reactor_class.



12
13
14
# File 'lib/ruby_reactor/executor.rb', line 12

# Reader for the reactor this executor runs.
#
# @return [Object] the +reactor_class+ argument given to the constructor
def reactor_class
  @reactor_class
end

#result ⇒ Object (readonly)

Returns the value of attribute result.



12
13
14
# File 'lib/ruby_reactor/executor.rb', line 12

# Reader for the outcome of the most recent run.
#
# @return [Object, nil] nil right after construction; #execute and
#   #resume_execution assign the value they return
def result
  @result
end

#result_handler ⇒ Object (readonly)

Returns the value of attribute result_handler.



12
13
14
# File 'lib/ruby_reactor/executor.rb', line 12

# Reader for the result handler.
#
# @return [ResultHandler] the handler the constructor built from the context,
#   the compensation manager and the dependency graph
def result_handler
  @result_handler
end

#retry_manager ⇒ Object (readonly)

Returns the value of attribute retry_manager.



12
13
14
# File 'lib/ruby_reactor/executor.rb', line 12

# Reader for the retry manager.
#
# @return [RetryManager] the instance the constructor created around the
#   executor's context
def retry_manager
  @retry_manager
end

#step_executor ⇒ Object (readonly)

Returns the value of attribute step_executor.



12
13
14
# File 'lib/ruby_reactor/executor.rb', line 12

# Reader for the step executor.
#
# @return [StepExecutor] the instance the constructor built with the context,
#   dependency graph, reactor class and the three managers
def step_executor
  @step_executor
end

Instance Method Details

#execute ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/ruby_reactor/executor.rb', line 41

# Runs the reactor from the start and returns the outcome.
#
# Sequence:
# 1. Ask the period gate whether this run should be skipped. A truthy answer
#    becomes the result and no step is executed.
# 2. Check the rate limit and acquire locks.
# 3. Validate the inputs, mark the context as :running and persist it.
# 4. Build and validate the dependency graph, mark steps the context already
#    records as completed, then run all steps.
# 5. Update the context status, record the period on success and hand
#    interrupt results to the interrupt handler.
#
# The +ensure+ clause always releases locks and persists the context, on
# every exit path. The skip path therefore relies on it instead of saving
# explicitly, which previously wrote the same context to storage twice.
#
# @return [Object] the period-gate value when skipped, otherwise the step
#   executor's result, or the result handler's value for a rescued error
# @raise [RubyReactor::Lock::AcquisitionError,
#   RubyReactor::Semaphore::AcquisitionError,
#   RubyReactor::RateLimit::ExceededError] re-raised unchanged so the caller
#   can react to contention or throttling
def execute
  skipped = check_period_gate
  if skipped
    @result = skipped
    update_context_status(@result)
    # No explicit save here: the ensure clause below persists the context.
    return @result
  end

  check_rate_limit
  acquire_locks

  input_validator = InputValidator.new(@reactor_class, @context)
  input_validator.validate!

  @context.status = :running
  save_context

  graph_manager = GraphManager.new(@reactor_class, @dependency_graph, @context)
  graph_manager.build_and_validate!
  graph_manager.mark_completed_steps_from_context

  @result = @step_executor.execute_all_steps
  update_context_status(@result)
  mark_period_on_success(@result)
  handle_interrupt(@result) if @result.is_a?(RubyReactor::InterruptResult)
  @result
rescue RubyReactor::Lock::AcquisitionError,
       RubyReactor::Semaphore::AcquisitionError,
       RubyReactor::RateLimit::ExceededError
  # Contention and throttling are not execution failures; let them propagate.
  raise
rescue StandardError => e
  @result = @result_handler.handle_execution_error(e)
  update_context_status(@result)
  @result
ensure
  release_locks
  save_context
end

#execution_trace ⇒ Object



124
125
126
# File 'lib/ruby_reactor/executor.rb', line 124

# Convenience accessor that forwards to the context.
#
# @return [Object] whatever the context's +execution_trace+ returns
def execution_trace
  @context.execution_trace
end

#resume_execution ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/ruby_reactor/executor.rb', line 81

# Continues a previously started run and returns its outcome.
#
# The context is marked :running, locks are acquired, the executor is
# prepared for resumption and the context is persisted. Execution then picks
# up at the context's current step when there is one, or with the remaining
# steps otherwise. The +ensure+ clause releases locks and persists the
# context on every exit path.
#
# NOTE(review): unlike #execute, the status is set to :running before the
# locks are acquired, so a failed acquisition still persists :running via
# the ensure clause. Confirm this ordering is intended.
#
# @return [Object] the result of the resumed steps, or the value left in
#   @result after the resume error handler ran (presumably it assigns
#   @result; verify against its definition)
# @raise [RubyReactor::Lock::AcquisitionError,
#   RubyReactor::Semaphore::AcquisitionError,
#   RubyReactor::RateLimit::ExceededError] re-raised unchanged
def resume_execution
  @context.status = :running
  acquire_locks
  prepare_for_resume
  save_context

  # A recorded current step means that step has to be re-entered first.
  @result = @context.current_step ? execute_current_step_and_continue : execute_remaining_steps
  update_context_status(@result)
  mark_period_on_success(@result)

  if @result.is_a?(RubyReactor::InterruptResult)
    handle_interrupt(@result)
  end

  @result
rescue RubyReactor::Lock::AcquisitionError,
       RubyReactor::Semaphore::AcquisitionError,
       RubyReactor::RateLimit::ExceededError
  # Contention and throttling are the caller's concern; pass them through.
  raise
rescue StandardError => error
  handle_resume_error(error)
  update_context_status(@result)
  @result
ensure
  release_locks
  save_context
end

#save_context ⇒ Object



128
129
130
131
132
133
134
135
# File 'lib/ruby_reactor/executor.rb', line 128

# Persists the current context through the configured storage adapter.
#
# The context is serialized with ContextSerializer and stored under its
# +context_id+, together with the reactor class name. A reactor class
# without a name is stored as "AnonymousReactor-<object_id>".
#
# @return [Object] whatever the adapter's +store_context+ returns
def save_context
  adapter = RubyReactor::Configuration.instance.storage_adapter

  # Fall back to an object_id-based label when the class has no name.
  label = @reactor_class.name
  label ||= "AnonymousReactor-#{@reactor_class.object_id}"

  payload = ContextSerializer.serialize(@context)
  adapter.store_context(@context.context_id, payload, label)
end

#undo_all ⇒ Object



112
113
114
# File 'lib/ruby_reactor/executor.rb', line 112

# Asks the compensation manager to roll back the completed steps.
#
# @return [Object] whatever the manager's +rollback_completed_steps+ returns
def undo_all
  @compensation_manager.rollback_completed_steps
end

#undo_stack ⇒ Object



116
117
118
# File 'lib/ruby_reactor/executor.rb', line 116

# Convenience accessor that forwards to the compensation manager.
#
# @return [Object] whatever the manager's +undo_stack+ returns
def undo_stack
  @compensation_manager.undo_stack
end

#undo_trace ⇒ Object



120
121
122
# File 'lib/ruby_reactor/executor.rb', line 120

# Convenience accessor that forwards to the compensation manager.
#
# @return [Object] whatever the manager's +undo_trace+ returns
def undo_trace
  @compensation_manager.undo_trace
end