Module: RemoteStep::SSH

Defined in:
lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb

Constant Summary collapse

DEFAULT_REFRESH_TIME = 2

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#batch_options ⇒ Object

Returns the value of attribute batch_options.



6
7
8
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 6

# Reader for the batch options attached to this step.
#
# #_run_batch and #_orchestrate_batch forward this value (or an empty Hash
# when it is unset) as the last argument of
# RemoteWorkflow::SSH.run_batch_job / .orchestrate_batch_job.
#
# @return [Object] the current value of @batch_options (nil when never set)
def batch_options
  @batch_options
end

#override_dependencies ⇒ Object

Returns the value of attribute override_dependencies.



6
7
8
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 6

# Reader for the dependency overrides of this step.
#
# #init_job copies these into `inputs` before uploading them: a Hash is
# copied entry by entry, anything else is iterated as a list of
# "name=value" strings that are split on the first "=".
#
# @return [Object] the current value of @override_dependencies (nil when never set)
def override_dependencies
  @override_dependencies
end

#produce_dependencies ⇒ Object

Returns the value of attribute produce_dependencies.



6
7
8
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 6

# Reader for the produce-dependencies setting of this step.
#
# #_run and #_run_batch pass this value as the last argument of
# RemoteWorkflow::SSH.upload_dependencies; its exact meaning is defined
# there.
#
# @return [Object] the current value of @produce_dependencies (nil when never set)
def produce_dependencies
  @produce_dependencies
end

#run_type ⇒ Object

Returns the value of attribute run_type.



6
7
8
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 6

# Reader for the launch mode of this step.
#
# #issue dispatches on it: 'run', :run or nil calls #_run; 'batch' or
# :batch calls #_run_batch; 'orchestrate' or :orchestrate calls
# #_orchestrate_batch.
#
# @return [String, Symbol, nil] the current value of @run_type
def run_type
  @run_type
end

Instance Method Details

#_orchestrate_batch ⇒ Object



73
74
75
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 73

# Asks the remote side to orchestrate this task as a batch job.
#
# The job is addressed as `<base_url>/<task>`; @batch_options falls back to
# an empty Hash when it has not been set.
#
# @return [Object] whatever RemoteWorkflow::SSH.orchestrate_batch_job returns
def _orchestrate_batch
  task_url = File.join(base_url, task.to_s)
  options = @batch_options || {}
  RemoteWorkflow::SSH.orchestrate_batch_job(task_url, @input_id, @base_name, options)
end

#_run ⇒ Object



63
64
65
66
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 63

# Uploads this step's dependencies and then starts the job remotely.
#
# @return [Object] whatever RemoteWorkflow::SSH.run_job returns
def _run
  RemoteWorkflow::SSH.upload_dependencies(self, @server, 'user', @produce_dependencies)

  # The task URL is built only after the upload, as in the original ordering.
  task_url = File.join(base_url, task.to_s)
  RemoteWorkflow::SSH.run_job(task_url, @input_id, @base_name)
end

#_run_batch ⇒ Object



68
69
70
71
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 68

# Uploads this step's dependencies and then submits the job as a batch job.
#
# @batch_options falls back to an empty Hash when it has not been set.
#
# @return [Object] whatever RemoteWorkflow::SSH.run_batch_job returns
def _run_batch
  RemoteWorkflow::SSH.upload_dependencies(self, @server, 'user', @produce_dependencies)

  # The task URL is built only after the upload, as in the original ordering.
  task_url = File.join(base_url, task.to_s)
  options = @batch_options || {}
  RemoteWorkflow::SSH.run_batch_job(task_url, @input_id, @base_name, options)
end

#abort ⇒ Object



119
120
121
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 119

# Placeholder: aborting a remote SSH job is not implemented.
#
# Only logs a warning; nothing is sent to the remote side.
#
# @return [Object] whatever Log.warn returns
def abort
  notice = "not implemented RemoteWorkflow::SSH.abort(@url, @input_id, @base_name)"
  Log.warn(notice)
end

#clean ⇒ Object



113
114
115
116
117
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 113

# Cleans the remote job and restarts this step.
#
# @return [Object] whatever #_restart returns
def clean
  # init_job must run first: it is what assigns @url.
  init_job
  remote_job = [@url, @input_id, @base_name]
  RemoteWorkflow::SSH.clean(*remote_job)
  _restart
end

#init_job(cache_type = nil, other_params = {}) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 8

