Class: Kdeploy::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/kdeploy/runner/runner.rb

Overview

Concurrent task runner for executing tasks across multiple hosts

Instance Method Summary collapse

Constructor Details

#initialize(hosts, tasks, parallel: Configuration.default_parallel, output: ConsoleOutput.new, debug: false, base_dir: nil, retries: Configuration.default_retries, retry_delay: Configuration.default_retry_delay, retry_on_nonzero: Configuration.default_retry_on_nonzero, host_timeout: Configuration.default_host_timeout, step_timeout: Configuration.default_step_timeout, retry_policy: Configuration.default_retry_policy, on_step: nil) ⇒ Runner

Returns a new instance of Runner.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/kdeploy/runner/runner.rb', line 8

def initialize(hosts, tasks, parallel: Configuration.default_parallel, output: ConsoleOutput.new,
               debug: false, base_dir: nil, retries: Configuration.default_retries,
               retry_delay: Configuration.default_retry_delay,
               retry_on_nonzero: Configuration.default_retry_on_nonzero,
               host_timeout: Configuration.default_host_timeout,
               step_timeout: Configuration.default_step_timeout,
               retry_policy: Configuration.default_retry_policy,
               on_step: nil)
  @hosts = hosts
  @tasks = tasks
  @parallel = parallel
  @output = output
  @debug = debug
  @base_dir = base_dir
  @retries = retries
  @retry_delay = retry_delay
  @retry_on_nonzero = retry_on_nonzero
  @host_timeout = normalize_timeout(host_timeout)
  @step_timeout = normalize_timeout(step_timeout)
  @retry_policy = retry_policy
  @on_step = on_step
  @pool = Concurrent::FixedThreadPool.new(@parallel)
  @results = Concurrent::Hash.new
end

Instance Method Details

#create_task_futures(task, task_name) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/kdeploy/runner/runner.rb', line 86

def create_task_futures(task, task_name)
  @future_meta = {}
  @hosts.map do |name, config|
    started_at = Concurrent::AtomicReference.new(nil)
    future = Concurrent::Future.execute(executor: @pool) do
      started_at.set(Time.now)
      execute_task_for_host(name, config, task, task_name)
    end
    @future_meta[future] = { host_name: name, started_at: started_at }
    future
  end
end

#execute_concurrent_tasks(task, task_name) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/kdeploy/runner/runner.rb', line 48

def execute_concurrent_tasks(task, task_name)
  futures = create_task_futures(task, task_name)

  # If no hosts, return empty results immediately
  return @results if futures.empty?

  pending = futures.dup

  until pending.empty?
    progressed = false
    now = Time.now

    pending.dup.each do |future|
      meta = @future_meta[future]
      host_name = meta[:host_name]
      started_at = meta[:started_at].get

      if future.fulfilled? || future.rejected?
        collect_future_result(future, host_name)
        pending.delete(future)
        progressed = true
      elsif timeout_exceeded?(started_at, now)
        @results[host_name] ||= {
          status: :failed,
          error: "execution timeout after #{@host_timeout}s",
          output: []
        }
        pending.delete(future)
        progressed = true
      end
    end

    sleep(0.05) unless progressed
  end

  @results
end

#find_task(task_name) ⇒ Object

Raises:



40
41
42
43
44
45
46
# File 'lib/kdeploy/runner/runner.rb', line 40

def find_task(task_name)
  task = @tasks[task_name]

  raise TaskNotFoundError, task_name unless task

  task
end

#run(task_name) ⇒ Object



33
34
35
36
37
38
# File 'lib/kdeploy/runner/runner.rb', line 33

def run(task_name)
  task = find_task(task_name)
  execute_concurrent_tasks(task, task_name)
ensure
  @pool.shutdown
end