Class: Dynflow::Executors::Parallel
  
  
  
  
  
    - Inherits:
 
    - 
      Object
      
        
          - Object
 
          
            - Dynflow::Executors::Parallel
 
          
        
        show all
      
     
  
  
  
  
  
  
  
  
  
  
    - Defined in:
 
    - lib/dynflow/executors/parallel.rb,
  lib/dynflow/executors/parallel/core.rb,
 lib/dynflow/executors/parallel/pool.rb,
 lib/dynflow/executors/parallel/worker.rb
 
  
  
 
Defined Under Namespace
  
    
  
    
      Classes: Core, Pool, Worker
    
  
  Instance Attribute Summary collapse
    - #core  ⇒ Object — Returns the value of attribute core.
  
  
    
      Instance Method Summary
      collapse
    
    
      
        - 
  
    
      #delayed_event(director_event)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    
  
 
      
        - 
  
    
      #event(request_id, execution_plan_id, step_id, event, future = nil, optional: false)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    
  
 
      
        - 
  
    
      #execute(execution_plan_id, finished = Concurrent::Promises.resolvable_future, wait_for_acceptance = true)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    
  
 
      
        - 
  
    
      #execution_status(execution_plan_id = nil)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    
  
 
      
        - 
  
    
      #halt(execution_plan_id)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    
  
 
      
        - 
  
    
      #initialize(world, executor_class:, heartbeat_interval:, queues_options: { :default => { :pool_size => 5 } })  ⇒ Parallel 
    
    
  
  
  
    constructor
  
  
  
  
  
  
  
    
A new instance of Parallel.
 
  
 
      
        - 
  
    
      #initialized  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    
  
 
      
        - 
  
    
      #plan(execution_plan_id)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    
  
 
      
        - 
  
    
      #terminate(future = Concurrent::Promises.resolvable_future)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    
  
 
      
    
  
  Constructor Details
  
    
  
  
    #initialize(world, executor_class:, heartbeat_interval:, queues_options: { :default => { :pool_size => 5 } })  ⇒ Parallel 
  
  
  
  
    
Returns a new instance of Parallel.
   
 
  
  
    
      
13
14
15
16
17
18
19
20
21
22 
     | 
    
      # File 'lib/dynflow/executors/parallel.rb', line 13
# Stores the world and its logger, then spawns the executor core.
#
# @param world object responding to +logger+; also forwarded to the core
# @param executor_class object responding to +spawn+; its result becomes @core
# @param heartbeat_interval forwarded unchanged as the second core argument
# @param queues_options forwarded unchanged as the third core argument
def initialize(world,
               executor_class:,
               heartbeat_interval:,
               queues_options: { :default => { :pool_size => 5 } })
  @world  = world
  @logger = world.logger

  # Kept in an ivar so #initialized can hand the same future back later.
  @core_initialized = Concurrent::Promises.resolvable_future
  core_args = [world, heartbeat_interval, queues_options]

  @core = executor_class.spawn(name:        'parallel-executor-core',
                               args:        core_args,
                               initialized: @core_initialized)
end
     | 
  
 
  
 
  
    Instance Attribute Details
    
      
      
      
  
  
    #core  ⇒ Object  
  
  
  
  
    
Returns the value of attribute core.
   
 
  
  
    
      
11
12
13 
     | 
    
      # File 'lib/dynflow/executors/parallel.rb', line 11
# Reader for @core, which #initialize assigns from the result of
# executor_class.spawn.
def core
  @core
end
     | 
  
 
    
   
  
    Instance Method Details
    
      
  
  
    #delayed_event(director_event)  ⇒ Object 
  
  
  
  
    
      
46
47
48
49 
     | 
    
      # File 'lib/dynflow/executors/parallel.rb', line 46
# Hands an already-built director event to the core as a :handle_event
# message.
#
# @param director_event object responding to +result+
# @return whatever +director_event.result+ returns; the value returned by
#   the core's +ask+ is discarded
def delayed_event(director_event)
  message = [:handle_event, director_event]
  @core.ask(message)
  director_event.result
end
     | 
  
 
    
      
  
  
    #event(request_id, execution_plan_id, step_id, event, future = nil, optional: false)  ⇒ Object 
  
  
  
  
    
      
37
38
39
40 
     | 
    
      # File 'lib/dynflow/executors/parallel.rb', line 37
