Class: Dynflow::ThrottleLimiter::Core
- Inherits:
-
Actor
- Object
- Concurrent::Actor::Context
- Actor
- Dynflow::ThrottleLimiter::Core
show all
- Defined in:
- lib/dynflow/throttle_limiter.rb
Instance Method Summary
collapse
Methods inherited from Actor
#behaviour_definition, #finish_termination, #start_termination, #terminating?
#on_message
#log
Constructor Details
#initialize(world) ⇒ Core
Returns a new instance of Core.
51
52
53
54
|
# File 'lib/dynflow/throttle_limiter.rb', line 51
def initialize(world)
@world = world
@semaphores = {}
end
|
Instance Method Details
#cancel(parent_id, reason = nil) ⇒ Object
98
99
100
101
102
103
104
105
106
107
|
# File 'lib/dynflow/throttle_limiter.rb', line 98
def cancel(parent_id, reason = nil)
if @semaphores.key?(parent_id)
reason ||= 'The task was cancelled.'
@semaphores[parent_id].waiting.each do |triggered|
cancel_plan_id(triggered.execution_plan_id, reason)
triggered.future.reject(reason)
end
finish(parent_id)
end
end
|
#finish(parent_id) ⇒ Object
109
110
111
|
# File 'lib/dynflow/throttle_limiter.rb', line 109
def finish(parent_id)
@semaphores.delete(parent_id)
end
|
#handle_plans(parent_id, planned_ids, failed_ids) ⇒ Object
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
# File 'lib/dynflow/throttle_limiter.rb', line 61
def handle_plans(parent_id, planned_ids, failed_ids)
failed = failed_ids.map do |plan_id|
::Dynflow::World::Triggered[plan_id, Concurrent::Promises.resolvable_future].tap do |triggered|
execute_triggered(triggered)
end
end
planned_ids.map do |child_id|
::Dynflow::World::Triggered[child_id, Concurrent::Promises.resolvable_future].tap do |triggered|
triggered.future.on_resolution! { self << [:release, parent_id] }
execute_triggered(triggered) if @semaphores[parent_id].wait(triggered)
end
end + failed
end
|
#initialize_plan(plan_id, semaphores_hash) ⇒ Object
56
57
58
59
|
# File 'lib/dynflow/throttle_limiter.rb', line 56
def initialize_plan(plan_id, semaphores_hash)
@semaphores[plan_id] = create_semaphores(semaphores_hash)
set_up_clock_for(plan_id, true)
end
|
#observe(parent_id = nil) ⇒ Object
76
77
78
79
80
81
82
83
84
85
86
|
# File 'lib/dynflow/throttle_limiter.rb', line 76
def observe(parent_id = nil)
if parent_id.nil?
@semaphores.reduce([]) do |acc, cur|
acc << { cur.first => cur.last.waiting }
end
elsif @semaphores.key? parent_id
@semaphores[parent_id].waiting
else
[]
end
end
|
#release(plan_id, key = :level) ⇒ Object
88
89
90
91
92
93
94
95
96
|
# File 'lib/dynflow/throttle_limiter.rb', line 88
def release(plan_id, key = :level)
return unless @semaphores.key? plan_id
set_up_clock_for(plan_id) if key == :time
semaphore = @semaphores[plan_id]
semaphore.release(1, key) if semaphore.children.key?(key)
if semaphore.has_waiting? && semaphore.get == 1
execute_triggered(semaphore.get_waiting)
end
end
|