Module: RemoteStep::SSH
- Defined in:
- lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb
Constant Summary
- DEFAULT_REFRESH_TIME = 2
Instance Attribute Summary
-
#batch_options ⇒ Object
Returns the value of attribute batch_options.
-
#override_dependencies ⇒ Object
Returns the value of attribute override_dependencies.
-
#produce_dependencies ⇒ Object
Returns the value of attribute produce_dependencies.
-
#run_type ⇒ Object
Returns the value of attribute run_type.
Instance Method Summary
- #_orchestrate_batch ⇒ Object
- #_run ⇒ Object
- #_run_batch ⇒ Object
- #abort ⇒ Object
- #clean ⇒ Object
- #init_job(cache_type = nil, other_params = {}) ⇒ Object
- #input_dependencies ⇒ Object
- #issue ⇒ Object
- #load ⇒ Object
- #path ⇒ Object
- #produce(*args) ⇒ Object
- #run(stream = nil) ⇒ Object
Instance Attribute Details
#batch_options ⇒ Object
Returns the value of attribute batch_options.
6 7 8 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 6

# Attribute reader for the batch options of this step.
#
# @return [Object] the value currently held in @batch_options
def batch_options
  @batch_options
end
#override_dependencies ⇒ Object
Returns the value of attribute override_dependencies.
6 7 8 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 6

# Attribute reader for the dependency overrides of this step.
#
# @return [Object] the value currently held in @override_dependencies
def override_dependencies
  @override_dependencies
end
#produce_dependencies ⇒ Object
Returns the value of attribute produce_dependencies.
6 7 8 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 6

# Attribute reader; the value is forwarded to
# RemoteWorkflow::SSH.upload_dependencies by #_run and #_run_batch.
#
# @return [Object] the value currently held in @produce_dependencies
def produce_dependencies
  @produce_dependencies
end
#run_type ⇒ Object
Returns the value of attribute run_type.
6 7 8 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 6

# Attribute reader; #issue dispatches on this value
# ('run', 'batch' or 'orchestrate', as String or Symbol, or nil).
#
# @return [Object] the value currently held in @run_type
def run_type
  @run_type
end
Instance Method Details
#_orchestrate_batch ⇒ Object
73 74 75 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 73

# Delegates to RemoteWorkflow::SSH.orchestrate_batch_job, addressing the job
# by the task URL (base_url joined with the task name), the input id and the
# base name. An empty Hash is passed when no batch options are set.
#
# @return [Object] whatever RemoteWorkflow::SSH.orchestrate_batch_job returns
def _orchestrate_batch
  task_url = File.join(base_url, task.to_s)
  options = @batch_options || {}
  RemoteWorkflow::SSH.orchestrate_batch_job(task_url, @input_id, @base_name, options)
end
#_run ⇒ Object
63 64 65 66 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 63

# Uploads this step's dependencies to @server and then starts the job through
# RemoteWorkflow::SSH.run_job.
#
# @return [Object] whatever RemoteWorkflow::SSH.run_job returns
def _run
  RemoteWorkflow::SSH.upload_dependencies(self, @server, 'user', @produce_dependencies)

  task_url = File.join(base_url, task.to_s)
  RemoteWorkflow::SSH.run_job(task_url, @input_id, @base_name)
end
#_run_batch ⇒ Object
68 69 70 71 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 68

# Uploads this step's dependencies to @server and then starts the job through
# RemoteWorkflow::SSH.run_batch_job. An empty Hash is passed when no batch
# options are set.
#
# @return [Object] whatever RemoteWorkflow::SSH.run_batch_job returns
def _run_batch
  RemoteWorkflow::SSH.upload_dependencies(self, @server, 'user', @produce_dependencies)

  task_url = File.join(base_url, task.to_s)
  options = @batch_options || {}
  RemoteWorkflow::SSH.run_batch_job(task_url, @input_id, @base_name, options)
end
#abort ⇒ Object
119 120 121 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 119

# Aborting is not implemented for SSH steps: this only logs a warning and
# does not contact the remote server.
#
# @return [Object] whatever Log.warn returns
def abort
  message = "not implemented RemoteWorkflow::SSH.abort(@url, @input_id, @base_name)"
  Log.warn message
end
#clean ⇒ Object
113 114 115 116 117 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 113

# Cleans the remote job.
#
# The job is initialised first so that @url and @input_id are set before
# RemoteWorkflow::SSH.clean is called; _restart is invoked afterwards.
#
# @return [Object] whatever _restart returns
def clean
  init_job()
  RemoteWorkflow::SSH.clean(@url, @input_id, @base_name)
  _restart()
end
#init_job(cache_type = nil, other_params = {}) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 8