# Wraps the arguments in a Director::Event and sends it to the core as a
# :handle_event message.
#
# @param request_id passed through to Director::Event
# @param execution_plan_id passed through to Director::Event
# @param step_id passed through to Director::Event
# @param event passed through to Director::Event
# @param future passed through to Director::Event and returned as-is
#   (nil when the caller supplies none)
# @param optional passed through to Director::Event
# @return the +future+ argument; the value returned by the core's +ask+ is
#   discarded
def event(request_id, execution_plan_id, step_id, event, future = nil, optional: false)
  director_event = Director::Event[request_id, execution_plan_id, step_id, event, future, optional]
  @core.ask([:handle_event, director_event])
  future
end
     | 
  
 
    
      
  
  
    #execute(execution_plan_id, finished = Concurrent::Promises.resolvable_future, wait_for_acceptance = true)  ⇒ Object 
  
  
  
  
    
      
24
25
26
27
28
29
30
31
32
33
34
35 
     | 
    
      # File 'lib/dynflow/executors/parallel.rb', line 24
# Asks the core to execute a plan via a :handle_execution message.
#
# @param execution_plan_id identifier forwarded to the core
# @param finished object responding to +resolved?+ and +reject+; forwarded
#   to the core and returned to the caller
# @param wait_for_acceptance when truthy, +value!+ is called on the result
#   of the core's +ask+ before returning (presumably blocking until the core
#   accepts the plan; confirm against the actor API)
# @return the +finished+ argument
# @raise [Dynflow::Error] when Concurrent::Actor::ActorTerminated is raised
# @raise [StandardError] any other error, re-raised after rejecting +finished+
def execute(execution_plan_id, finished = Concurrent::Promises.resolvable_future, wait_for_acceptance = true)
  acceptance = @core.ask([:handle_execution, execution_plan_id, finished])
  acceptance.value! if wait_for_acceptance
  finished
rescue Concurrent::Actor::ActorTerminated
  # Translate the actor-level failure into the library's own error type.
  terminated = Dynflow::Error.new('executor terminated')
  finished.reject(terminated) unless finished.resolved?
  raise terminated
rescue => failure
  # Reject the future, unless it is already resolved, before re-raising.
  finished.reject(failure) unless finished.resolved?
  raise failure
end
     | 
  
 
    
      
  
  
    #execution_status(execution_plan_id = nil)  ⇒ Object 
  
  
  
  
    
      
56
57
58 
     | 
    
      # File 'lib/dynflow/executors/parallel.rb', line 56
# Sends an :execution_status message to the core through +ask!+.
#
# @param execution_plan_id forwarded to the core; nil when omitted
# @return whatever the core's +ask!+ returns
def execution_status(execution_plan_id = nil)
  query = [:execution_status, execution_plan_id]
  @core.ask!(query)
end
     | 
  
 
    
      
  
  
    #halt(execution_plan_id)  ⇒ Object 
  
  
  
  
    
      
60
61
62 
     | 
    
      # File 'lib/dynflow/executors/parallel.rb', line 60
# Sends a :halt message for the given plan to the core through +tell+.
#
# @param execution_plan_id forwarded to the core
# @return whatever the core's +tell+ returns
def halt(execution_plan_id)
  halt_message = [:halt, execution_plan_id]
  @core.tell(halt_message)
end
     | 
  
 
    
      
  
  
    #initialized  ⇒ Object 
  
  
  
  
    
      
64
65
66 
     | 
    
      # File 'lib/dynflow/executors/parallel.rb', line 64
# Returns @core_initialized, the resolvable future that #initialize creates
# and passes to the core's spawn call as +initialized:+.
def initialized
  @core_initialized
end
     | 
  
 
    
      
  
  
    #plan(execution_plan_id)  ⇒ Object 
  
  
  
  
    
      
42
43
44 
     | 
    
      # File 'lib/dynflow/executors/parallel.rb', line 42
# Sends a :handle_planning message for the given plan to the core.
#
# @param execution_plan_id forwarded to the core
# @return whatever the core's +ask+ returns
def plan(execution_plan_id)
  planning_request = [:handle_planning, execution_plan_id]
  @core.ask(planning_request)
end
     | 
  
 
    
      
  
  
    #terminate(future = Concurrent::Promises.resolvable_future)  ⇒ Object 
  
  
  
  
    
      
51
52
53
54 
     | 
    
      # File 'lib/dynflow/executors/parallel.rb', line 51
# Sends a :start_termination message carrying +future+ to the core.
#
# @param future forwarded to the core; defaults to a fresh
#   Concurrent::Promises.resolvable_future
# @return the same +future+ object that was sent
def terminate(future = Concurrent::Promises.resolvable_future)
  termination_request = [:start_termination, future]
  @core.tell(termination_request)
  future
end
     |