Class: Dynflow::Executors::Parallel::Pool

Inherits:
Actor
  • Object
show all
Defined in:
lib/dynflow/executors/parallel/pool.rb

Defined Under Namespace

Classes: JobStorage

Instance Method Summary collapse

Methods inherited from Actor

#behaviour_definition, #finish_termination, #terminating?

Methods included from MethodicActor

#on_message

Methods included from Actor::LogWithFullBacktrace

#log

Constructor Details

#initialize(world, core, name, pool_size, transaction_adapter) ⇒ Pool

Returns a new instance of Pool.



35
36
37
38
39
40
41
42
43
44
45
# File 'lib/dynflow/executors/parallel/pool.rb', line 35

def initialize(world, core, name, pool_size, transaction_adapter)
  @world = world
  @name = name
  @executor_core = core
  @pool_size     = pool_size
  @jobs          = JobStorage.new
  @free_workers  = Array.new(pool_size) do |i|
    name = "worker-#{i}"
    Worker.spawn(name, reference, transaction_adapter, telemetry_options.merge(:worker => name))
  end
end

Instance Method Details

#execution_status(execution_plan_id = nil) ⇒ Object



72
73
74
75
76
# File 'lib/dynflow/executors/parallel/pool.rb', line 72

def execution_status(execution_plan_id = nil)
  { :pool_size => @pool_size,
    :free_workers => @free_workers.count,
    :queue_size => @jobs.queue_size(execution_plan_id) }
end

#handle_persistence_error(worker, error, work = nil) ⇒ Object



61
62
63
64
65
# File 'lib/dynflow/executors/parallel/pool.rb', line 61

def handle_persistence_error(worker, error, work = nil)
  @executor_core.tell([:handle_persistence_error, error, work])
  @free_workers << worker
  distribute_jobs
end

#schedule_work(work) ⇒ Object



47
48
49
50
51
# File 'lib/dynflow/executors/parallel/pool.rb', line 47

def schedule_work(work)
  @jobs.add work
  distribute_jobs
  update_telemetry
end

#start_termination(*args) ⇒ Object



67
68
69
70
# File 'lib/dynflow/executors/parallel/pool.rb', line 67

def start_termination(*args)
  super
  try_to_terminate
end

#worker_done(worker, work) ⇒ Object



53
54
55
56
57
58
59
# File 'lib/dynflow/executors/parallel/pool.rb', line 53

def worker_done(worker, work)
  step = work.step if work.is_a?(Director::StepWorkItem)
  @executor_core.tell([:work_finished, work, step && step.delayed_events])
  @free_workers << worker
  Dynflow::Telemetry.with_instance { |t| t.set_gauge(:dynflow_active_workers, -1, telemetry_options) }
  distribute_jobs
end