Class: Dynflow::ExecutionPlan

Inherits:
Serializable show all
Includes:
Algebrick::TypeCheck, Stateful
Defined in:
lib/dynflow/execution_plan.rb,
lib/dynflow/execution_plan/hooks.rb

Overview

rubocop:disable Metrics/ClassLength TODO extract planning logic to an extra class ExecutionPlanner

Defined Under Namespace

Modules: Hooks, Steps Classes: DependencyGraph, InvalidPlan, OutputReference

Constant Summary

Constants inherited from Serializable

Serializable::LEGACY_TIME_FORMAT, Serializable::TIME_FORMAT

Instance Attribute Summary collapse

Attributes included from Stateful

#state

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Stateful

included, #set_state, #state_transitions, #states

Methods inherited from Serializable

constantize, from_hash

Constructor Details

#initialize(world, id = nil, label = nil, state = :pending, root_plan_step = nil, run_flow = Flows::Concurrence.new([]), finalize_flow = Flows::Sequence.new([]), steps = {}, started_at = nil, ended_at = nil, execution_time = nil, real_time = 0.0, execution_history = ExecutionHistory.new) ⇒ ExecutionPlan

all params with default values are part of private api



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/dynflow/execution_plan.rb', line 72

def initialize(world,
               id                = nil,
               label             = nil,
               state             = :pending,
               root_plan_step    = nil,
               run_flow          = Flows::Concurrence.new([]),
               finalize_flow     = Flows::Sequence.new([]),
               steps             = {},
               started_at        = nil,
               ended_at          = nil,
               execution_time    = nil,
               real_time         = 0.0,
               execution_history = ExecutionHistory.new)
  id ||= SecureRandom.uuid
  @id = Type! id, String
  @world = Type! world, World
  @label = Type! label, String, NilClass
  self.state         = state
  @run_flow          = Type! run_flow, Flows::Abstract
  @finalize_flow     = Type! finalize_flow, Flows::Abstract
  @root_plan_step    = root_plan_step
  @started_at        = Type! started_at, Time, NilClass
  @ended_at          = Type! ended_at, Time, NilClass
  @execution_time    = Type! execution_time, Numeric, NilClass
  @real_time         = Type! real_time, Numeric
  @execution_history = Type! execution_history, ExecutionHistory

  steps.all? do |k, v|
    Type! k, Integer
    Type! v, Steps::Abstract
  end
  @steps = steps
end

Instance Attribute Details

#ended_atObject (readonly)

Returns the value of attribute ended_at.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def ended_at
  @ended_at
end

#execution_historyObject (readonly)

Returns the value of attribute execution_history.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def execution_history
  @execution_history
end

#execution_timeObject (readonly)

Returns the value of attribute execution_time.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def execution_time
  @execution_time
end

#finalize_flowObject (readonly)

Returns the value of attribute finalize_flow.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def finalize_flow
  @finalize_flow
end

#idObject (readonly)

Returns the value of attribute id.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def id
  @id
end

#labelObject (readonly)

Returns the value of attribute label.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def label
  @label
end

#real_timeObject (readonly)

Returns the value of attribute real_time.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def real_time
  @real_time
end

#root_plan_stepObject (readonly)

Returns the value of attribute root_plan_step.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def root_plan_step
  @root_plan_step
end

#run_flowObject (readonly)

Returns the value of attribute run_flow.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def run_flow
  @run_flow
end

#started_atObject (readonly)

Returns the value of attribute started_at.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def started_at
  @started_at
end

#stepsObject (readonly)

Returns the value of attribute steps.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def steps
  @steps
end

#worldObject (readonly)

Returns the value of attribute world.



47
48
49
# File 'lib/dynflow/execution_plan.rb', line 47

def world
  @world
end

Class Method Details

.load_flow(flow_hash) ⇒ Object



426
427
428
429
430
431
432
# File 'lib/dynflow/execution_plan.rb', line 426

def self.load_flow(flow_hash)
  if flow_hash.is_a? Hash
    Flows::Abstract.from_hash(flow_hash)
  else
    Flows::Abstract.decode(flow_hash)
  end
end

.new_from_hash(hash, world) ⇒ Object



455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
# File 'lib/dynflow/execution_plan.rb', line 455

