Module: Dynflow::Action::WithSubPlans

Includes:
Cancellable
Defined in:
lib/dynflow/action/with_sub_plans.rb

Defined Under Namespace

Classes: SubtaskFailedException

Constant Summary collapse

SubPlanFinished =
Algebrick.type do
  fields! :execution_plan_id => String,
          :success           => type { variants TrueClass, FalseClass }
end

Constants included from Cancellable

Cancellable::Abort, Cancellable::Cancel

Instance Method Summary collapse

Instance Method Details

#abort!Object



82
83
84
# File 'lib/dynflow/action/with_sub_plans.rb', line 82

def abort!
  cancel! true
end

#calculate_time_distributionObject



107
108
109
110
111
112
113
114
115
116
117
# File 'lib/dynflow/action/with_sub_plans.rb', line 107

def calculate_time_distribution
  time, count = input[:concurrency_control][:time]
  unless time.nil? || time.is_a?(Hash)
    # Assume concurrency level 1 unless stated otherwise
    level = input[:concurrency_control].fetch(:level, {}).fetch(:free, 1)
    semaphore = ::Dynflow::Semaphores::Stateful.new(nil, level,
      :interval => time.to_f / (count * level),
      :time_span => time)
    input[:concurrency_control][:time] = semaphore.to_hash
  end
end

#cancel!(force = false) ⇒ Object



76
77
78
79
80
# File 'lib/dynflow/action/with_sub_plans.rb', line 76

def cancel!(force = false)
  @world.throttle_limiter.cancel!(execution_plan_id)
  sub_plans('state' => 'running').each { |sub_plan| sub_plan.cancel(force) }
  suspend
end

#check_for_errors!Object



237
238
239
# File 'lib/dynflow/action/with_sub_plans.rb', line 237

def check_for_errors!
  raise SubtaskFailedException.new("A sub task failed") if output[:failed_count] > 0
end

#counts_set?Boolean

Returns:

  • (Boolean)


233
234
235
# File 'lib/dynflow/action/with_sub_plans.rb', line 233

def counts_set?
  output[:total_count] && output[:success_count] && output[:failed_count] && output[:pending_count]
end

#create_sub_plansObject

This method is abstract.

when the logic for the initiation of the subtasks is different from the default one.

Examples:


def create_sub_plans
  trigger(MyAction, "Hello")
end

def create_sub_plans
  [trigger(MyAction, "Hello 1"), trigger(MyAction, "Hello 2")]
end

Raises:

  • (NotImplementedError)


68
69
70
# File 'lib/dynflow/action/with_sub_plans.rb', line 68

def create_sub_plans
  raise NotImplementedError
end

#distribute_over_time(time_span, count) ⇒ Object



119
120
121
122
# File 'lib/dynflow/action/with_sub_plans.rb', line 119

def distribute_over_time(time_span, count)
  input[:concurrency_control] ||= {}
  input[:concurrency_control][:time] = [time_span, count]
end

#done?Boolean

Returns:

  • (Boolean)


198
199
200
201
202
203
204
# File 'lib/dynflow/action/with_sub_plans.rb', line 198

def done?
  if counts_set?
    output[:total_count] - output[:success_count] - output[:failed_count] <= 0
  else
    false
  end
end

#increase_counts(planned, failed, track_total = true) ⇒ Object



134
135
136
137
138
139
# File 'lib/dynflow/action/with_sub_plans.rb', line 134

def increase_counts(planned, failed, track_total = true)
  output[:total_count]   = output.fetch(:total_count, 0) + planned + failed if track_total
  output[:failed_count]  = output.fetch(:failed_count, 0) + failed
  output[:pending_count] = output.fetch(:pending_count, 0) + planned
  output[:success_count] ||= 0
end

#initiateObject



39
40
41
42
43
44
45
# File 'lib/dynflow/action/with_sub_plans.rb', line 39

def initiate
  if uses_concurrency_control
    calculate_time_distribution
    world.throttle_limiter.initialize_plan(execution_plan_id, input[:concurrency_control])
  end
  spawn_plans
end

#limit_concurrency_level(level) ⇒ Object



102
103
104
105
# File 'lib/dynflow/action/with_sub_plans.rb', line 102

def limit_concurrency_level(level)
  input[:concurrency_control] ||= {}
  input[:concurrency_control][:level] = ::Dynflow::Semaphores::Stateful.new(level).to_hash
end

#mark_as_done(plan_id, success) ⇒ Object



189
190
191
192
193
194
195
196
# File 'lib/dynflow/action/with_sub_plans.rb', line 189

def mark_as_done(plan_id, success)
  if success
    output[:success_count] += 1
  else
    output[:failed_count] += 1
  end
  output[:pending_count] -= 1
end

#notify_on_finish(plans) ⇒ Object



179
180
181
182
183
184
185
186
187
# File 'lib/dynflow/action/with_sub_plans.rb', line 179

def notify_on_finish(plans)
  suspend do |suspended_action|
    plans.each do |plan|
      plan.finished.on_resolution! do |success, value|
        suspended_action << SubPlanFinished[plan.id, success && (value.result == :success)]
      end
    end
  end
