Class: Dynflow::Director

Inherits:
Object
  • Object
show all
Includes:
Algebrick::TypeCheck
Defined in:
lib/dynflow/director.rb,
lib/dynflow/director/queue_hash.rb,
lib/dynflow/director/flow_manager.rb,
lib/dynflow/director/sequence_cursor.rb,
lib/dynflow/director/sequential_manager.rb,
lib/dynflow/director/running_steps_manager.rb,
lib/dynflow/director/execution_plan_manager.rb

Overview

Director is responsible for telling what to do next when:

* new execution starts
* an event occurs
* some work is finished

Its public methods (except terminate) return work items that the executor should understand.

Defined Under Namespace

Classes: EventWorkItem, ExecutionPlanManager, FinalizeWorkItem, FlowManager, PlanningWorkItem, QueueHash, RunningStepsManager, SequenceCursor, SequentialManager, StepWorkItem, WorkItem

Constant Summary collapse

# Value type describing an event addressed to an execution plan.
#
# Fields, as declared below:
#   request_id        - String (NOTE(review): presumably identifies the request
#                       that sent the event; confirm against the caller)
#   execution_plan_id - String; #handle_event uses it to look up the manager of
#                       the target execution plan
#   step_id           - Integer (NOTE(review): presumably the step the event is
#                       meant for; confirm)
#   event             - arbitrary payload object
#   result            - resolvable future; #handle_event rejects it when the
#                       event cannot be delivered
#   optional          - when true and no manager exists for the plan,
#                       #handle_event rejects `result` and returns [] instead of
#                       raising
Event =
Algebrick.type do
  fields! request_id:        String,
          execution_plan_id: String,
          step_id:           Integer,
          event:             Object,
          result:            Concurrent::Promises::ResolvableFuture,
          optional:          Algebrick::Types::Boolean
end
# Error class derived from Dynflow::Error.
# NOTE(review): presumably signals an event that cannot be processed; no raise
# site is visible in this part of the file, so confirm where it is used.
UnprocessableEvent =
Class.new(Dynflow::Error)

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(world) ⇒ Director

Returns a new instance of Director.



166
167
168
169
170
171
172
# File 'lib/dynflow/director.rb', line 166

# Builds a director with no tracked execution plans.
#
# @param world [Object] must respond to #logger; it is kept in @world, on which
#   other methods of this class call #id and #persistence
def initialize(world)
  @world = world
  @logger = world.logger
  # execution plan id => manager of that plan (read by #handle_event,
  # #work_finished and #work_failed)
  @execution_plan_managers = {}
  # NOTE(review): not referenced by the methods visible here; purpose to be confirmed
  @rescued_steps = {}
  # uuids of execution plans that #handle_planning has already issued a
  # planning work item for and #work_finished has not yet cleared
  @planning_plans = Set.new
end

Instance Attribute Details

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



164
165
166
# File 'lib/dynflow/director.rb', line 164

# Reader for the logger that #initialize took from the world.
#
# @return [Object] the value of @logger
def logger
  @logger
end

Instance Method Details

#current_execution_plan_ids ⇒ Object



174
175
176
# File 'lib/dynflow/director.rb', line 174

# Lists the ids of the execution plans that currently have a manager
# registered in this director.
#
# @return [Array] the keys of @execution_plan_managers
def current_execution_plan_ids
  @execution_plan_managers.keys
end

#halt(event) ⇒ Object



249
250
251
# File 'lib/dynflow/director.rb', line 249

# Halts the execution plan that the given event refers to.
#
# @param event [#execution_plan_id] event naming the plan to halt
# @return [Object] whatever halt_execution returns for that plan id
def halt(event)
  plan_id = event.execution_plan_id
  halt_execution(plan_id)
end

#handle_event(event) ⇒ Object



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/dynflow/director.rb', line 191

# Routes an event to the manager of the execution plan it targets.
#
# @param event [Event] type-checked against Event before anything else happens
# @return [Object] the result of the manager's #event call, or [] when the
#   plan has no manager and the event is optional
# @raise [Dynflow::Error] when the plan has no manager and the event is not
#   optional, or when the manager itself raises one; in either case
#   event.result is rejected with the error message before re-raising
def handle_event(event)
  Type! event, Event
  manager = @execution_plan_managers[event.execution_plan_id]
  return manager.event(event) if manager

  unless event.optional
    raise Dynflow::Error, "no manager for #{event.inspect}"
  end

  # Optional events for plans that are not tracked are rejected quietly.
  event.result.reject "no manager for #{event.inspect}"
  []