def self.new_from_hash(hash, world)
  check_class_matching hash
  execution_plan_id = hash[:id]
  steps             = steps_from_hash(hash[:step_ids], execution_plan_id, world)
  self.new(world,
    execution_plan_id,
    hash[:label],
    hash[:state],
    steps[hash[:root_plan_step_id]],
    load_flow(hash[:run_flow]),
    load_flow(hash[:finalize_flow]),
    steps,
    string_to_time(hash[:started_at]),
    string_to_time(hash[:ended_at]),
    hash[:execution_time].to_f,
    hash[:real_time].to_f,
    ExecutionHistory.new_from_hash(hash[:execution_history]))
rescue => plan_exception
  begin
    world.logger.error("Could not load execution plan #{execution_plan_id}")
    world.logger.error(plan_exception)
    InvalidPlan.new(plan_exception, execution_plan_id,
      hash[:label],
      hash[:state],
      string_to_time(hash[:started_at]),
      string_to_time(hash[:ended_at]),
      hash[:execution_time].to_f,
      hash[:real_time].to_f,
      ExecutionHistory.new_from_hash(hash[:execution_history]))
  rescue => invalid_plan_exception
    world.logger.error("Could not even load a fallback execution plan for #{execution_plan_id}")
    world.logger.error(invalid_plan_exception)
    InvalidPlan.new(invalid_plan_exception, execution_plan_id,
      hash[:label],
      hash[:state])
  end
end

.resultsObject



57
58
59
# File 'lib/dynflow/execution_plan.rb', line 57

def self.results
  @results ||= [:pending, :success, :warning, :error, :cancelled]
end

.state_transitionsObject



61
62
63
64
65
66
67
68
69
# File 'lib/dynflow/execution_plan.rb', line 61

def self.state_transitions
  @state_transitions ||= { pending:  [:stopped, :scheduled, :planning],
                           scheduled: [:planning, :stopped],
                           planning: [:planned, :stopped],
                           planned:  [:running, :stopped],
                           running:  [:paused, :stopped],
                           paused:   [:running, :stopped],
                           stopped:  [] }
end

.statesObject



51
52
53
# File 'lib/dynflow/execution_plan.rb', line 51

def self.states
  @states ||= [:pending, :scheduled, :planning, :planned, :running, :paused, :stopped]
end

Instance Method Details

#actionsArray<Action>

Returns actions in Present phase.

Returns:

  • (Array<Action>)

    actions in Present phase



517
518
519
520
521
# File 'lib/dynflow/execution_plan.rb', line 517

def actions
  @actions ||= begin
    [entry_action] + entry_action.all_planned_actions
  end
end

#add_finalize_step(action) ⇒ Object



419
420
421
422
423
424
# File 'lib/dynflow/execution_plan.rb', line 419

def add_finalize_step(action)
  add_step(Steps::FinalizeStep, action.class, action.id).tap do |step|
    step.update_from_action(action)
    finalize_flow << Flows::Atom.new(step.id)
  end
end

#add_plan_step(action_class, caller_action = nil) ⇒ Object



401
402
403
404
405
406
407
408
409
# File 'lib/dynflow/execution_plan.rb', line 401

def add_plan_step(action_class, caller_action = nil)
  add_step(Steps::PlanStep, action_class, generate_action_id).tap do |step|
    # TODO: to be removed and preferred by the caller_action
    if caller_action && caller_action.execution_plan_id == self.id
      @steps[caller_action.plan_step_id].children << step.id
    end
    step.initialize_action(caller_action)
  end
end

#add_run_step(action) ⇒ Object



411
412
413
414
415
416
417
# File 'lib/dynflow/execution_plan.rb', line 411

def add_run_step(action)
  add_step(Steps::RunStep, action.class, action.id).tap do |step|
    step.update_from_action(action)
    @dependency_graph.add_dependencies(step, action)
    current_run_flow.add_and_resolve(@dependency_graph, Flows::Atom.new(step.id))
  end
end

#add_scheduling_step(action_class, caller_action = nil) ⇒ Object



395
396
397
398
399
# File 'lib/dynflow/execution_plan.rb', line 395

def add_scheduling_step(action_class, caller_action = nil)
  add_step(Steps::PlanStep, action_class, generate_action_id, :scheduling).tap do |step|
    step.initialize_action(caller_action)
  end
end

#caller_execution_plan_idObject



523
524
525
# File 'lib/dynflow/execution_plan.rb', line 523

def caller_execution_plan_id
  entry_action.caller_execution_plan_id
end

#cancel(force = false) ⇒ Object

sends the cancel event to all currently running and cancellable steps. if the plan is just scheduled, it cancels it (and returns an one-item array with the future value of the cancel result)



311
312
313
314
315
316
317
318
319
320
# File 'lib/dynflow/execution_plan.rb', line 311

