Class: Dynflow::Executors::Parallel::Core

Inherits:
Abstract::Core show all
Defined in:
lib/dynflow/executors/parallel/core.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Abstract::Core

#dead_letter_routing, #halt, #handle_event, #handle_execution, #handle_persistence_error, #handle_planning, #heartbeat, #plan_events, #work_finished

Methods inherited from Actor

#behaviour_definition, #terminating?

Methods included from MethodicActor

#on_message

Methods included from Actor::LogWithFullBacktrace

#log

Constructor Details

#initialize(world, heartbeat_interval, queues_options) ⇒ Core

Returns a new instance of Core.



12
13
14
15
16
# File 'lib/dynflow/executors/parallel/core.rb', line 12

def initialize(world, heartbeat_interval, queues_options)
  super
  @pools = {}
  initialize_queues
end

Instance Attribute Details

#loggerObject (readonly)

Returns the value of attribute logger.



10
11
12
# File 'lib/dynflow/executors/parallel/core.rb', line 10

def logger
  @logger
end

Instance Method Details

#execution_status(execution_plan_id = nil) ⇒ Object



40
41
42
43
44
# File 'lib/dynflow/executors/parallel/core.rb', line 40

def execution_status(execution_plan_id = nil)
  @pools.each_with_object({}) do |(pool_name, pool), hash|
    hash[pool_name] = pool.ask!([:execution_status, execution_plan_id])
  end
end

#feed_pool(work_items) ⇒ Object



46
47
48
49
50
51
# File 'lib/dynflow/executors/parallel/core.rb', line 46

def feed_pool(work_items)
  work_items.each do |new_work|
    new_work.world = @world
    @pools.fetch(suggest_queue(new_work)).tell([:schedule_work, new_work])
  end
end

#finish_termination(pool_name) ⇒ Object



33
34
35
36
37
38
# File 'lib/dynflow/executors/parallel/core.rb', line 33

def finish_termination(pool_name)
  @pools.delete(pool_name)
  # we expect this message from all worker pools
  return unless @pools.empty?
  super()
end

#initialize_queuesObject



18
19
20
21
22
23
24
25
26
# File 'lib/dynflow/executors/parallel/core.rb', line 18

def initialize_queues
  default_pool_size = @queues_options[:default][:pool_size]
  @queues_options.each do |(queue_name, queue_options)|
    queue_pool_size = queue_options.fetch(:pool_size, default_pool_size)
    @pools[queue_name] = Pool.spawn("pool #{queue_name}", @world,
      reference, queue_name, queue_pool_size,
      @world.transaction_adapter)
  end
end

#start_termination(*args) ⇒ Object



28
29
30
31
# File 'lib/dynflow/executors/parallel/core.rb', line 28

def start_termination(*args)
  super
  @pools.values.each { |pool| pool.tell([:start_termination, Concurrent::Promises.resolvable_future]) }
end