Class: Dynflow::ThrottleLimiter::Core
  
  
  
  
  
    - Inherits:
 
    - 
      Actor
      
        
          - Object
 
          
            - Concurrent::Actor::Context
 
          
            - Actor
 
          
            - Dynflow::ThrottleLimiter::Core
 
          
        
        show all
      
     
  
  
  
  
  
  
  
  
  
  
    - Defined in:
 
    - lib/dynflow/throttle_limiter.rb
 
  
  
 
  
    
      Instance Method Summary
      collapse
    
    
  
  
  
  
  
  
  
  
  
  Methods inherited from Actor
  #behaviour_definition, #finish_termination, #start_termination, #terminating?
  
  
  
  
  
  
  
  
  
  
  #on_message
  
  
  
  
  
  
  
  
  
  #log
  
  Constructor Details
  
    
  
  
    #initialize(world)  ⇒ Core 
  
  
  
  
    
Returns a new instance of Core.
   
 
  
  
    
      
51
52
53
54 
     | 
    
      # File 'lib/dynflow/throttle_limiter.rb', line 51
# Builds the limiter core.
#
# @param world the object stored in @world (presumably the owning
#   Dynflow world; this block only stores the reference)
def initialize(world)
  @world = world
  # Registry of semaphores; starts out empty.
  @semaphores = {}
end
     | 
  
 
  
 
  
    Instance Method Details
    
      
  
  
    #cancel(parent_id, reason = nil)  ⇒ Object 
  
  
  
  
    
      
98
99
100
101
102
103
104
105
106
107 
     | 
    
      # File 'lib/dynflow/throttle_limiter.rb', line 98
# Cancels everything still waiting under +parent_id+ and then drops the
# parent's semaphore via #finish.
#
# Does nothing (and returns nil) when no semaphore is registered for
# +parent_id+.
#
# @param parent_id key into @semaphores
# @param reason [String, nil] message passed to every cancellation and
#   future rejection; a default message is used when it is nil or false
# @return whatever #finish returns, or nil when the parent is unknown
def cancel(parent_id, reason = nil)
  return unless @semaphores.key?(parent_id)

  message = reason || 'The task was cancelled.'
  pending = @semaphores[parent_id].waiting
  pending.each do |entry|
    # Cancel the plan first, then reject its future with the same message.
    cancel_plan_id(entry.execution_plan_id, message)
    entry.future.reject(message)
  end
  finish(parent_id)
end
     | 
  
 
    
      
  
  
    #finish(parent_id)  ⇒ Object 
  
  
  
  
    
      
109
110
111 
     | 
    
      # File 'lib/dynflow/throttle_limiter.rb', line 109
# Forgets the semaphore registered under +parent_id+.
#
# @param parent_id key into @semaphores
# @return the removed entry, or nil when nothing was registered
#   (Hash#delete semantics)
def finish(parent_id)
  @semaphores.delete(parent_id)
end
     | 
  
 
    
      
  
  
    #handle_plans(parent_id, planned_ids, failed_ids)  ⇒ Object 
  
  
  
  
    
      
61
62
63
64
65
66
67
68
69
70
71
72
73
74 
     | 
    
      # File 'lib/dynflow/throttle_limiter.rb', line 61
# Wraps every given plan id in a Triggered record and starts as many of
# them as the parent's semaphore allows.
#
# Failed plans are handed to #execute_triggered straight away, without
# consulting the semaphore. Planned children register a callback that sends
# [:release, parent_id] to this object when their future resolves, and are
# executed only when the semaphore's #wait returns a truthy value.
#
# @param parent_id key into @semaphores; the semaphore must already be
#   registered
# @param planned_ids [Array] ids of children subject to throttling
# @param failed_ids [Array] ids of children executed without throttling
# @return [Array] Triggered records for planned children followed by those
#   for failed children
def handle_plans(parent_id, planned_ids, failed_ids)
  build = lambda do |id|
    ::Dynflow::World::Triggered[id, Concurrent::Promises.resolvable_future]
  end

  # Failed plans are processed first, exactly as before.
  unthrottled = failed_ids.map do |plan_id|
    record = build.call(plan_id)
    execute_triggered(record)
    record
  end

  throttled = planned_ids.map do |child_id|
    record = build.call(child_id)
    record.future.on_resolution! { self << [:release, parent_id] }
    execute_triggered(record) if @semaphores[parent_id].wait(record)
    record
  end

  throttled + unthrottled
end
     | 
  
 
    
      
  
  
    #initialize_plan(plan_id, semaphores_hash)  ⇒ Object 
  
  
  
  
    
      
56
57
58
59 
     | 
    
      # File 'lib/dynflow/throttle_limiter.rb', line 56
# Registers a semaphore for +plan_id+ and starts its clock.
#
# @param plan_id key under which the semaphore is stored in @semaphores
# @param semaphores_hash description passed to #create_semaphores
# @return whatever #set_up_clock_for returns
def initialize_plan(plan_id, semaphores_hash)
  throttle = create_semaphores(semaphores_hash)
  @semaphores.store(plan_id, throttle)
  # NOTE(review): the meaning of the `true` flag is defined by
  # set_up_clock_for, which is not visible here.
  set_up_clock_for(plan_id, true)
end
     | 
  
 
    
      
  
  
    #observe(parent_id = nil)  ⇒ Object 
  
  
  
  
    
      
76
77
78
79
80
81
82
83
84
85
86 
     | 
    
      # File 'lib/dynflow/throttle_limiter.rb', line 76
# Reports what is currently waiting on the registered semaphores.
#
# @param parent_id [Object, nil] when nil, every registered semaphore is
#   reported; otherwise only the one stored under this key
# @return [Array] with no argument, one single-entry hash
#   ({ id => waiting }) per registered semaphore; with an argument, that
#   semaphore's waiting collection, or an empty array when the id is unknown
def observe(parent_id = nil)
  if parent_id.nil?
    return @semaphores.map { |id, throttle| { id => throttle.waiting } }
  end
  return [] unless @semaphores.key?(parent_id)

  @semaphores[parent_id].waiting
end
     | 
  
 
    
      
  
  
    #release(plan_id, key = :level)  ⇒ Object 
  
  
  
  
    
      
88
89
90
91
92
93
94
95
96 
     | 
    
      # File 'lib/dynflow/throttle_limiter.rb', line 88
# Gives one unit back to the semaphore of +plan_id+ and, if something is
# waiting and a unit can be taken again, starts the next waiting plan.
#
# Returns nil without doing anything when no semaphore is registered for
# +plan_id+.
#
# @param plan_id key into @semaphores
# @param key [Symbol] which child semaphore to release (default :level);
#   :time additionally re-arms the clock via #set_up_clock_for
# @return the result of #execute_triggered when a waiting plan was started,
#   nil otherwise
def release(plan_id, key = :level)
  if @semaphores.key?(plan_id)
    set_up_clock_for(plan_id) if key == :time

    throttle = @semaphores[plan_id]
    throttle.release(1, key) if throttle.children.key?(key)

    # `get` is only called when something is waiting (short-circuit).
    # NOTE(review): presumably `get` acquires a unit and returns how many
    # were obtained - confirm against the semaphore implementation.
    slot_acquired = throttle.has_waiting? && throttle.get == 1
    execute_triggered(throttle.get_waiting) if slot_acquired
  end
end
     |