def cancel(force = false)
  if state == :scheduled
    [Concurrent::Promises.resolvable_future.tap { |f| f.fulfill delay_record.cancel }]
  else
    event = force ? ::Dynflow::Action::Cancellable::Abort : ::Dynflow::Action::Cancellable::Cancel
    steps_to_cancel.map do |step|
      world.event(id, step.id, event)
    end
  end
end

#cancellable?Boolean

Returns:

  • (Boolean)


322
323
324
325
326
# File 'lib/dynflow/execution_plan.rb', line 322

def cancellable?
  return true if state == :scheduled
  return false unless state == :running
  steps_to_cancel.any?
end

#compute_execution_timeObject



493
494
495
496
497
# File 'lib/dynflow/execution_plan.rb', line 493

def compute_execution_time
  self.steps.values.reduce(0) do |execution_time, step|
    execution_time + (step.execution_time || 0)
  end
end

#current_run_flowObject



371
372
373
# File 'lib/dynflow/execution_plan.rb', line 371

def current_run_flow
  @run_flow_stack.last
end

#delay(caller_action, action_class, delay_options, *args) ⇒ Object



256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
# File 'lib/dynflow/execution_plan.rb', line 256

def delay(caller_action, action_class, delay_options, *args)
  save
  @root_plan_step = add_scheduling_step(action_class, caller_action)
  run_hooks(:pending)
  serializer = root_plan_step.delay(delay_options, args)
  delayed_plan = DelayedPlan.new(@world,
    id,
    delay_options[:start_at],
    delay_options.fetch(:start_before, nil),
    serializer,
    delay_options[:frozen] || false)
  persistence.save_delayed_plan(delayed_plan)
ensure
  update_state(error? ? :stopped : :scheduled)
end

#delay_recordObject



272
273
274
# File 'lib/dynflow/execution_plan.rb', line 272

def delay_record
  @delay_record ||= persistence.load_delayed_plan(id)
end

#entry_actionObject



512
513
514
# File 'lib/dynflow/execution_plan.rb', line 512

def entry_action
  @entry_action ||= root_plan_step.action(self)
end

#error?Boolean

Returns:

  • (Boolean)


183
184
185
# File 'lib/dynflow/execution_plan.rb', line 183

def error?
  result == :error
end

#error_in_plan?Boolean

Returns:

  • (Boolean)


191
192
193
# File 'lib/dynflow/execution_plan.rb', line 191

def error_in_plan?
  steps_in_state(:error).any? { |step| step.is_a? Steps::PlanStep }
end

#errorsObject



195
196
197
# File 'lib/dynflow/execution_plan.rb', line 195

def errors
  steps.values.map(&:error).compact
end

#failed_stepsObject



238
239
240
# File 'lib/dynflow/execution_plan.rb', line 238

def failed_steps
  steps_in_state(:error)
end

#failure?Boolean

Returns:

  • (Boolean)


187
188
189
# File 'lib/dynflow/execution_plan.rb', line 187

def failure?
  [:error, :warning, :cancelled].include?(result)
end

#finalize_stepsObject



234
235
236
# File 'lib/dynflow/execution_plan.rb', line 234

def finalize_steps
  steps_of_type(Dynflow::ExecutionPlan::Steps::FinalizeStep)
end

#generate_action_idObject



246
247
248
249
# File 'lib/dynflow/execution_plan.rb', line 246

def generate_action_id
  @last_action_id ||= 0
  @last_action_id += 1
end

#generate_step_idObject



251
252
253
254
# File 'lib/dynflow/execution_plan.rb', line 251

def generate_step_id
  @last_step_id ||= 0
  @last_step_id += 1
end

#loggerObject



110
111
112
# File 'lib/dynflow/execution_plan.rb', line 110

def logger
  @world.logger
end

#plan(*args) ⇒ Object



287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# File 'lib/dynflow/execution_plan.rb', line 287

def plan(*args)
  update_state(:planning)
  world.middleware.execute(:plan_phase, root_plan_step.action_class, self) do
    with_planning_scope do
      root_action = root_plan_step.execute(self, nil, false, *args)
      @label = root_action.label

      if @dependency_graph.unresolved?
        raise "Some dependencies were not resolved: #{@dependency_graph.inspect}"
      end
    end
  end

  if @run_flow.size == 1
    @run_flow = @run_flow.sub_flows.first
  end

  steps.values.each(&:save)
  update_state(error? ? :stopped : :planned)
end

#plan_stepsObject



