Module: Dynflow::Action::V2::WithSubPlans
  
  
  
  
  
  
  
  
  
      - Includes:
 
      - Cancellable
 
  
  
  
  
  
  
    - Defined in:
 
    - lib/dynflow/action/v2/with_sub_plans.rb
 
  
  
 
Defined Under Namespace
  
    
  
    
      Classes: SubtaskFailedException
    
  
  
    
      Constant Summary
      collapse
    
    
      
        - DEFAULT_BATCH_SIZE =
          
        
 
        100
 
      
        - DEFAULT_POLLING_INTERVAL =
          
        
 
        15
 
      
        - Ping =
          
        
 
        Algebrick.atom
 
      
    
  
  
  
  Constants included
     from Cancellable
  Cancellable::Abort, Cancellable::Cancel
  
    
      Instance Method Summary
      collapse
    
    
  
  
  
  
  
  
  
  
    Instance Method Details
    
      
  
  
    #abort!  ⇒ Object 
  
  
  
  
    
      
182
183
184 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 182
def abort!
  cancel! true
end 
     | 
  
 
    
      
  
  
    #batch(from, size)  ⇒ Object 
  
  
  
  
    
Should return a slice of size items starting from item with index from
   
 
  
    
      
32
33
34 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 32
def batch(from, size)
  raise NotImplementedError
end 
     | 
  
 
    
      
  
  
    #batch_size  ⇒ Object 
  
  
  
  
    
      
27
28
29 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 27
def batch_size
  DEFAULT_BATCH_SIZE
end 
     | 
  
 
    
      
  
  
    #can_spawn_next_batch?  ⇒ Boolean 
  
  
  
  
    
      
196
197
198 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 196
def can_spawn_next_batch?
  remaining_count > 0
end 
     | 
  
 
    
      
  
  
    #cancel!(force = false)  ⇒ Object 
  
  
  
  
  
    
      
173
174
175
176
177
178
179
180 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 173
def cancel!(force = false)
    output[:cancelled_count] = total_count - output[:planned_count]
  on_planning_finished if output[:cancelled_count].positive?
    sub_plans(:state => 'running').each { |sub_plan| sub_plan.cancel(force) if sub_plan.cancellable? }
  suspend
end
     | 
  
 
    
      
  
  
    #check_for_errors!  ⇒ Object 
  
  
  
  
    
      
144
145
146 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 144
def check_for_errors!
  raise SubtaskFailedException.new("A sub task failed") if output[:failed_count] > 0
end
     | 
  
 
    
      
  
  
    #concurrency_limit  ⇒ Object 
  
  
  
  
    
      
159
160
161
162 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 159
def concurrency_limit
  input[:dynflow] ||= {}
  input[:dynflow][:concurrency_limit]
end
     | 
  
 
    
      
  
  
    #concurrency_limit_capacity  ⇒ Object 
  
  
  
  
    
      
164
165
166
167
168
169
170 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 164
def concurrency_limit_capacity
  if limit = concurrency_limit
    return limit unless counts_set?
    capacity = limit - (output[:planned_count] - (output[:success_count] + output[:failed_count]))
    [0, capacity].max
  end
end
     | 
  
 
    
      
  
  
    #counts_set?  ⇒ Boolean 
  
  
  
  
    
      
140
141
142 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 140
def counts_set?
  output[:total_count] && output[:success_count] && output[:failed_count] && output[:pending_count]
end 
     | 
  
 
    
      
  
  
    #create_sub_plans  ⇒ Object 
  
  
  
  
    
      
18
19
20 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 18
def create_sub_plans
  raise NotImplementedError
end 
     | 
  
 
    
      
  
  
    #current_batch  ⇒ Object 
  
  
  
  
    
Batching Returns the items in the current batch
   
 
  
  
    
      
188
189
190
191
192
193
194 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 188
def current_batch
  start_position = output[:planned_count]
  size = batch_size
  size = concurrency_limit_capacity if concurrency_limit
  size = start_position + size > total_count ? total_count - start_position : size
  batch(start_position, size)
end 
     | 
  
 
    
      
  
  
    #done?  ⇒ Boolean 
  
  
  
  
    
      
118
119
120
121
122 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 118
def done?
  return false if can_spawn_next_batch? || !counts_set?
  total_count - output[:success_count] - output[:failed_count] - output[:cancelled_count] <= 0
end 
     | 
  
 
    
      
  
  
    #increase_counts(planned, failed)  ⇒ Object 
  
  
  
  
    
      
103
104
105
106
107
108 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 103
def increase_counts(planned, failed)
  output[:planned_count] += planned + failed
  output[:failed_count]  = output.fetch(:failed_count, 0) + failed
  output[:pending_count] = output.fetch(:pending_count, 0) + planned
  output[:success_count] ||= 0
end 
     | 
  
 
    
      
  
  
    #initiate  ⇒ Object 
  
  
  
  
    
      
