Module: Dynflow::Action::WithSubPlans
- Includes:
- Cancellable
- Defined in:
- lib/dynflow/action/with_sub_plans.rb
Defined Under Namespace
Classes: SubtaskFailedException
Constant Summary
collapse
- SubPlanFinished =
Algebrick.type do
fields! :execution_plan_id => String,
:success => type { variants TrueClass, FalseClass }
end
Constants included
from Cancellable
Cancellable::Abort, Cancellable::Cancel
Instance Method Summary
collapse
Instance Method Details
#abort! ⇒ Object
82
83
84
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 82
def abort!
cancel! true
end
|
#calculate_time_distribution ⇒ Object
107
108
109
110
111
112
113
114
115
116
117
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 107
def calculate_time_distribution
time, count = input[:concurrency_control][:time]
unless time.nil? || time.is_a?(Hash)
level = input[:concurrency_control].fetch(:level, {}).fetch(:free, 1)
semaphore = ::Dynflow::Semaphores::Stateful.new(nil, level,
:interval => time.to_f / (count * level),
:time_span => time)
input[:concurrency_control][:time] = semaphore.to_hash
end
end
|
#cancel!(force = false) ⇒ Object
76
77
78
79
80
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 76
def cancel!(force = false)
@world.throttle_limiter.cancel!(execution_plan_id)
sub_plans('state' => 'running').each { |sub_plan| sub_plan.cancel(force) }
suspend
end
|
#check_for_errors! ⇒ Object
237
238
239
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 237
def check_for_errors!
raise SubtaskFailedException.new("A sub task failed") if output[:failed_count] > 0
end
|
#counts_set? ⇒ Boolean
233
234
235
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 233
def counts_set?
output[:total_count] && output[:success_count] && output[:failed_count] && output[:pending_count]
end
|
#create_sub_plans ⇒ Object
This method is abstract.
when the logic for the initiation of the subtasks is different from the default one.
68
69
70
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 68
def create_sub_plans
raise NotImplementedError
end
|
#distribute_over_time(time_span, count) ⇒ Object
119
120
121
122
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 119
def distribute_over_time(time_span, count)
input[:concurrency_control] ||= {}
input[:concurrency_control][:time] = [time_span, count]
end
|
#done? ⇒ Boolean
198
199
200
201
202
203
204
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 198
def done?
if counts_set?
output[:total_count] - output[:success_count] - output[:failed_count] <= 0
else
false
end
end
|
#increase_counts(planned, failed, track_total = true) ⇒ Object
134
135
136
137
138
139
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 134
def increase_counts(planned, failed, track_total = true)
output[:total_count] = output.fetch(:total_count, 0) + planned + failed if track_total
output[:failed_count] = output.fetch(:failed_count, 0) + failed
output[:pending_count] = output.fetch(:pending_count, 0) + planned
output[:success_count] ||= 0
end
|
#initiate ⇒ Object
39
40
41
42
43
44
45
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 39
def initiate
if uses_concurrency_control
calculate_time_distribution
world.throttle_limiter.initialize_plan(execution_plan_id, input[:concurrency_control])
end
spawn_plans
end
|
#limit_concurrency_level(level) ⇒ Object
102
103
104
105
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 102
def limit_concurrency_level(level)
input[:concurrency_control] ||= {}
input[:concurrency_control][:level] = ::Dynflow::Semaphores::Stateful.new(level).to_hash
end
|
#mark_as_done(plan_id, success) ⇒ Object
189
190
191
192
193
194
195
196
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 189
def mark_as_done(plan_id, success)
if success
output[:success_count] += 1
else
output[:failed_count] += 1
end
output[:pending_count] -= 1
end
|
#notify_on_finish(plans) ⇒ Object
179
180
181
182
183
184
185
186
187
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 179
def notify_on_finish(plans)
suspend do |suspended_action|
plans.each do |plan|
plan.finished.on_resolution! do |success, value|
suspended_action << SubPlanFinished[plan.id, success && (value.result == :success)]
end
end
end
end
|
#on_finish ⇒ Object
73
74
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 73
def on_finish
end
|
#recalculate_counts ⇒ Object
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 214
def recalculate_counts
output.update(total_count: 0,
failed_count: 0,
success_count: 0,
pending_count: 0)
sub_plans.each do |sub_plan|
output[:total_count] += 1
if sub_plan.state == :stopped
if sub_plan.error?
output[:failed_count] += 1
else
output[:success_count] += 1
end
else
output[:pending_count] += 1
end
end
end
|
#resume ⇒ Object
152
153
154
155
156
157
158
159
160
161
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 152
def resume
if sub_plans.all? { |sub_plan| sub_plan.error_in_plan? }
%w(total failed pending success).each { |key| output.delete("#{key}_count".to_sym) }
initiate
else
recalculate_counts
try_to_finish or fail "Some sub plans are still not finished"
end
end
|
#run(event = nil) ⇒ Object
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 18
def run(event = nil)
match event,
(on nil do
if output[:total_count]
resume
else
initiate
end
end),
(on SubPlanFinished do
mark_as_done(event.execution_plan_id, event.success)
try_to_finish or suspend
end),
(on Action::Cancellable::Cancel do
cancel!
end),
(on Action::Cancellable::Abort do
abort!
end)
end
|
#run_progress ⇒ Object
206
207
208
209
210
211
212
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 206
def run_progress
if counts_set? && output[:total_count] > 0
(output[:success_count] + output[:failed_count]).to_f / output[:total_count]
else
0.1
end
end
|
#spawn_plans ⇒ Object
47
48
49
50
51
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 47
def spawn_plans
sub_plans = create_sub_plans
sub_plans = Array[sub_plans] unless sub_plans.is_a? Array
wait_for_sub_plans sub_plans
end
|
#sub_plans(filter = {}) ⇒ Object
163
164
165
166
167
168
169
170
171
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 163
def sub_plans(filter = {})
filters = { 'caller_execution_plan_id' => execution_plan_id,
'caller_action_id' => self.id }
if filter.empty?
@sub_plans ||= world.persistence.find_execution_plans(filters: filters)
else
world.persistence.find_execution_plans(filters: filters.merge(filter))
end
end
|
#sub_plans_count(filter = {}) ⇒ Object
173
174
175
176
177
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 173
def sub_plans_count(filter = {})
filters = { 'caller_execution_plan_id' => execution_plan_id,
'caller_action_id' => self.id }
world.persistence.find_execution_plan_counts(filters: filters.merge(filter))
end
|
#trigger(action_class, *args) ⇒ Object
Helper for creating sub plans
87
88
89
90
91
92
93
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 87
def trigger(action_class, *args)
if uses_concurrency_control
trigger_with_concurrency_control(action_class, *args)
else
world.trigger { world.plan_with_options(action_class: action_class, args: args, caller_action: self) }
end
end
|
#trigger_with_concurrency_control(action_class, *args) ⇒ Object
95
96
97
98
99
100
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 95
def trigger_with_concurrency_control(action_class, *args)
record = world.plan_with_options(action_class: action_class, args: args, caller_action: self)
records = [[record.id], []]
records.reverse! unless record.state == :planned
@world.throttle_limiter.handle_plans!(execution_plan_id, *records).first
end
|
#try_to_finish ⇒ Object
141
142
143
144
145
146
147
148
149
150
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 141
def try_to_finish
if done?
world.throttle_limiter.finish(execution_plan_id)
check_for_errors!
on_finish
return true
else
return false
end
end
|
#uses_concurrency_control ⇒ Object
241
242
243
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 241
def uses_concurrency_control
@uses_concurrency_control = input.key? :concurrency_control
end
|
#wait_for_sub_plans(sub_plans) ⇒ Object
124
125
126
127
128
129
130
131
132
|
# File 'lib/dynflow/action/with_sub_plans.rb', line 124
def wait_for_sub_plans(sub_plans)
planned, failed = sub_plans.partition(&:planned?)
increase_counts(planned.count, failed.count)
if planned.any?
notify_on_finish(planned)
else
check_for_errors!
end
end
|