Class: Dynflow::Director
- Inherits:
-
Object
show all
- Includes:
- Algebrick::TypeCheck
- Defined in:
- lib/dynflow/director.rb,
lib/dynflow/director/queue_hash.rb,
lib/dynflow/director/flow_manager.rb,
lib/dynflow/director/sequence_cursor.rb,
lib/dynflow/director/sequential_manager.rb,
lib/dynflow/director/running_steps_manager.rb,
lib/dynflow/director/execution_plan_manager.rb
Overview
Director is responsible for telling what to do next when:
* new execution starts
* an event occurs
* some work is finished
Its public methods (except terminate) return work items that the executor should understand
Defined Under Namespace
Classes: EventWorkItem, ExecutionPlanManager, FinalizeWorkItem, FlowManager, PlanningWorkItem, QueueHash, RunningStepsManager, SequenceCursor, SequentialManager, StepWorkItem, WorkItem
Constant Summary
collapse
- Event =
# Typed record describing an event: it names an execution plan and a step by
# id and carries the event payload itself.
# `result` is the resolvable future that #handle_event rejects when the event
# cannot be delivered; `optional` is the flag #handle_event checks to decide
# whether a missing manager raises or is merely reported through `result`.
Algebrick.type do
fields! request_id: String,
execution_plan_id: String,
step_id: Integer,
event: Object,
result: Concurrent::Promises::ResolvableFuture,
optional: Algebrick::Types::Boolean
end
- UnprocessableEvent =
Class.new(Dynflow::Error)
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(world) ⇒ Director
Returns a new instance of Director.
174
175
176
177
178
179
180
|
# File 'lib/dynflow/director.rb', line 174
# Sets up a director for the given world with empty bookkeeping.
#
# @param world [Object] must respond to `logger`; stored for later use
def initialize(world)
  @world = world
  @logger = @world.logger
  # Nothing is tracked, rescued or being planned yet.
  @rescued_steps = {}
  @execution_plan_managers = {}
  @planning_plans = Set.new
end
|
Instance Attribute Details
#logger ⇒ Object
Returns the value of attribute logger.
172
173
174
|
# File 'lib/dynflow/director.rb', line 172
# @return [Object] the logger stored in @logger
def logger
  @logger
end
|
Instance Method Details
#current_execution_plan_ids ⇒ Object
182
183
184
|
# File 'lib/dynflow/director.rb', line 182
# Lists the ids of the execution plans that currently have a manager.
#
# @return [Array] keys of the manager registry, in insertion order
def current_execution_plan_ids
  @execution_plan_managers.keys
end
|
#halt(event) ⇒ Object
257
258
259
|
# File 'lib/dynflow/director.rb', line 257
# Halts the execution plan the event refers to.
#
# @param event [#execution_plan_id] only the plan id is read from it
# @return whatever `halt_execution` returns
def halt(event)
  plan_id = event.execution_plan_id
  halt_execution(plan_id)
end
|
#handle_event(event) ⇒ Object
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
|
# File 'lib/dynflow/director.rb', line 199
def handle_event(event)
Type! event, Event
execution_plan_manager = @execution_plan_managers[event.execution_plan_id]
if execution_plan_manager
execution_plan_manager.event(event)
elsif event.optional
event.result.reject "no manager for #{event.inspect}"
[]
else
raise Dynflow::Error, "no manager for #{event.inspect}"
end
rescue Dynflow::Error => e
event.result.reject e.message
raise e
end
|
#handle_planning(execution_plan_uuid) ⇒ Object
186
187
188
189
190
191
|
# File 'lib/dynflow/director.rb', line 186
# Requests planning of an execution plan unless one is already registered.
#
# @param execution_plan_uuid [Object] id of the plan to be planned
# @return [Array] a single PlanningWorkItem, or `[]` when the id is already
#   in the set of plans being planned
def handle_planning(execution_plan_uuid)
  # Set#add? returns nil when the element was already present.
  return [] unless @planning_plans.add?(execution_plan_uuid)

  [PlanningWorkItem.new(execution_plan_uuid, :default, @world.id)]
end
|
#start_execution(execution_plan_id, finished) ⇒ Object
193
194
195
196
197
|
# File 'lib/dynflow/director.rb', line 193
# Starts tracking an execution plan and asks its manager for the first work.
#
# @param execution_plan_id [Object] id of the plan to start
# @param finished [Object] passed through to `track_execution_plan`
#   (NOTE(review): its meaning is not visible here; confirm there)
# @return `[]` when no manager could be obtained, otherwise the result of
#   `unless_done` applied to what the manager's `start` produced
def start_execution(execution_plan_id, finished)
  manager = track_execution_plan(execution_plan_id, finished)
  manager ? unless_done(manager, manager.start) : []
end
|
#terminate ⇒ Object
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
|
# File 'lib/dynflow/director.rb', line 241
# Terminates and finishes every execution plan manager still registered.
#
# Does nothing when no managers are registered. A persistence error raised
# while terminating is logged and stops the remaining `terminate` calls, but
# every manager is still handed to `finish_manager` afterwards.
def terminate
  return if @execution_plan_managers.empty?

  logger.error "... cleaning #{@execution_plan_managers.size} execution plans ..."
  begin
    @execution_plan_managers.values.each(&:terminate)
  rescue Errors::PersistenceError
    logger.error "could not to clean the data properly"
  end
  # Iterate over a snapshot of the values; presumably finish_manager removes
  # entries from the registry (it is defined elsewhere, so confirm).
  @execution_plan_managers.values.each { |manager| finish_manager(manager) }
end
|
#work_failed(work) ⇒ Object
Called when there was an unhandled exception during the execution of the work (such as a persistence issue). In this case we just clean up the runtime from the execution plan and let it go; a common cause is the execution plan being removed from the database by an external user.
232
233
234
235
236
237
238
239
|
# File 'lib/dynflow/director.rb', line 232
# Cleans up after work that failed with an unhandled exception.
#
# When a manager exists for the work's execution plan it is terminated and
# then finished; `store:` is false when the plan can no longer be found in
# persistence.
#
# @param work [#execution_plan_id] the failed work item
# @return nil when no manager is registered, otherwise the result of
#   `finish_manager`
def work_failed(work)
  plan_id = work.execution_plan_id
  manager = @execution_plan_managers[plan_id]
  return unless manager

  manager.terminate
  found_plans = @world.persistence.find_execution_plans(filters: { uuid: work.execution_plan_id })
  finish_manager(manager, store: !found_plans.empty?)
end
|
#work_finished(work) ⇒ Object
215
216
217
218
219
220
221
222
223
224
225
226
|
# File 'lib/dynflow/director.rb', line 215
# Reacts to a finished work item.
#
# For a PlanningWorkItem, the plan is removed from the set of plans being
# planned, its delayed-plan records are deleted via persistence, and no
# further work is returned. For any other work item, the plan's manager is
# asked what comes next.
#
# @param work [#execution_plan_id] the finished work item
# @return [Array] `[]` for planning work or when no manager is registered for
#   the plan; otherwise the result of `unless_done` applied to the manager's
#   `what_is_next(work)`
def work_finished(work)
  case work
  when PlanningWorkItem
    @planning_plans.delete(work.execution_plan_id)
    @world.persistence.delete_delayed_plans(:execution_plan_uuid => work.execution_plan_id)
    []
  else
    manager = @execution_plan_managers[work.execution_plan_id]
    # The guard and the follow-up call are two separate statements.
    return [] unless manager

    unless_done(manager, manager.what_is_next(work))
  end
end
|