Class: Dynflow::Executors::Parallel::Core
  
  
  
  Instance Attribute Summary collapse
  
  
    
      Instance Method Summary
      collapse
    
    
  
  
  
  
  
  
  
  
  
  
  #dead_letter_routing, #halt, #handle_event, #handle_execution, #handle_persistence_error, #handle_planning, #heartbeat, #plan_events, #work_finished
  
  
  
  
  
  
  
  
  Methods inherited from Actor
  #behaviour_definition, #terminating?
  
  
  
  
  
  
  
  
  
  
  #on_message
  
  
  
  
  
  
  
  
  
  #log
  
  Constructor Details
  
    
  
  
    #initialize(world, heartbeat_interval, queues_options)  ⇒ Core 
  
  
  
  
    
Returns a new instance of Core.
   
 
  
  
    
      
12
13
14
15
16 
     | 
    
      # File 'lib/dynflow/executors/parallel/core.rb', line 12
# Builds the core: runs the superclass initializer with the same three
# arguments, starts with an empty pool registry, and then spawns the pools
# via #initialize_queues.
#
# @param world the world handed to the superclass initializer
# @param heartbeat_interval forwarded unchanged to the superclass
# @param queues_options forwarded unchanged to the superclass
#   (NOTE(review): #initialize_queues reads @queues_options, presumably
#   assigned by the superclass initializer - confirm)
def initialize(world, heartbeat_interval, queues_options)
  super(world, heartbeat_interval, queues_options)
  # The registry must exist before initialize_queues fills it in.
  @pools = {}
  initialize_queues
end
     | 
  
 
  
 
  
    Instance Attribute Details
    
      
      
      
  
  
    #logger  ⇒ Object  
  
  
  
  
    
Returns the value of attribute logger.
   
 
  
  
    
      
10
11
12 
     | 
    
      # File 'lib/dynflow/executors/parallel/core.rb', line 10
# Reader for the @logger instance variable.
#
# @return [Object] the current value of @logger (nil if it was never assigned)
def logger
  @logger
end 
     | 
  
 
    
   
  
    Instance Method Details
    
      
  
  
    #execution_status(execution_plan_id = nil)  ⇒ Object 
  
  
  
  
    
      
40
41
42
43
44 
     | 
    
      # File 'lib/dynflow/executors/parallel/core.rb', line 40
# Asks every registered pool for its execution status and collects the
# answers.
#
# @param execution_plan_id passed through to each pool as the second element
#   of the [:execution_status, id] message; defaults to nil
# @return [Hash] pool name => whatever that pool's #ask! returned, in the
#   iteration order of @pools
def execution_status(execution_plan_id = nil)
  statuses = @pools.map do |name, pool|
    # ask! is issued pool by pool; each pool receives its own message array.
    [name, pool.ask!([:execution_status, execution_plan_id])]
  end
  statuses.to_h
end
     | 
  
 
    
      
  
  
    #feed_pool(work_items)  ⇒ Object 
  
  
  
  
    
      
46
47
48
49
50
51 
     | 
    
      # File 'lib/dynflow/executors/parallel/core.rb', line 46
# Hands each work item over to the pool serving its queue.
#
# For every item: assigns @world to it, resolves the queue name through
# #suggest_queue, and tells the matching pool to schedule the item.
#
# @param work_items [#each] items responding to #world=
# @return the +work_items+ collection itself (the result of #each)
# @raise [KeyError] when #suggest_queue names a queue missing from @pools
def feed_pool(work_items)
  work_items.each do |item|
    item.world = @world
    queue_name = suggest_queue(item)
    # fetch (not []) so that an unknown queue fails loudly.
    target_pool = @pools.fetch(queue_name)
    target_pool.tell([:schedule_work, item])
  end
end
     | 
  
 
    
      
  
  
    #finish_termination(pool_name)  ⇒ Object 
  
  
  
  
    
      
33
34
35
36
37
38 
     | 
    
      # File 'lib/dynflow/executors/parallel/core.rb', line 33
# Records that one pool has finished terminating.
#
# The pool is dropped from @pools; only when that leaves no pools at all is
# the superclass implementation invoked (with no arguments).
#
# @param pool_name key of the finished pool in @pools
# @return [nil] while other pools remain, otherwise whatever the superclass
#   #finish_termination returns
def finish_termination(pool_name)
  @pools.delete(pool_name)
  super() if @pools.empty?
end
     | 
  
 
    
      
  
  
    #initialize_queues  ⇒ Object 
  
  
  
  
    
      
18
19
20
21
22
23
24
25
26 
     | 
    
      # File 'lib/dynflow/executors/parallel/core.rb', line 18
# Spawns one Pool per entry of @queues_options and registers it in @pools
# under the queue's name.
#
# A queue without its own :pool_size uses the :pool_size of the :default
# queue.
#
# @return the @queues_options collection (the result of #each)
def initialize_queues
  fallback_size = @queues_options[:default][:pool_size]
  @queues_options.each do |name, options|
    size = options.fetch(:pool_size, fallback_size)
    @pools[name] = Pool.spawn("pool #{name}", @world, reference, name, size, @world.transaction_adapter)
  end
end
     | 
  
 
    
      
  
  
    #start_termination(*args)  ⇒ Object 
  
  
  
  
    
      
28
29
30
31 
     | 
    
      # File 'lib/dynflow/executors/parallel/core.rb', line 28
# Begins shutdown: runs the superclass #start_termination with the same
# arguments, then tells every pool to start terminating.
#
# Each pool receives its own freshly created resolvable future as the second
# element of the [:start_termination, future] message.
#
# @param args forwarded unchanged to the superclass implementation
# @return [Array] the pools that were notified (@pools.values)
def start_termination(*args)
  super
  @pools.values.each do |pool|
    pool_future = Concurrent::Promises.resolvable_future
    pool.tell([:start_termination, pool_future])
  end
end
     |