Module: Dynflow::Action::V2::WithSubPlans
- Includes:
- Cancellable
- Defined in:
- lib/dynflow/action/v2/with_sub_plans.rb
Defined Under Namespace
Classes: SubtaskFailedException
Constant Summary
collapse
- DEFAULT_BATCH_SIZE =
100
- DEFAULT_POLLING_INTERVAL =
15
- Ping =
Algebrick.atom
Constants included
from Cancellable
Cancellable::Abort, Cancellable::Cancel
Instance Method Summary
collapse
Instance Method Details
#abort! ⇒ Object
182
183
184
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 182
def abort!
cancel! true
end
|
#batch(from, size) ⇒ Object
Should return a slice of size items starting from item with index from
32
33
34
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 32
def batch(from, size)
raise NotImplementedError
end
|
#batch_size ⇒ Object
27
28
29
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 27
def batch_size
DEFAULT_BATCH_SIZE
end
|
#can_spawn_next_batch? ⇒ Boolean
196
197
198
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 196
def can_spawn_next_batch?
remaining_count > 0
end
|
#cancel!(force = false) ⇒ Object
173
174
175
176
177
178
179
180
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 173
def cancel!(force = false)
output[:cancelled_count] = total_count - output[:planned_count]
on_planning_finished if output[:cancelled_count].positive?
sub_plans(:state => 'running').each { |sub_plan| sub_plan.cancel(force) if sub_plan.cancellable? }
suspend
end
|
#check_for_errors! ⇒ Object
144
145
146
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 144
def check_for_errors!
raise SubtaskFailedException.new("A sub task failed") if output[:failed_count] > 0
end
|
#concurrency_limit ⇒ Object
159
160
161
162
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 159
def concurrency_limit
input[:dynflow] ||= {}
input[:dynflow][:concurrency_limit]
end
|
#concurrency_limit_capacity ⇒ Object
164
165
166
167
168
169
170
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 164
def concurrency_limit_capacity
if limit = concurrency_limit
return limit unless counts_set?
capacity = limit - (output[:planned_count] - (output[:success_count] + output[:failed_count]))
[0, capacity].max
end
end
|
#counts_set? ⇒ Boolean
140
141
142
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 140
def counts_set?
output[:total_count] && output[:success_count] && output[:failed_count] && output[:pending_count]
end
|
#create_sub_plans ⇒ Object
18
19
20
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 18
def create_sub_plans
raise NotImplementedError
end
|
#current_batch ⇒ Object
Batching Returns the items in the current batch
188
189
190
191
192
193
194
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 188
def current_batch
start_position = output[:planned_count]
size = batch_size
size = concurrency_limit_capacity if concurrency_limit
size = start_position + size > total_count ? total_count - start_position : size
batch(start_position, size)
end
|
#done? ⇒ Boolean
118
119
120
121
122
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 118
def done?
return false if can_spawn_next_batch? || !counts_set?
total_count - output[:success_count] - output[:failed_count] - output[:cancelled_count] <= 0
end
|
#increase_counts(planned, failed) ⇒ Object
103
104
105
106
107
108
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 103
def increase_counts(planned, failed)
output[:planned_count] += planned + failed
output[:failed_count] = output.fetch(:failed_count, 0) + failed
output[:pending_count] = output.fetch(:pending_count, 0) + planned
output[:success_count] ||= 0
end
|
#initiate ⇒ Object
66
67
68
69
70
71
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 66
def initiate
output[:planned_count] = 0
output[:cancelled_count] = 0
output[:total_count] = total_count
spawn_plans
end
|
#limit_concurrency_level!(level) ⇒ Object
154
155
156
157
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 154
def limit_concurrency_level!(level)
input[:dynflow] ||= {}
input[:dynflow][:concurrency_limit] = level
end
|
#on_finish ⇒ Object
42
43
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 42
def on_finish
end
|
#on_planning_finished ⇒ Object
45
46
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 45
def on_planning_finished
end
|
#polling_interval ⇒ Object
#recalculate_counts ⇒ Object
131
132
133
134
135
136
137
138
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 131
def recalculate_counts
total = total_count
failed = sub_plans_count('state' => %w(paused stopped), 'result' => %w(error warning))
success = sub_plans_count('state' => 'stopped', 'result' => 'success')
output.update(:pending_count => total - failed - success,
:failed_count => failed - output.fetch(:resumed_count, 0),
:success_count => success)
end
|
#remaining_count ⇒ Object
200
201
202
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 200
def remaining_count
total_count - output[:cancelled_count] - output[:planned_count]
end
|
#resume ⇒ Object
73
74
75
76
77
78
79
80
81
82
83
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 73
def resume
if sub_plans.all? { |sub_plan| sub_plan.error_in_plan? }
output[:resumed_count] ||= 0
output[:resumed_count] += output[:failed_count]
%w(total failed pending success).each { |key| output.delete("#{key}_count".to_sym) }
initiate
else
tick
end
end
|
#run(event = nil) ⇒ Object
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 48
def run(event = nil)
case event
when nil
if output[:total_count]
resume
else
initiate
end
when Ping
tick
when ::Dynflow::Action::Cancellable::Cancel
cancel!
when ::Dynflow::Action::Cancellable::Abort
abort!
end
try_to_finish || suspend_and_ping
end
|
#run_progress ⇒ Object
124
125
126
127
128
129
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 124
def run_progress
return 0.1 unless counts_set? && total_count > 0
sum = output.values_at(:success_count, :cancelled_count, :failed_count).reduce(:+)
sum.to_f / total_count
end
|
#spawn_plans ⇒ Object
96
97
98
99
100
101
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 96
def spawn_plans
sub_plans = create_sub_plans
sub_plans = Array[sub_plans] unless sub_plans.is_a? Array
increase_counts(sub_plans.count, 0)
on_planning_finished unless can_spawn_next_batch?
end
|
#suspend_and_ping ⇒ Object
90
91
92
93
94
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 90
def suspend_and_ping
delay = (concurrency_limit.nil? || concurrency_limit_capacity > 0) && can_spawn_next_batch? ? nil : polling_interval
plan_event(Ping, delay)
suspend
end
|
#tick ⇒ Object
85
86
87
88
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 85
def tick
recalculate_counts
spawn_plans if can_spawn_next_batch?
end
|
#total_count ⇒ Object
Should return the expected total count of tasks
23
24
25
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 23
def total_count
raise NotImplementedError
end
|
#trigger(action_class, *args) ⇒ Object
Helper for creating sub plans
149
150
151
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 149
def trigger(action_class, *args)
world.trigger { world.plan_with_options(action_class: action_class, args: args, caller_action: self) }
end
|
#try_to_finish ⇒ Object
110
111
112
113
114
115
116
|
# File 'lib/dynflow/action/v2/with_sub_plans.rb', line 110
def try_to_finish
return false unless done?
check_for_errors!
on_finish
true
end
|