Class: Dynflow::Executors::Parallel

Inherits:
Object
  • Object
show all
Defined in:
lib/dynflow/executors/parallel.rb,
lib/dynflow/executors/parallel/core.rb,
lib/dynflow/executors/parallel/pool.rb,
lib/dynflow/executors/parallel/worker.rb

Defined Under Namespace

Classes: Core, Pool, Worker

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(world, executor_class:, heartbeat_interval:, queues_options: { :default => { :pool_size => 5 } }) ⇒ Parallel

Returns a new instance of Parallel.



13
14
15
16
17
18
19
20
21
22
# File 'lib/dynflow/executors/parallel.rb', line 13

def initialize(world,
               executor_class:,
               heartbeat_interval:,
               queues_options: { :default => { :pool_size => 5 } })
  @world  = world
  @logger = world.logger
  @core = executor_class.spawn name:        'parallel-executor-core',
                               args:        [world, heartbeat_interval, queues_options],
                               initialized: @core_initialized = Concurrent::Promises.resolvable_future
end

Instance Attribute Details

#coreObject (readonly)

Returns the value of attribute core.



11
12
13
# File 'lib/dynflow/executors/parallel.rb', line 11

def core
  @core
end

Instance Method Details

#delayed_event(director_event) ⇒ Object



46
47
48
49
# File 'lib/dynflow/executors/parallel.rb', line 46

def delayed_event(director_event)
  @core.ask([:handle_event, director_event])
  director_event.result
end

#event(request_id, execution_plan_id, step_id, event, future = nil, optional: false) ⇒ Object



37
38
39
40
# File 'lib/dynflow/executors/parallel.rb', line 37

def event(request_id, execution_plan_id, step_id, event, future = nil, optional: false)
  @core.ask([:handle_event, Director::Event[request_id, execution_plan_id, step_id, event, future, optional]])
  future
end

#execute(execution_plan_id, finished = Concurrent::Promises.resolvable_future, wait_for_acceptance = true) ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/dynflow/executors/parallel.rb', line 24

def execute(execution_plan_id, finished = Concurrent::Promises.resolvable_future, wait_for_acceptance = true)
  accepted = @core.ask([:handle_execution, execution_plan_id, finished])
  accepted.value! if wait_for_acceptance
  finished
rescue Concurrent::Actor::ActorTerminated
  dynflow_error = Dynflow::Error.new('executor terminated')
  finished.reject dynflow_error unless finished.resolved?
  raise dynflow_error
rescue => e
  finished.reject e unless finished.resolved?
  raise e
end

#execution_status(execution_plan_id = nil) ⇒ Object



56
57
58
# File 'lib/dynflow/executors/parallel.rb', line 56

def execution_status(execution_plan_id = nil)
  @core.ask!([:execution_status, execution_plan_id])
end

#halt(execution_plan_id) ⇒ Object



60
61
62
# File 'lib/dynflow/executors/parallel.rb', line 60

def halt(execution_plan_id)
  @core.tell([:halt, execution_plan_id])
end

#initializedObject



64
65
66
# File 'lib/dynflow/executors/parallel.rb', line 64

def initialized
  @core_initialized
end

#plan(execution_plan_id) ⇒ Object



42
43
44
# File 'lib/dynflow/executors/parallel.rb', line 42

def plan(execution_plan_id)
  @core.ask([:handle_planning, execution_plan_id])
end

#terminate(future = Concurrent::Promises.resolvable_future) ⇒ Object



51
52
53
54
# File 'lib/dynflow/executors/parallel.rb', line 51

def terminate(future = Concurrent::Promises.resolvable_future)
  @core.tell([:start_termination, future])
  future
end