# Prepares the remote job and records where it lives.
#
# Does nothing when @url is already set. Otherwise it:
# 1. parses base_url into @server / @server_path and picks an @input_id,
# 2. copies override_dependencies into inputs (a Hash is copied entry by
#    entry; any other collection is read as "name=value" strings),
# 3. uploads the inputs with RemoteWorkflow::SSH.upload_inputs,
# 4. posts the job (unless @remote_path is already set) and derives @name and
#    @url from the resulting remote path.
#
# @param cache_type [Symbol, nil] passed to Persist.memory as :cache_type;
#   when nil it becomes :exec if @is_exec is set and :asynchronous otherwise
# @param other_params [Hash] accepted but not used by this method
# @return [self]
def init_job(cache_type = nil, other_params = {})
  return self if @url

  cache_type = (@is_exec ? :exec : :asynchronous) if cache_type.nil?

  @last_info_time = nil
  @done = false
  @server, @server_path = RemoteWorkflow::SSH.parse_url(base_url)

  # Interpolation builds a fresh String; appending to the literal with <<
  # fails when string literals are frozen.
  @input_id ||= "inputs-#{rand(100000)}"

  if override_dependencies && override_dependencies.any?
    if Hash === override_dependencies
      override_dependencies.each { |name, dep| inputs[name] = dep }
    else
      override_dependencies.each do |override|
        name, _sep, value = override.partition("=")
        inputs[name] = value
      end
    end
  end

  # NOTE(review): with a single block parameter Hash#select appears to pass
  # only the key, so if `inputs` is a Hash (input_dependencies calls
  # inputs.values) no Step is matched here. Confirm the type of `inputs`.
  inputs.select { |i| Step === i }.each { |i| i.produce }
  RemoteWorkflow::SSH.upload_inputs(@server, inputs, @input_types, @input_id)

  @remote_path ||= Persist.memory("RemoteStep", :workflow => self.workflow, :task => task, :jobname => @name, :inputs => inputs, :cache_type => cache_type) do
    Misc.insist do
      RemoteWorkflow::SSH.post_job(File.join(base_url, task.to_s), @input_id, @base_name)
    end
  end

  @name = @remote_path.split("/").last

  if Open.remote?(@name)
    @url = @name
    @name = File.basename(@name)
  else
    @url = File.join(base_url, task.to_s, @name)
  end

  self
end
#input_dependencies ⇒ Object
123 124 125 126 127 128 129 130 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 123

# Collects, and memoizes in @input_dependencies, the steps found among the
# flattened input values:
# * values that are themselves a Step (or a RemoteStep, when that constant
#   is defined), followed by
# * the #resource of every Path value whose resource is a Step.
#
# @return [Array] the dependencies, direct steps first
def input_dependencies
  @input_dependencies ||= begin
    values = inputs.values.flatten

    direct_steps = values.select do |value|
      Step === value || (defined?(RemoteStep) && RemoteStep === value)
    end

    # Resources that have already started are deliberately not filtered out
    # (the original filter is disabled):
    #   select{|dep| ! dep.resource.started? }
    path_steps = values.
      select { |value| Path === value && Step === value.resource }.
      map { |value| value.resource }

    direct_steps + path_steps
  end
end
#issue ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 77

# Initialises the job and launches it remotely according to @run_type:
# * 'run' / :run / nil            -> #_run
# * 'batch' / :batch              -> #_run_batch
# * 'orchestrate' / :orchestrate  -> #_orchestrate_batch
#
# The launcher's result is stored in @remote_path and @started is set.
#
# @raise [ArgumentError] when @run_type is none of the values above; raised
#   before init_job so nothing is uploaded or marked as started for a job
#   that would never run
# @return [true]
def issue
  launcher =
    case @run_type
    when 'run', :run, nil then :_run
    when 'batch', :batch then :_run_batch
    when 'orchestrate', :orchestrate then :_orchestrate_batch
    else
      raise ArgumentError, "Unknown run_type: #{@run_type.inspect}"
    end

  init_job
  @remote_path = send(launcher)
  @started = true
end
#load ⇒ Object
100 101 102 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 100

# Opens the step's #path with Open.open and hands the result to load_res.
#
# @return [Object] whatever load_res returns
def load
  opened = Open.open(path)
  load_res(opened)
end
#path ⇒ Object
52 53 54 55 56 57 58 59 60 61 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 52

# Builds the "ssh://<server>:<location>" address of this step's result.
#
# The location is, in order of preference: info[:path], @remote_path, or
# "var/jobs/<workflow>/<task_name>/<name>". @server and @server_path are
# refreshed from @base_url on every call.
#
# @return [String]
def path
  @server, @server_path = RemoteWorkflow::SSH.parse_url(@base_url)

  return "ssh://" + @server + ":" + info[:path] if info[:path]
  return "ssh://" + @server + ":" + @remote_path if @remote_path

  "ssh://" + @server + ":" + ["var/jobs", self.workflow.to_s, task_name.to_s, @name].join("/")
end
#produce(*args) ⇒ Object
91 92 93 94 95 96 97 98 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 91

# Issues the job and blocks, polling once per second, until it is done, has
# failed or was aborted.
#
# @param args [Array] accepted but not used by this method
# @raise [Object] the step's own #exception when the job ended in error
# @return [self]
def produce(*args)
  issue
  sleep 1 until done? || error? || aborted?
  raise self.exception if error?
  self
end
#run(stream = nil) ⇒ Object
104 105 106 107 108 109 110 111 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 104

# Runs the step.
#
# With a truthy +stream+ the job is only issued. Otherwise the job is
# produced (which waits for it to finish) and its result is loaded.
#
# @param stream [Object, nil] truthy to issue the job without waiting
# @return [Object] the result of #issue when streaming, else the result of #load
def run(stream = nil)
  return issue if stream

  produce
  # Explicit receiver: a bare `load` reads like Kernel#load.
  self.load
end