226
227
228
# File 'lib/dynflow/execution_plan.rb', line 226

def plan_steps
  steps_of_type(Dynflow::ExecutionPlan::Steps::PlanStep)
end

#prepare(action_class, options = {}) ⇒ Object



276
277
278
279
280
281
282
283
284
285
# File 'lib/dynflow/execution_plan.rb', line 276

def prepare(action_class, options = {})
  options = options.dup
  caller_action = Type! options.delete(:caller_action), Dynflow::Action, NilClass
  raise "Unexpected options #{options.keys.inspect}" unless options.empty?
  save
  @root_plan_step = add_plan_step(action_class, caller_action)
  step = @root_plan_step.save
  run_hooks(:pending)
  step
end

#prepare_for_rescueObject



212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/dynflow/execution_plan.rb', line 212

def prepare_for_rescue
  case rescue_strategy
  when Action::Rescue::Pause
    :paused
  when Action::Rescue::Fail
    :stopped
  when Action::Rescue::Skip
    failed_steps.each { |step| self.skip(step) }
    :running
  else
    :paused
  end
end

#progress0..1

info

Returns:

  • (0..1)

    the percentage of the progress. See Action::Progress for more



501
502
503
504
505
506
507
508
509
510
# File 'lib/dynflow/execution_plan.rb', line 501

def progress
  return 0 if [:pending, :planning, :scheduled].include?(state)
  flow_step_ids         = run_flow.all_step_ids + finalize_flow.all_step_ids
  plan_done, plan_total = flow_step_ids.reduce([0.0, 0]) do |(done, total), step_id|
    step = self.steps[step_id]
    [done + (step.progress_done * step.progress_weight),
     total + step.progress_weight]
  end
  plan_total > 0 ? (plan_done / plan_total) : 1
end

#rescue_strategyObject



199
200
201
202
# File 'lib/dynflow/execution_plan.rb', line 199

def rescue_strategy
  rescue_strategy = entry_action.rescue_strategy || Action::Rescue::Skip
  Type! rescue_strategy, Action::Rescue::Strategy
end

#resultObject



168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/dynflow/execution_plan.rb', line 168

def result
  all_steps = steps.values
  if all_steps.any? { |step| step.state == :cancelled }
    return :cancelled
  elsif all_steps.any? { |step| step.state == :error }
    return :error
  elsif all_steps.any? { |step| [:skipping, :skipped].include?(step.state) }
    return :warning
  elsif all_steps.all? { |step| step.state == :success }
    return :success
  else
    return :pending
  end
end

#run_hooks(state) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/dynflow/execution_plan.rb', line 153

def run_hooks(state)
  records = persistence.load_actions_attributes(@id, [:id, :class]).select do |action|
    Utils.constantize(action[:class])
         .execution_plan_hooks
         .on(state).any?
  end
  action_ids = records.compact.map { |record| record[:id] }
  return if action_ids.empty?
  persistence.load_actions(self, action_ids).each do |action|
    world.middleware.execute(:hook, action, self) do
      action.class.execution_plan_hooks.run(self, action, state)
    end
  end
end

#run_stepsObject



230
231
232
# File 'lib/dynflow/execution_plan.rb', line 230

def run_steps
  steps_of_type(Dynflow::ExecutionPlan::Steps::RunStep)
end

#saveObject



451
452
453
# File 'lib/dynflow/execution_plan.rb', line 451

def save
  persistence.save_execution_plan(self)
end

#skip(step) ⇒ Object



334
335
336
337
338
# File 'lib/dynflow/execution_plan.rb', line 334

def skip(step)
  steps_to_skip = steps_to_skip(step).each(&:mark_to_skip)
  self.save
  return steps_to_skip
end

#steps_in_state(*states) ⇒ Object



242
243
244
# File 'lib/dynflow/execution_plan.rb', line 242

def steps_in_state(*states)
  self.steps.values.find_all { |step| states.include?(step.state) }
end

#steps_of_type(type) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



367
368
369
# File 'lib/dynflow/execution_plan.rb', line 367

def steps_of_type(type)
  steps.values.find_all { |step| step.is_a?(type) }
end

#steps_to_cancelObject



328
329
330
331
332
# File 'lib/dynflow/execution_plan.rb', line 328

def steps_to_cancel
  steps_in_state(:running, :suspended).find_all do |step|
    step.action(self).is_a?(::Dynflow::Action::Cancellable)
  end
end

#steps_to_skip(step) ⇒ Array<Steps::Abstract>