66
67
68
69
70
71 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 66
def initiate
  output[:planned_count] = 0
  output[:cancelled_count] = 0
  output[:total_count] = total_count
  spawn_plans
end 
     | 
  
 
    
      
  
  
    #limit_concurrency_level!(level)  ⇒ Object 
  
  
  
  
  
    
      
154
155
156
157 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 154
def limit_concurrency_level!(level)
  input[:dynflow] ||= {}
  input[:dynflow][:concurrency_limit] = level
end
     | 
  
 
    
      
  
  
    #on_finish  ⇒ Object 
  
  
  
  
  
    
      
42
43 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 42
def on_finish
end 
     | 
  
 
    
      
  
  
    #on_planning_finished  ⇒ Object 
  
  
  
  
    
      
45
46 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 45
def on_planning_finished
end 
     | 
  
 
    
      
  
  
    #polling_interval  ⇒ Object 
  
  
  
  
 
    
      
  
  
    #recalculate_counts  ⇒ Object 
  
  
  
  
    
      
131
132
133
134
135
136
137
138 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 131
def recalculate_counts
  total   = total_count
  failed  = sub_plans_count('state' => %w(paused stopped), 'result' => %w(error warning))
  success = sub_plans_count('state' => 'stopped', 'result' => 'success')
  output.update(:pending_count => total - failed - success,
                :failed_count  => failed - output.fetch(:resumed_count, 0),
                :success_count => success)
end
     | 
  
 
    
      
  
  
    #remaining_count  ⇒ Object 
  
  
  
  
    
      
200
201
202 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 200
def remaining_count
  total_count - output[:cancelled_count] - output[:planned_count]
end 
     | 
  
 
    
      
  
  
    #resume  ⇒ Object 
  
  
  
  
    
      
73
74
75
76
77
78
79
80
81
82
83 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 73
def resume
  if sub_plans.all? { |sub_plan| sub_plan.error_in_plan? }
    output[:resumed_count] ||= 0
    output[:resumed_count] += output[:failed_count]
        %w(total failed pending success).each { |key| output.delete("#{key}_count".to_sym) }
    initiate
  else
    tick
  end
end
     | 
  
 
    
      
  
  
    #run(event = nil)  ⇒ Object 
  
  
  
  
    
      
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 48
def run(event = nil)
  case event
  when nil
    if output[:total_count]
      resume
    else
      initiate
    end
  when Ping
    tick
  when ::Dynflow::Action::Cancellable::Cancel
    cancel!
  when ::Dynflow::Action::Cancellable::Abort
    abort!
  end
  try_to_finish || suspend_and_ping
end
     | 
  
 
    
      
  
  
    #run_progress  ⇒ Object 
  
  
  
  
    
      
124
125
126
127
128
129 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 124
def run_progress
  return 0.1 unless counts_set? && total_count > 0
  sum = output.values_at(:success_count, :cancelled_count, :failed_count).reduce(:+)
  sum.to_f / total_count
end 
     | 
  
 
    
      
  
  
    #spawn_plans  ⇒ Object 
  
  
  
  
    
      
96
97
98
99
100
101 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 96
def spawn_plans
  sub_plans = create_sub_plans
  sub_plans = Array[sub_plans] unless sub_plans.is_a? Array
  increase_counts(sub_plans.count, 0)
  on_planning_finished unless can_spawn_next_batch?
end 
     | 
  
 
    
      
  
  
    #suspend_and_ping  ⇒ Object 
  
  
  
  
    
      
90
91
92
93
94 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 90
def suspend_and_ping
  delay = (concurrency_limit.nil? || concurrency_limit_capacity > 0) && can_spawn_next_batch? ? nil : polling_interval
  plan_event(Ping, delay)
  suspend
end 
     | 
  
 
    
      
  
  
    #tick  ⇒ Object 
  
  
  
  
    
      
85
86
87
88 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 85
def tick
  recalculate_counts
  spawn_plans if can_spawn_next_batch?
end 
     | 
  
 
    
      
  
  
    #total_count  ⇒ Object 
  
  
  
  
    
Should return the expected total count of tasks
   
 
  
    
      
23
24
25 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 23
def total_count
  raise NotImplementedError
end 
     | 
  
 
    
      
  
  
    #trigger(action_class, *args)  ⇒ Object 
  
  
  
  
    
Helper for creating sub plans
   
 
  
  
    
      
149
150
151 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 149
def trigger(action_class, *args)
  world.trigger { world.plan_with_options(action_class: action_class, args: args, caller_action: self) }
end
     | 
  
 
    
      
  
  
    #try_to_finish  ⇒ Object 
  
  
  
  
    
      
110
111
112
113
114
115
116 
     | 
    
      # File 'lib/dynflow/action/v2/with_sub_plans.rb', line 110
def try_to_finish
  return false unless done?
  check_for_errors!
  on_finish
  true
end 
     |