Class: Dynflow::Director

Inherits:
Object
  • Object
show all
Includes:
Algebrick::TypeCheck
Defined in:
lib/dynflow/director.rb,
lib/dynflow/director/queue_hash.rb,
lib/dynflow/director/flow_manager.rb,
lib/dynflow/director/sequence_cursor.rb,
lib/dynflow/director/sequential_manager.rb,
lib/dynflow/director/running_steps_manager.rb,
lib/dynflow/director/execution_plan_manager.rb

Overview

Director is responsible for telling what to do next when:

* new execution starts
* an event accurs
* some work is finished

It’s public methods (except terminate) return work items that the executor should understand

Defined Under Namespace

Classes: EventWorkItem, ExecutionPlanManager, FinalizeWorkItem, FlowManager, PlanningWorkItem, QueueHash, RunningStepsManager, SequenceCursor, SequentialManager, StepWorkItem, WorkItem

Constant Summary collapse

Event =
Algebrick.type do
  fields! request_id:        String,
          execution_plan_id: String,
          step_id:           Integer,
          event:             Object,
          result:            Concurrent::Promises::ResolvableFuture,
          optional:          Algebrick::Types::Boolean
end
UnprocessableEvent =
Class.new(Dynflow::Error)

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(world) ⇒ Director

Returns a new instance of Director.



174
175
176
177
178
179
180
# File 'lib/dynflow/director.rb', line 174

def initialize(world)
  @world = world
  @logger = world.logger
  @execution_plan_managers = {}
  @rescued_steps = {}
  @planning_plans = Set.new
end

Instance Attribute Details

#loggerObject (readonly)

Returns the value of attribute logger.



172
173
174
# File 'lib/dynflow/director.rb', line 172

def logger
  @logger
end

Instance Method Details

#current_execution_plan_idsObject



182
183
184
# File 'lib/dynflow/director.rb', line 182

def current_execution_plan_ids
  @execution_plan_managers.keys
end

#halt(event) ⇒ Object



257
258
259
# File 'lib/dynflow/director.rb', line 257

def halt(event)
  halt_execution(event.execution_plan_id)
end

#handle_event(event) ⇒ Object



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/dynflow/director.rb', line 199

def handle_event(event)
  Type! event, Event
  execution_plan_manager = @execution_plan_managers[event.execution_plan_id]
  if execution_plan_manager
    execution_plan_manager.event(event)
  elsif event.optional
    event.result.reject "no manager for #{event.inspect}"
    []
  else
    raise Dynflow::Error, "no manager for #{event.inspect}"
  end
rescue Dynflow::Error => e
  event.result.reject e.message
  raise e
end

#handle_planning(execution_plan_uuid) ⇒ Object



186
187
188
189
190
191
# File 'lib/dynflow/director.rb', line 186

def handle_planning(execution_plan_uuid)
  return [] if @planning_plans.include? execution_plan_uuid

  @planning_plans << execution_plan_uuid
  [PlanningWorkItem.new(execution_plan_uuid, :default, @world.id)]
end

#start_execution(execution_plan_id, finished) ⇒ Object



193
194
195
196
197
# File 'lib/dynflow/director.rb', line 193

def start_execution(execution_plan_id, finished)
  manager = track_execution_plan(execution_plan_id, finished)
  return [] unless manager
  unless_done(manager, manager.start)
end

#terminateObject



241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/dynflow/director.rb', line 241

def terminate
  unless @execution_plan_managers.empty?
    logger.error "... cleaning #{@execution_plan_managers.size} execution plans ..."
    begin
      @execution_plan_managers.values.each do |manager|
        manager.terminate
      end
    rescue Errors::PersistenceError
      logger.error "could not to clean the data properly"
    end
    @execution_plan_managers.values.each do |manager|
      finish_manager(manager)
    end
  end
end

#work_failed(work) ⇒ Object

called when there was an unhandled exception during the execution of the work (such as persistence issue) - in this case we just clean up the runtime from the execution plan and let it go (common cause for this is the execution plan being removed from database by external user)



232
233
234
235
236
237
238
239
# File 'lib/dynflow/director.rb', line 232

def work_failed(work)
  if (manager = @execution_plan_managers[work.execution_plan_id])
    manager.terminate
    # Don't try to store when the execution plan went missing
    plan_missing = @world.persistence.find_execution_plans(:filters => { uuid: work.execution_plan_id }).empty?
    finish_manager(manager, store: !plan_missing)
  end
end

#work_finished(work) ⇒ Object



215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/dynflow/director.rb', line 215

def work_finished(work)
  case work
  when PlanningWorkItem
    @planning_plans.delete(work.execution_plan_id)
    @world.persistence.delete_delayed_plans(:execution_plan_uuid => work.execution_plan_id)
    []
  else
    manager = @execution_plan_managers[work.execution_plan_id]
    return [] unless manager # skip case when getting event from execution plan that is not running anymore
    unless_done(manager, manager.what_is_next(work))
  end
end