All the steps that need to get skipped when wanting to skip the step includes the step itself, all steps dependent on it (even transitively) FIND maybe move to persistence to let adapter to do it effectively?

Returns:



344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
# File 'lib/dynflow/execution_plan.rb', line 344

def steps_to_skip(step)
  dependent_steps = steps.values.find_all do |s|
    next if s.is_a? Steps::PlanStep
    action = persistence.load_action(s)
    action.required_step_ids.include?(step.id)
  end

  steps_to_skip = dependent_steps.map do |dependent_step|
    steps_to_skip(dependent_step)
  end.flatten

  steps_to_skip << step

  if step.is_a? Steps::RunStep
    finalize_step_id = persistence.load_action(step).finalize_step_id
    steps_to_skip << steps[finalize_step_id] if finalize_step_id
  end

  return steps_to_skip.uniq
end

#sub_plansObject



204
205
206
# File 'lib/dynflow/execution_plan.rb', line 204

def sub_plans
  persistence.find_execution_plans(filters: { 'caller_execution_plan_id' => self.id })
end

#sub_plans_countObject



208
209
210
# File 'lib/dynflow/execution_plan.rb', line 208

def sub_plans_count
  persistence.find_execution_plan_counts(filters: { 'caller_execution_plan_id' => self.id })
end

#switch_flow(new_flow, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Switches the flow type (Sequence, Concurrence) to be used within the block.



387
388
389
390
391
392
393
# File 'lib/dynflow/execution_plan.rb', line 387

def switch_flow(new_flow, &block)
  @run_flow_stack << new_flow
  return block.call
ensure
  @run_flow_stack.pop
  current_run_flow.add_and_resolve(@dependency_graph, new_flow) if current_run_flow
end

#to_hashObject



434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
# File 'lib/dynflow/execution_plan.rb', line 434

def to_hash
  recursive_to_hash id:                id,
                    class:             self.class.to_s,
                    label:             label,
                    state:             state,
                    result:            result,
                    root_plan_step_id: root_plan_step && root_plan_step.id,
                    run_flow:          run_flow.encode,
                    finalize_flow:     finalize_flow.encode,
                    step_ids:          steps.map { |id, _| id },
                    started_at:        time_to_str(started_at),
                    ended_at:          time_to_str(ended_at),
                    execution_time:    execution_time,
                    real_time:         real_time,
                    execution_history: execution_history.to_hash
end

#update_state(state, history_notice: :auto) ⇒ Object

Parameters:

  • state (Symbol)

    representing the new state

  • history_notice (Symbol|string|false) (defaults to: :auto)

    should a note to execution_history be added as well? Possible values:

    - :auto (default) - the history notice will be added based on the new state
    - string - custom history notice is added
    - false - don't add any notice
    


120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/dynflow/execution_plan.rb', line 120

def update_state(state, history_notice: :auto)
  hooks_to_run = [state]
  original = self.state
  case self.state = state
  when :planning
    @started_at = Time.now.utc
  when :stopped
    @ended_at       = Time.now.utc
    @real_time      = @ended_at - @started_at unless @started_at.nil?
    @execution_time = compute_execution_time
    key = failure? ? :failure : :success
    Dynflow::Telemetry.with_instance do |t|
      t.increment_counter(:dynflow_finished_execution_plans, 1,
        telemetry_common_options.merge(:result => key.to_s))
    end
    hooks_to_run << key
    world.persistence.delete_delayed_plans(:execution_plan_uuid => id) if delay_record && original == :scheduled
    unlock_all_singleton_locks!
    unlock_execution_inhibition_lock!
  when :paused
    unlock_all_singleton_locks!
  else
    # ignore
  end
  logger.debug format('%13s %s    %9s >> %9s',
    'ExecutionPlan', id, original, state)
  add_history_notice(history_notice)
  self.save
  toggle_telemetry_state original == :pending ? nil : original.to_s,
    self.state == :stopped ? nil : self.state.to_s
  hooks_to_run.each { |kind| run_hooks kind }
end

#valid?Boolean

Returns:

  • (Boolean)


106
107
108
# File 'lib/dynflow/execution_plan.rb', line 106

def valid?
  true
end

#with_planning_scope(&block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



376
377
378
379
380
381
382
383
# File 'lib/dynflow/execution_plan.rb', line 376

def with_planning_scope(&block)
  @run_flow_stack   = []
  @dependency_graph = DependencyGraph.new
  switch_flow(run_flow, &block)
ensure
  @run_flow_stack   = nil
  @dependency_graph = nil
end