Class: Dynflow::Executors::Parallel::Pool
- Inherits:
-
Actor
- Object
- Concurrent::Actor::Context
- Actor
- Dynflow::Executors::Parallel::Pool
show all
- Defined in:
- lib/dynflow/executors/parallel/pool.rb
Defined Under Namespace
Classes: JobStorage
Instance Method Summary
collapse
Methods inherited from Actor
#behaviour_definition, #finish_termination, #terminating?
#on_message
#log
Constructor Details
#initialize(world, core, name, pool_size, transaction_adapter) ⇒ Pool
Returns a new instance of Pool.
35
36
37
38
39
40
41
42
43
44
45
|
# File 'lib/dynflow/executors/parallel/pool.rb', line 35
def initialize(world, core, name, pool_size, transaction_adapter)
@world = world
@name = name
@executor_core = core
@pool_size = pool_size
@jobs = JobStorage.new
@free_workers = Array.new(pool_size) do |i|
name = "worker-#{i}"
Worker.spawn(name, reference, transaction_adapter, telemetry_options.merge(:worker => name))
end
end
|
Instance Method Details
#execution_status(execution_plan_id = nil) ⇒ Object
72
73
74
75
76
|
# File 'lib/dynflow/executors/parallel/pool.rb', line 72
def execution_status(execution_plan_id = nil)
{ :pool_size => @pool_size,
:free_workers => @free_workers.count,
:queue_size => @jobs.queue_size(execution_plan_id) }
end
|
#handle_persistence_error(worker, error, work = nil) ⇒ Object
61
62
63
64
65
|
# File 'lib/dynflow/executors/parallel/pool.rb', line 61
def handle_persistence_error(worker, error, work = nil)
@executor_core.tell([:handle_persistence_error, error, work])
@free_workers << worker
distribute_jobs
end
|
#schedule_work(work) ⇒ Object
47
48
49
50
51
|
# File 'lib/dynflow/executors/parallel/pool.rb', line 47
def schedule_work(work)
@jobs.add work
distribute_jobs
update_telemetry
end
|
#start_termination(*args) ⇒ Object
67
68
69
70
|
# File 'lib/dynflow/executors/parallel/pool.rb', line 67
def start_termination(*args)
super
try_to_terminate
end
|
#worker_done(worker, work) ⇒ Object
53
54
55
56
57
58
59
|
# File 'lib/dynflow/executors/parallel/pool.rb', line 53
def worker_done(worker, work)
step = work.step if work.is_a?(Director::StepWorkItem)
@executor_core.tell([:work_finished, work, step && step.delayed_events])
@free_workers << worker
Dynflow::Telemetry.with_instance { |t| t.set_gauge(:dynflow_active_workers, -1, telemetry_options) }
distribute_jobs
end
|