Class: Dynflow::ThrottleLimiter::Core

Inherits:
Actor
  • Object
show all
Defined in:
lib/dynflow/throttle_limiter.rb

Instance Method Summary collapse

Methods inherited from Actor

#behaviour_definition, #finish_termination, #start_termination, #terminating?

Methods included from MethodicActor

#on_message

Methods included from Actor::LogWithFullBacktrace

#log

Constructor Details

#initialize(world) ⇒ Core

Returns a new instance of Core.



51
52
53
54
# File 'lib/dynflow/throttle_limiter.rb', line 51

def initialize(world)
  @world = world
  @semaphores = {}
end

Instance Method Details

#cancel(parent_id, reason = nil) ⇒ Object



98
99
100
101
102
103
104
105
106
107
# File 'lib/dynflow/throttle_limiter.rb', line 98

def cancel(parent_id, reason = nil)
  if @semaphores.key?(parent_id)
    reason ||= 'The task was cancelled.'
    @semaphores[parent_id].waiting.each do |triggered|
      cancel_plan_id(triggered.execution_plan_id, reason)
      triggered.future.reject(reason)
    end
    finish(parent_id)
  end
end

#finish(parent_id) ⇒ Object



109
110
111
# File 'lib/dynflow/throttle_limiter.rb', line 109

def finish(parent_id)
  @semaphores.delete(parent_id)
end

#handle_plans(parent_id, planned_ids, failed_ids) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/dynflow/throttle_limiter.rb', line 61

def handle_plans(parent_id, planned_ids, failed_ids)
  failed = failed_ids.map do |plan_id|
    ::Dynflow::World::Triggered[plan_id, Concurrent::Promises.resolvable_future].tap do |triggered|
      execute_triggered(triggered)
    end
  end

  planned_ids.map do |child_id|
    ::Dynflow::World::Triggered[child_id, Concurrent::Promises.resolvable_future].tap do |triggered|
      triggered.future.on_resolution! { self << [:release, parent_id] }
      execute_triggered(triggered) if @semaphores[parent_id].wait(triggered)
    end
  end + failed
end

#initialize_plan(plan_id, semaphores_hash) ⇒ Object



56
57
58
59
# File 'lib/dynflow/throttle_limiter.rb', line 56

def initialize_plan(plan_id, semaphores_hash)
  @semaphores[plan_id] = create_semaphores(semaphores_hash)
  set_up_clock_for(plan_id, true)
end

#observe(parent_id = nil) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
# File 'lib/dynflow/throttle_limiter.rb', line 76

def observe(parent_id = nil)
  if parent_id.nil?
    @semaphores.reduce([]) do |acc, cur|
      acc << { cur.first => cur.last.waiting }
    end
  elsif @semaphores.key? parent_id
    @semaphores[parent_id].waiting
  else
    []
  end
end

#release(plan_id, key = :level) ⇒ Object



88
89
90
91
92
93
94
95
96
# File 'lib/dynflow/throttle_limiter.rb', line 88

def release(plan_id, key = :level)
  return unless @semaphores.key? plan_id
  set_up_clock_for(plan_id) if key == :time
  semaphore = @semaphores[plan_id]
  semaphore.release(1, key) if semaphore.children.key?(key)
  if semaphore.has_waiting? && semaphore.get == 1
    execute_triggered(semaphore.get_waiting)
  end
end