Class: Dynflow::Director
- Inherits:
-
Object
show all
- Includes:
- Algebrick::TypeCheck
- Defined in:
- lib/dynflow/director.rb,
lib/dynflow/director/queue_hash.rb,
lib/dynflow/director/flow_manager.rb,
lib/dynflow/director/sequence_cursor.rb,
lib/dynflow/director/sequential_manager.rb,
lib/dynflow/director/running_steps_manager.rb,
lib/dynflow/director/execution_plan_manager.rb
Overview
Director is responsible for telling what to do next when:
* new execution starts
* an event occurs
* some work is finished
Its public methods (except terminate) return work items that the executor should understand.
Defined Under Namespace
Classes: EventWorkItem, ExecutionPlanManager, FinalizeWorkItem, FlowManager, PlanningWorkItem, QueueHash, RunningStepsManager, SequenceCursor, SequentialManager, StepWorkItem, WorkItem
Constant Summary
collapse
- Event =
Algebrick.type do
# Fields of an event handed to Director#handle_event.
fields! request_id: String,
execution_plan_id: String, # used by handle_event to look up the plan's manager
step_id: Integer,
event: Object,
result: Concurrent::Promises::ResolvableFuture, # rejected by handle_event when no manager handles the event
optional: Algebrick::Types::Boolean # when true, handle_event returns [] instead of raising for a missing manager
end
- UnprocessableEvent =
Class.new(Dynflow::Error)
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(world) ⇒ Director
Returns a new instance of Director.
166
167
168
169
170
171
172
|
# File 'lib/dynflow/director.rb', line 166
# Sets up a director for the given world with empty bookkeeping.
#
# @param world [Object] stored in @world; must respond to #logger
def initialize(world)
  @world = world
  @logger = @world.logger

  # Nothing is planned, rescued or tracked yet.
  @planning_plans = Set.new
  @rescued_steps = {}
  @execution_plan_managers = {}
end
|
Instance Attribute Details
#logger ⇒ Object
Returns the value of attribute logger.
164
165
166
|
# File 'lib/dynflow/director.rb', line 164
# Reader for the @logger instance variable (assigned from world.logger in #initialize).
#
# @return [Object] the current value of @logger
def logger
@logger
end
|
Instance Method Details
#current_execution_plan_ids ⇒ Object
174
175
176
|
# File 'lib/dynflow/director.rb', line 174
# Lists the keys of @execution_plan_managers.
# Other methods in this class index that hash by execution plan id.
#
# @return [Array] keys of the managers hash
def current_execution_plan_ids
@execution_plan_managers.keys
end
|
#halt(event) ⇒ Object
249
250
251
|
# File 'lib/dynflow/director.rb', line 249
# Halts the execution plan the event refers to.
#
# @param event [#execution_plan_id] only the plan id of the event is used
# @return whatever halt_execution returns for that plan id
def halt(event)
  plan_id = event.execution_plan_id
  halt_execution(plan_id)
end
|
#handle_event(event) ⇒ Object
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
|
# File 'lib/dynflow/director.rb', line 191
def handle_event(event)
Type! event, Event
execution_plan_manager = @execution_plan_managers[event.execution_plan_id]
if execution_plan_manager
execution_plan_manager.event(event)
elsif event.optional
event.result.reject "no manager for #{event.inspect}"
[]
else
raise Dynflow::Error, "no manager for #{event.inspect}"
end
rescue Dynflow::Error => e
event.result.reject e.message
raise e
end
|
#handle_planning(execution_plan_uuid) ⇒ Object
178
179
180
181
182
183
|
# File 'lib/dynflow/director.rb', line 178
# Requests planning of an execution plan unless it is already being planned.
#
# @param execution_plan_uuid [Object] id recorded in @planning_plans
# @return [Array] a single PlanningWorkItem for the plan, or [] when the id
#   was already recorded
def handle_planning(execution_plan_uuid)
  # Set#add? returns nil when the id is already present.
  newly_added = @planning_plans.add?(execution_plan_uuid)
  return [] unless newly_added

  [PlanningWorkItem.new(execution_plan_uuid, :default, @world.id)]
end
|
#start_execution(execution_plan_id, finished) ⇒ Object
185
186
187
188
189
|
# File 'lib/dynflow/director.rb', line 185
# Starts tracking an execution plan and asks its manager for the first work.
#
# @param execution_plan_id [Object] passed on to track_execution_plan
# @param finished [Object] passed on to track_execution_plan
# @return the result of unless_done for the manager's #start output, or []
#   when track_execution_plan yields no manager
def start_execution(execution_plan_id, finished)
  plan_manager = track_execution_plan(execution_plan_id, finished)
  if plan_manager
    unless_done(plan_manager, plan_manager.start)
  else
    []
  end
end
|
#terminate ⇒ Object
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
|
# File 'lib/dynflow/director.rb', line 233
# Terminates and finishes every execution plan manager still tracked.
# Does nothing when no managers are tracked.
#
# A PersistenceError raised while terminating is logged and stops the
# terminate pass, but every manager is still handed to finish_manager.
def terminate
  return if @execution_plan_managers.empty?

  logger.error "... cleaning #{@execution_plan_managers.size} execution plans ..."
  begin
    @execution_plan_managers.values.each(&:terminate)
  rescue Errors::PersistenceError
    logger.error "could not to clean the data properly"
  end

  # Iterate over a snapshot array rather than the hash itself.
  # NOTE(review): finish_manager is defined elsewhere; presumably it removes
  # entries from @execution_plan_managers - confirm.
  @execution_plan_managers.values.each { |plan_manager| finish_manager(plan_manager) }
end
|
#work_failed(work) ⇒ Object
Called when there was an unhandled exception during the execution of the work (such as a persistence issue). In this case we just clean up the runtime state of the execution plan and let it go (a common cause is the execution plan being removed from the database by an external user).
224
225
226
227
228
229
230
231
|
# File 'lib/dynflow/director.rb', line 224
# Cleans up after work that failed with an unhandled exception.
#
# The plan's manager, if one is tracked, is terminated and then finished.
# The plan is stored on finish only when persistence still finds it.
#
# @param work [#execution_plan_id] the failed work item
# @return the result of finish_manager, or nil when no manager is tracked
def work_failed(work)
  plan_id = work.execution_plan_id
  plan_manager = @execution_plan_managers[plan_id]
  return unless plan_manager

  plan_manager.terminate
  stored_plans = @world.persistence.find_execution_plans(filters: { uuid: plan_id })
  finish_manager(plan_manager, store: !stored_plans.empty?)
end
|
#work_finished(work) ⇒ Object
207
208
209
210
211
212
213
214
215
216
217
218
|
# File 'lib/dynflow/director.rb', line 207
# Reacts to a finished work item and returns what should be done next.
#
# @param work [#execution_plan_id] the finished work item
# @return [Array] [] for planning work and for plans with no tracked
#   manager; otherwise the result of unless_done for the manager's
#   #what_is_next output
def work_finished(work)
  case work
  when PlanningWorkItem
    # Planning is over: forget the plan and drop its delayed-plan record.
    @planning_plans.delete(work.execution_plan_id)
    @world.persistence.delete_delayed_plans(:execution_plan_uuid => work.execution_plan_id)
    []
  else
    manager = @execution_plan_managers[work.execution_plan_id]
    # Skip work reported for a plan that is not tracked (any more).
    return [] unless manager

    unless_done(manager, manager.what_is_next(work))
  end
end
|