end

#on_finishObject



73
74
# File 'lib/dynflow/action/with_sub_plans.rb', line 73

def on_finish
end

#recalculate_countsObject



214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/dynflow/action/with_sub_plans.rb', line 214

def recalculate_counts
  output.update(total_count: 0,
                failed_count: 0,
                success_count: 0,
                pending_count: 0)
  sub_plans.each do |sub_plan|
    output[:total_count] += 1
    if sub_plan.state == :stopped
      if sub_plan.error?
        output[:failed_count] += 1
      else
        output[:success_count] += 1
      end
    else
      output[:pending_count] += 1
    end
  end
end

#resumeObject



152
153
154
155
156
157
158
159
160
161
# File 'lib/dynflow/action/with_sub_plans.rb', line 152

def resume
  if sub_plans.all? { |sub_plan| sub_plan.error_in_plan? }
    # We're starting over and need to reset the counts
    %w(total failed pending success).each { |key| output.delete("#{key}_count".to_sym) }
    initiate
  else
    recalculate_counts
    try_to_finish or fail "Some sub plans are still not finished"
  end
end

#run(event = nil) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/dynflow/action/with_sub_plans.rb', line 18

def run(event = nil)
  match event,
    (on nil do
       if output[:total_count]
         resume
       else
         initiate
       end
     end),
    (on SubPlanFinished do
       mark_as_done(event.execution_plan_id, event.success)
       try_to_finish or suspend
     end),
    (on Action::Cancellable::Cancel do
       cancel!
     end),
    (on Action::Cancellable::Abort do
       abort!
     end)
end

#run_progressObject



206
207
208
209
210
211
212
# File 'lib/dynflow/action/with_sub_plans.rb', line 206

def run_progress
  if counts_set? && output[:total_count] > 0
    (output[:success_count] + output[:failed_count]).to_f / output[:total_count]
  else
    0.1
  end
end

#spawn_plansObject



47
48
49
50
51
# File 'lib/dynflow/action/with_sub_plans.rb', line 47

def spawn_plans
  sub_plans = create_sub_plans
  sub_plans = Array[sub_plans] unless sub_plans.is_a? Array
  wait_for_sub_plans sub_plans
end

#sub_plans(filter = {}) ⇒ Object



163
164
165
166
167
168
169
170
171
# File 'lib/dynflow/action/with_sub_plans.rb', line 163

def sub_plans(filter = {})
  filters = { 'caller_execution_plan_id' => execution_plan_id,
              'caller_action_id' => self.id }
  if filter.empty?
    @sub_plans ||= world.persistence.find_execution_plans(filters: filters)
  else
    world.persistence.find_execution_plans(filters: filters.merge(filter))
  end
end

#sub_plans_count(filter = {}) ⇒ Object



173
174
175
176
177
# File 'lib/dynflow/action/with_sub_plans.rb', line 173

def sub_plans_count(filter = {})
  filters = { 'caller_execution_plan_id' => execution_plan_id,
              'caller_action_id' => self.id }
  world.persistence.find_execution_plan_counts(filters: filters.merge(filter))
end

#trigger(action_class, *args) ⇒ Object

Helper for creating sub plans



87
88
89
90
91
92
93
# File 'lib/dynflow/action/with_sub_plans.rb', line 87

def trigger(action_class, *args)
  if uses_concurrency_control
    trigger_with_concurrency_control(action_class, *args)
  else
    world.trigger { world.plan_with_options(action_class: action_class, args: args, caller_action: self) }
  end
end

#trigger_with_concurrency_control(action_class, *args) ⇒ Object



95
96
97
98
99
100
# File 'lib/dynflow/action/with_sub_plans.rb', line 95

def trigger_with_concurrency_control(action_class, *args)
  record = world.plan_with_options(action_class: action_class, args: args, caller_action: self)
  records = [[record.id], []]
  records.reverse! unless record.state == :planned
  @world.throttle_limiter.handle_plans!(execution_plan_id, *records).first
end

#try_to_finishObject



141
142
143
144
145
146
147
148
149
150
# File 'lib/dynflow/action/with_sub_plans.rb', line 141

def try_to_finish
  if done?
    world.throttle_limiter.finish(execution_plan_id)
    check_for_errors!
    on_finish
    return true
  else
    return false
  end
end

#uses_concurrency_controlObject



241
242
243
# File 'lib/dynflow/action/with_sub_plans.rb', line 241

def uses_concurrency_control
  @uses_concurrency_control = input.key? :concurrency_control
end

#wait_for_sub_plans(sub_plans) ⇒ Object



124
125
126
127
128
129
130
131
132
# File 'lib/dynflow/action/with_sub_plans.rb', line 124

def wait_for_sub_plans(sub_plans)
  planned, failed = sub_plans.partition(&:planned?)
  increase_counts(planned.count, failed.count)
  if planned.any?
    notify_on_finish(planned)
  else
    check_for_errors!
  end
end