Class: Dynflow::Executors::Parallel::Core
Instance Attribute Summary collapse
Instance Method Summary
collapse
#dead_letter_routing, #halt, #handle_event, #handle_execution, #handle_persistence_error, #handle_planning, #heartbeat, #plan_events, #work_finished
Methods inherited from Actor
#behaviour_definition, #terminating?
#on_message
#log
Constructor Details
#initialize(world, heartbeat_interval, queues_options) ⇒ Core
Returns a new instance of Core.
12
13
14
15
16
|
# File 'lib/dynflow/executors/parallel/core.rb', line 12
def initialize(world, heartbeat_interval, queues_options)
super
@pools = {}
initialize_queues
end
|
Instance Attribute Details
#logger ⇒ Object
Returns the value of attribute logger.
10
11
12
|
# File 'lib/dynflow/executors/parallel/core.rb', line 10
def logger
@logger
end
|
Instance Method Details
#execution_status(execution_plan_id = nil) ⇒ Object
40
41
42
43
44
|
# File 'lib/dynflow/executors/parallel/core.rb', line 40
def execution_status(execution_plan_id = nil)
@pools.each_with_object({}) do |(pool_name, pool), hash|
hash[pool_name] = pool.ask!([:execution_status, execution_plan_id])
end
end
|
#feed_pool(work_items) ⇒ Object
46
47
48
49
50
51
|
# File 'lib/dynflow/executors/parallel/core.rb', line 46
def feed_pool(work_items)
work_items.each do |new_work|
new_work.world = @world
@pools.fetch(suggest_queue(new_work)).tell([:schedule_work, new_work])
end
end
|
#finish_termination(pool_name) ⇒ Object
33
34
35
36
37
38
|
# File 'lib/dynflow/executors/parallel/core.rb', line 33
def finish_termination(pool_name)
@pools.delete(pool_name)
return unless @pools.empty?
super()
end
|
#initialize_queues ⇒ Object
18
19
20
21
22
23
24
25
26
|
# File 'lib/dynflow/executors/parallel/core.rb', line 18
def initialize_queues
default_pool_size = @queues_options[:default][:pool_size]
@queues_options.each do |(queue_name, queue_options)|
queue_pool_size = queue_options.fetch(:pool_size, default_pool_size)
@pools[queue_name] = Pool.spawn("pool #{queue_name}", @world,
reference, queue_name, queue_pool_size,
@world.transaction_adapter)
end
end
|
#start_termination(*args) ⇒ Object
28
29
30
31
|
# File 'lib/dynflow/executors/parallel/core.rb', line 28
def start_termination(*args)
super
@pools.values.each { |pool| pool.tell([:start_termination, Concurrent::Promises.resolvable_future]) }
end
|