Class: Dynflow::Executors::Parallel
- Inherits:
-
Object
- Object
- Dynflow::Executors::Parallel
show all
- Defined in:
- lib/dynflow/executors/parallel.rb,
lib/dynflow/executors/parallel/core.rb,
lib/dynflow/executors/parallel/pool.rb,
lib/dynflow/executors/parallel/worker.rb
Defined Under Namespace
Classes: Core, Pool, Worker
Instance Attribute Summary collapse
Instance Method Summary
collapse
-
#delayed_event(director_event) ⇒ Object
-
#event(request_id, execution_plan_id, step_id, event, future = nil, optional: false) ⇒ Object
-
#execute(execution_plan_id, finished = Concurrent::Promises.resolvable_future, wait_for_acceptance = true) ⇒ Object
-
#execution_status(execution_plan_id = nil) ⇒ Object
-
#halt(execution_plan_id) ⇒ Object
-
#initialize(world, executor_class:, heartbeat_interval:, queues_options: { :default => { :pool_size => 5 } }) ⇒ Parallel
constructor
A new instance of Parallel.
-
#initialized ⇒ Object
-
#plan(execution_plan_id) ⇒ Object
-
#terminate(future = Concurrent::Promises.resolvable_future) ⇒ Object
Constructor Details
#initialize(world, executor_class:, heartbeat_interval:, queues_options: { :default => { :pool_size => 5 } }) ⇒ Parallel
Returns a new instance of Parallel.
13
14
15
16
17
18
19
20
21
22
|
# File 'lib/dynflow/executors/parallel.rb', line 13
def initialize(world,
executor_class:,
heartbeat_interval:,
queues_options: { :default => { :pool_size => 5 } })
@world = world
@logger = world.logger
@core = executor_class.spawn name: 'parallel-executor-core',
args: [world, heartbeat_interval, queues_options],
initialized: @core_initialized = Concurrent::Promises.resolvable_future
end
|
Instance Attribute Details
#core ⇒ Object
Returns the value of attribute core.
11
12
13
|
# File 'lib/dynflow/executors/parallel.rb', line 11
def core
@core
end
|
Instance Method Details
#delayed_event(director_event) ⇒ Object
46
47
48
49
|
# File 'lib/dynflow/executors/parallel.rb', line 46
def delayed_event(director_event)
@core.ask([:handle_event, director_event])
director_event.result
end
|
#event(request_id, execution_plan_id, step_id, event, future = nil, optional: false) ⇒ Object
37
38
39
40
|
# File 'lib/dynflow/executors/parallel.rb', line 37
def event(request_id, execution_plan_id, step_id, event, future = nil, optional: false)
@core.ask([:handle_event, Director::Event[request_id, execution_plan_id, step_id, event, future, optional]])
future
end
|
#execute(execution_plan_id, finished = Concurrent::Promises.resolvable_future, wait_for_acceptance = true) ⇒ Object
24
25
26
27
28
29
30
31
32
33
34
35
|
# File 'lib/dynflow/executors/parallel.rb', line 24
def execute(execution_plan_id, finished = Concurrent::Promises.resolvable_future, wait_for_acceptance = true)
accepted = @core.ask([:handle_execution, execution_plan_id, finished])
accepted.value! if wait_for_acceptance
finished
rescue Concurrent::Actor::ActorTerminated
dynflow_error = Dynflow::Error.new('executor terminated')
finished.reject dynflow_error unless finished.resolved?
raise dynflow_error
rescue => e
finished.reject e unless finished.resolved?
raise e
end
|
#execution_status(execution_plan_id = nil) ⇒ Object
56
57
58
|
# File 'lib/dynflow/executors/parallel.rb', line 56
def execution_status(execution_plan_id = nil)
@core.ask!([:execution_status, execution_plan_id])
end
|
#halt(execution_plan_id) ⇒ Object
60
61
62
|
# File 'lib/dynflow/executors/parallel.rb', line 60
def halt(execution_plan_id)
@core.tell([:halt, execution_plan_id])
end
|
#initialized ⇒ Object
64
65
66
|
# File 'lib/dynflow/executors/parallel.rb', line 64
def initialized
@core_initialized
end
|
#plan(execution_plan_id) ⇒ Object
42
43
44
|
# File 'lib/dynflow/executors/parallel.rb', line 42
def plan(execution_plan_id)
@core.ask([:handle_planning, execution_plan_id])
end
|
#terminate(future = Concurrent::Promises.resolvable_future) ⇒ Object
51
52
53
54
|
# File 'lib/dynflow/executors/parallel.rb', line 51
def terminate(future = Concurrent::Promises.resolvable_future)
@core.tell([:start_termination, future])
future
end
|