# Registers this step on the remote server and resolves its remote URL.
#
# Returns immediately once @url has been set. Otherwise it resets the
# polling state, parses the server out of base_url, merges
# override_dependencies into `inputs`, uploads the inputs, posts the job
# (memoised through Persist.memory and retried through Misc.insist) and
# derives @name and @url from the path the server returned.
#
# @param cache_type [Symbol, nil] when nil it becomes :exec if @is_exec is
#   set and :asynchronous otherwise; it is only passed on to Persist.memory
# @param other_params [Hash] accepted but not used by this method
# @return [self]
def init_job(cache_type = nil, other_params = {})
  return self if @url

  cache_type = (@is_exec ? :exec : :asynchronous) if cache_type.nil?
  @last_info_time = nil
  @done = false
  @server, @server_path = RemoteWorkflow::SSH.parse_url base_url
  # Built by interpolation so that no string literal is mutated in place
  # (the former `"inputs-" << ...` fails once string literals are frozen).
  @input_id ||= "inputs-#{rand(100000)}"

  overrides = override_dependencies
  if overrides && overrides.any?
    if Hash === overrides
      overrides.each do |name, dep|
        inputs[name] = dep
      end
    else
      # Each entry is a "name=value" string; split on the first "=" only.
      overrides.each do |od|
        name, _sep, value = od.partition("=")
        inputs[name] = value
      end
    end
  end

  # NOTE(review): if `inputs` is a Hash, a one-parameter block given to
  # Hash#select receives only the key, so this line would never find a Step
  # (#input_dependencies scans `inputs.values` instead). Left untouched;
  # confirm the intended behaviour before changing it.
  inputs.select{|i| Step === i }.each{|i| i.produce }

  RemoteWorkflow::SSH.upload_inputs(@server, inputs, @input_types, @input_id)

  @remote_path ||= Persist.memory("RemoteStep", :workflow => self.workflow, :task => task, :jobname => @name, :inputs => inputs, :cache_type => cache_type) do
    Misc.insist do
      RemoteWorkflow::SSH.post_job(File.join(base_url, task.to_s), @input_id, @base_name)
    end
  end
  @name = @remote_path.split("/").last

  if Open.remote?(@name)
    @url = @name
    @name = File.basename(@name)
  else
    @url = File.join(base_url, task.to_s, @name)
  end

  self
end

#input_dependencies ⇒ Object



123
124
125
126
127
128
129
130
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 123

# Steps that appear among this step's input values (memoised).
#
# Collects, in this order:
# 1. input values that are themselves a Step (or a RemoteStep, when that
#    constant is defined);
# 2. the `resource` of every Path input value whose resource is a Step.
# Nested arrays of values are flattened before the scan.
#
# @return [Array] the dependency steps found in `inputs`
def input_dependencies
  return @input_dependencies if @input_dependencies

  direct_steps = inputs.values.flatten.select do |value|
    Step === value || (defined?(RemoteStep) && RemoteStep === value)
  end

  path_steps = inputs.values.flatten.
    select { |value| Path === value && Step === value.resource }.
    map { |value| value.resource }

  @input_dependencies = direct_steps + path_steps
end

#issue ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 77

# Initialises the remote job and launches it according to @run_type.
#
# 'run', :run or nil      -> #_run
# 'batch' or :batch       -> #_run_batch
# 'orchestrate' or :orchestrate -> #_orchestrate_batch
#
# The launcher's return value is stored in @remote_path and the step is
# flagged as started.
#
# @return [true]
def issue
  init_job
  # NOTE(review): there is no `else` branch, so an unrecognised @run_type
  # launches nothing, sets @remote_path to nil and still marks the step as
  # started. Confirm whether that should raise instead.
  @remote_path = case @run_type
                 when 'run', :run, nil
                   _run
                 when 'batch', :batch
                   _run_batch
                 when 'orchestrate', :orchestrate
                   _orchestrate_batch
                 end
  @started = true
end

#load ⇒ Object



100
101
102
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 100

# Opens the step's result at #path and hands the stream to #load_res.
#
# @return [Object] whatever #load_res returns
def load
  result_stream = Open.open(path)
  load_res(result_stream)
end

#path ⇒ Object



52
53
54
55
56
57
58
59
60
61
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 52

# Builds the `ssh://<server>:<location>` address of this step's result.
#
# The location is, in order of preference: the :path entry of #info, the
# @remote_path recorded for the job, or the conventional
# `var/jobs/<workflow>/<task_name>/<name>` location.
#
# Also refreshes @server and @server_path from @base_url on every call.
#
# @return [String]
def path
  @server, @server_path = RemoteWorkflow::SSH.parse_url @base_url

  # Read info[:path] once: it used to be evaluated twice (test, then use),
  # which repeated the lookup and let the two reads disagree.
  location = info[:path] || @remote_path ||
    ["var/jobs", self.workflow.to_s, task_name.to_s, @name] * "/"

  # Plain concatenation is kept so a nil server or a non-String location
  # still raises instead of silently producing a malformed address.
  "ssh://" + @server + ":" + location
end

#produce(*args) ⇒ Object



91
92
93
94
95
96
97
98
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 91

# Issues the job and blocks until it is done, has failed or was aborted.
#
# The state is polled once per second.
#
# @param args [Array] accepted and ignored
# @return [self]
# @raise [Exception] the step's own #exception when it ended in error
def produce(*args)
  issue
  sleep 1 until done? || error? || aborted?
  raise self.exception if error?
  self
end

#run(stream = nil) ⇒ Object



104
105
106
107
108
109
110
111
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 104

# Runs the step.
#
# With a truthy `stream` the job is only issued and the result of #issue is
# returned. Otherwise the call blocks in #produce and then returns the
# loaded result.
#
# @param stream [Object, nil] truthy to issue without waiting
# @return [Object] the result of #issue, or of #load when not streaming
def run(stream = nil)
  return issue if stream

  produce
  # The former `unless stream` guard was always true on this path.
  self.load
end