Class: Dynflow::Executors::Parallel::Pool
  
  
  
  
  
    - Inherits:
 
    - 
      Actor
      
        
          - Object
 
          
            - Concurrent::Actor::Context
 
          
            - Actor
 
          
            - Dynflow::Executors::Parallel::Pool
 
          
        
        show all
      
     
  
  
  
  
  
  
  
  
  
  
    - Defined in:
 
    - lib/dynflow/executors/parallel/pool.rb
 
  
  
 
Defined Under Namespace
  
    
  
    
      Classes: JobStorage
    
  
  
    
      Instance Method Summary
      collapse
    
    
  
  
  
  
  
  
  
  
  
  Methods inherited from Actor
  #behaviour_definition, #finish_termination, #terminating?
  
  
  
  
  
  
  
  
  
  
  #on_message
  
  
  
  
  
  
  
  
  
  #log
  
  Constructor Details
  
    
  
  
    #initialize(world, core, name, pool_size, transaction_adapter)  ⇒ Pool 
  
  
  
  
    
Returns a new instance of Pool.
   
 
  
  
    
      
35
36
37
38
39
40
41
42
43
44
45 
     | 
    
      # File 'lib/dynflow/executors/parallel/pool.rb', line 35
def initialize(world, core, name, pool_size, transaction_adapter)
  @world = world
  @name = name
  @executor_core = core
  @pool_size     = pool_size
  @jobs          = JobStorage.new
  @free_workers  = Array.new(pool_size) do |i|
    name = "worker-#{i}"
    Worker.spawn(name, reference, transaction_adapter, telemetry_options.merge(:worker => name))
  end
end
     | 
  
 
  
 
  
    Instance Method Details
    
      
  
  
    #execution_status(execution_plan_id = nil)  ⇒ Object 
  
  
  
  
    
      
72
73
74
75
76 
     | 
    
      # File 'lib/dynflow/executors/parallel/pool.rb', line 72
def execution_status(execution_plan_id = nil)
  { :pool_size => @pool_size,
    :free_workers => @free_workers.count,
    :queue_size => @jobs.queue_size(execution_plan_id) }
end
     | 
  
 
    
      
  
  
    #handle_persistence_error(worker, error, work = nil)  ⇒ Object 
  
  
  
  
    
      
61
62
63
64
65 
     | 
    
      # File 'lib/dynflow/executors/parallel/pool.rb', line 61
def handle_persistence_error(worker, error, work = nil)
  @executor_core.tell([:handle_persistence_error, error, work])
  @free_workers << worker
  distribute_jobs
end 
     | 
  
 
    
      
  
  
    #schedule_work(work)  ⇒ Object 
  
  
  
  
    
      
47
48
49
50
51 
     | 
    
      # File 'lib/dynflow/executors/parallel/pool.rb', line 47
def schedule_work(work)
  @jobs.add work
  distribute_jobs
  update_telemetry
end 
     | 
  
 
    
      
  
  
    #start_termination(*args)  ⇒ Object 
  
  
  
  
    
      
67
68
69
70 
     | 
    
      # File 'lib/dynflow/executors/parallel/pool.rb', line 67
def start_termination(*args)
  super
  try_to_terminate
end 
     | 
  
 
    
      
  
  
    #worker_done(worker, work)  ⇒ Object 
  
  
  
  
    
      
53
54
55
56
57
58
59 
     | 
    
      # File 'lib/dynflow/executors/parallel/pool.rb', line 53
def worker_done(worker, work)
  step = work.step if work.is_a?(Director::StepWorkItem)
  @executor_core.tell([:work_finished, work, step && step.delayed_events])
  @free_workers << worker
  Dynflow::Telemetry.with_instance { |t| t.set_gauge(:dynflow_active_workers, -1, telemetry_options) }
  distribute_jobs
end
     |