rescue Dynflow::Error => e
  event.result.reject e.message
  raise
end

#handle_planning(execution_plan_uuid) ⇒ Object



178
179
180
181
182
183
# File 'lib/dynflow/director.rb', line 178

# Requests planning of an execution plan, at most once while a planning work
# item for it is outstanding.
#
# @param execution_plan_uuid [Object] uuid of the plan to be planned
# @return [Array] a single PlanningWorkItem for the :default queue and this
#   world's id, or [] when the uuid is already recorded in @planning_plans
def handle_planning(execution_plan_uuid)
  # Set#add? returns nil when the uuid was already present.
  newly_tracked = @planning_plans.add?(execution_plan_uuid)
  return [] unless newly_tracked

  [PlanningWorkItem.new(execution_plan_uuid, :default, @world.id)]
end

#start_execution(execution_plan_id, finished) ⇒ Object



185
186
187
188
189
# File 'lib/dynflow/director.rb', line 185

# Starts executing a plan and reports the first work to be done.
#
# @param execution_plan_id [Object] id of the plan to execute
# @param finished [Object] handed to track_execution_plan unchanged
#   (NOTE(review): presumably a future resolved when the plan is done; confirm)
# @return [Object] [] when track_execution_plan yields no manager, otherwise
#   the result of unless_done for the manager's #start output
def start_execution(execution_plan_id, finished)
  manager = track_execution_plan(execution_plan_id, finished)
  if manager
    unless_done(manager, manager.start)
  else
    []
  end
end

#terminate ⇒ Object



233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/dynflow/director.rb', line 233

# Terminates every tracked execution plan manager and then finishes each of
# them. Does nothing when no manager is tracked.
#
# An Errors::PersistenceError raised while terminating is logged and stops the
# terminate pass (managers after the failing one are not terminated), but all
# managers are still handed to finish_manager afterwards.
#
# @return [Array, nil] nil when nothing was tracked, otherwise the array of
#   managers that were finished
def terminate
  return if @execution_plan_managers.empty?

  logger.error "... cleaning #{@execution_plan_managers.size} execution plans ..."
  begin
    @execution_plan_managers.values.each(&:terminate)
  rescue Errors::PersistenceError
    logger.error "could not to clean the data properly"
  end
  # Iterate over a snapshot of the values rather than the hash itself.
  @execution_plan_managers.values.each { |manager| finish_manager(manager) }
end

#work_failed(work) ⇒ Object

Called when there was an unhandled exception during the execution of the work (such as a persistence issue). In this case we just clean up the runtime state of the execution plan and let it go. A common cause is the execution plan being removed from the database by an external user.



224
225
226
227
228
229
230
231
# File 'lib/dynflow/director.rb', line 224

# Cleans up after a work item whose execution blew up with an unhandled error.
#
# The manager of the affected plan (if one is tracked) is terminated and then
# finished; the plan is only stored when persistence can still find it.
#
# @param work [#execution_plan_id] the failed work item
# @return [Object, nil] nil when the plan has no manager, otherwise whatever
#   finish_manager returns
def work_failed(work)
  plan_id = work.execution_plan_id
  manager = @execution_plan_managers[plan_id]
  return unless manager

  manager.terminate
  # Skip storing when the execution plan can no longer be found.
  plan_still_stored = !@world.persistence.find_execution_plans(filters: { uuid: plan_id }).empty?
  finish_manager(manager, store: plan_still_stored)
end

#work_finished(work) ⇒ Object



207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/dynflow/director.rb', line 207

# Decides what follows once a work item has completed.
#
# A finished PlanningWorkItem clears the plan from @planning_plans and deletes
# its delayed-plan records; nothing further is scheduled. For any other work
# item, the plan's manager is asked what comes next.
#
# @param work [#execution_plan_id] the completed work item
# @return [Object] [] after planning, or when the plan is no longer tracked;
#   otherwise the result of unless_done for the manager's #what_is_next output
def work_finished(work)
  plan_id = work.execution_plan_id

  if work.is_a?(PlanningWorkItem)
    @planning_plans.delete(plan_id)
    @world.persistence.delete_delayed_plans(execution_plan_uuid: plan_id)
    return []
  end

  # A missing manager means the execution plan is not running anymore.
  manager = @execution_plan_managers[plan_id]
  manager ? unless_done(manager, manager.what_is_next(work)) : []
end