Class: OodCore::Job::Adapters::PBSPro
- Inherits:
- OodCore::Job::Adapter
- Object → OodCore::Job::Adapter → OodCore::Job::Adapters::PBSPro
- Defined in:
- lib/ood_core/job/adapters/pbspro.rb
Overview
An adapter object that describes the communication with a PBS Pro resource manager for job management.
Defined Under Namespace
Classes: Batch
Constant Summary
- STATE_MAP =
Mapping of state codes for PBSPro
{
  'Q' => :queued,
  'W' => :queued_held, # job is waiting for its submitter-assigned start time to be reached
  'H' => :queued_held,
  'T' => :queued_held, # job is being moved to a new location
  'M' => :completed,   # job was moved to another server
  'R' => :running,
  'S' => :suspended,
  'U' => :suspended,   # cycle-harvesting job is suspended due to keyboard activity
  'E' => :running,     # job is exiting after having run
  'F' => :completed,   # job is finished
  'X' => :completed,   # subjob has completed execution or has been deleted
  'B' => :running      # job array has at least one child running
}
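As a rough illustration of how a table like this can be used, the sketch below looks up a raw state code and falls back to a default for codes the table does not list. The constant `STATE_MAP_SKETCH`, the helper `status_for`, and the `:undetermined` fallback are assumptions made for this example and are not taken from the page above.

```ruby
# Local copy of the documented mapping so the sketch runs without ood_core.
STATE_MAP_SKETCH = {
  'Q' => :queued,
  'W' => :queued_held,
  'H' => :queued_held,
  'T' => :queued_held,
  'M' => :completed,
  'R' => :running,
  'S' => :suspended,
  'U' => :suspended,
  'E' => :running,
  'F' => :completed,
  'X' => :completed,
  'B' => :running
}.freeze

# Hypothetical helper: translate a raw PBS Pro state code into a status
# symbol. The :undetermined fallback is an assumption of this sketch.
def status_for(code)
  STATE_MAP_SKETCH.fetch(code.to_s, :undetermined)
end

puts status_for('Q')
puts status_for('E')
puts status_for('?')
```

Using `fetch` with a default keeps an unexpected code from raising `KeyError`.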
Instance Attribute Summary
-
#qstat_factor ⇒ Float
readonly
The fraction of all jobs that an owner's jobs must exceed before the adapter filters the owner's jobs from a single `qstat` of all jobs; at or below that fraction it calls `qstat` on each of the owner's jobs individually.
Instance Method Summary
- #cluster_info ⇒ Object
-
#delete(id) ⇒ void
Delete the submitted job.
- #directive_prefix ⇒ Object
-
#hold(id) ⇒ void
Put the submitted job on hold.
-
#info(id) ⇒ Info
Retrieve job info from the resource manager.
-
#info_all(attrs: nil) ⇒ Array<Info>
Retrieve info for all jobs from the resource manager.
-
#info_where_owner(owner, attrs: nil) ⇒ Array<Info>
Retrieve info for all jobs for a given owner or owners from the resource manager.
-
#initialize(opts = {}) ⇒ PBSPro
constructor
private
A new instance of PBSPro.
-
#ppn(script) ⇒ Object
Placeholder for when both nodes and CPUs are supported.
-
#release(id) ⇒ void
Release the job that is on hold.
-
#status(id) ⇒ Status
Retrieve job status from resource manager.
-
#submit(script, after: [], afterok: [], afternotok: [], afterany: []) ⇒ String
Submit a job with the attributes defined in the job template instance.
Methods inherited from OodCore::Job::Adapter
#accounts, #info_all_each, #info_historic, #info_where_owner_each, #job_name_illegal_chars, #nodes, #queues, #sanitize_job_name, #supports_job_arrays?
Constructor Details
#initialize(opts = {}) ⇒ PBSPro
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of PBSPro.
# File 'lib/ood_core/job/adapters/pbspro.rb', line 271
def initialize(opts = {})
  o = opts.to_h.compact.symbolize_keys

  @pbspro = o.fetch(:pbspro) { raise ArgumentError, "No pbspro object specified. Missing argument: pbspro" }
  @qstat_factor = o.fetch(:qstat_factor, 0.10).to_f
end
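The option handling in the constructor can be tried in isolation. The following is a minimal sketch, not the real adapter: the class name `AdapterOptionsSketch` is hypothetical, and `transform_keys(&:to_sym)` stands in for ActiveSupport's `symbolize_keys` so the example runs on plain Ruby.

```ruby
# Hypothetical stand-in that reproduces only the option parsing shown above.
class AdapterOptionsSketch
  attr_reader :batch, :qstat_factor

  def initialize(opts = {})
    # Drop nil values, then normalise string keys to symbols.
    # transform_keys(&:to_sym) is used here in place of symbolize_keys.
    o = opts.to_h.compact.transform_keys(&:to_sym)

    # A required option: fetch with a block raises when the key is absent.
    @batch = o.fetch(:pbspro) { raise ArgumentError, "No pbspro object specified. Missing argument: pbspro" }

    # An optional option: fetch with a default, coerced to Float.
    @qstat_factor = o.fetch(:qstat_factor, 0.10).to_f
  end
end

sketch = AdapterOptionsSketch.new({ "pbspro" => :fake_batch, "qstat_factor" => "0.25" })
puts sketch.qstat_factor
```

Because `compact` runs first, an explicit `nil` for `qstat_factor` is treated the same as a missing key and the default applies.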
Instance Attribute Details
#qstat_factor ⇒ Float (readonly)
The fraction of all jobs that an owner's jobs must exceed before the adapter filters the owner's jobs from a single `qstat` of all jobs; at or below that fraction it calls `qstat` on each of the owner's jobs individually.
# File 'lib/ood_core/job/adapters/pbspro.rb', line 264
def qstat_factor
  @qstat_factor
end
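To make the threshold concrete, here is a small sketch of the comparison that `#info_where_owner` performs with this factor. The helper name `bulk_qstat?` and the job counts are made up for illustration.

```ruby
# Hypothetical helper mirroring the comparison used in #info_where_owner:
# true means "query all jobs once and filter", false means "query each
# of the owner's jobs individually".
def bulk_qstat?(owner_job_count, total_job_count, qstat_factor = 0.10)
  owner_job_count > qstat_factor * total_job_count
end

puts bulk_qstat?(5, 100)   # owner holds 5% of jobs
puts bulk_qstat?(25, 100)  # owner holds 25% of jobs
```

A larger factor favours per-job queries; a smaller one switches to the single bulk query sooner.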
Instance Method Details
#cluster_info ⇒ Object
# File 'lib/ood_core/job/adapters/pbspro.rb', line 354
def cluster_info
  @pbspro.get_cluster_info
end
#delete(id) ⇒ void
This method returns an undefined value.
Delete the submitted job
# File 'lib/ood_core/job/adapters/pbspro.rb', line 471
def delete(id)
  @pbspro.delete_job(id.to_s)
rescue Batch::Error => e
  # assume successful job deletion if can't find job id
  raise JobAdapterError, e.message unless /Unknown Job Id/ =~ e.message || /Job has finished/ =~ e.message
end
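`#delete`, `#hold`, and `#release` share the same rescue pattern: a batch error that only says the job is gone is treated as success, and anything else is re-raised as an adapter error. A minimal sketch of that pattern follows; the error classes, the `tolerate_missing_job` helper, and the sample messages are hypothetical stand-ins, not part of ood_core.

```ruby
# Hypothetical stand-ins for Batch::Error and JobAdapterError.
class FakeBatchError < StandardError; end
class FakeAdapterError < StandardError; end

# Messages that mean "the job no longer exists", as matched in the source.
IGNORABLE_MESSAGES = [/Unknown Job Id/, /Job has finished/].freeze

# Run the block; swallow batch errors that only report a missing or
# finished job, and re-raise everything else as an adapter error.
def tolerate_missing_job
  yield
rescue FakeBatchError => e
  raise FakeAdapterError, e.message unless IGNORABLE_MESSAGES.any? { |re| re =~ e.message }
end

# Swallowed: the job is already gone, so the operation counts as done.
tolerate_missing_job { raise FakeBatchError, "Unknown Job Id 12.example" }

# Re-raised: an unrelated failure must reach the caller.
begin
  tolerate_missing_job { raise FakeBatchError, "Unauthorized Request" }
rescue FakeAdapterError => e
  puts "adapter error: #{e.message}"
end
```

Treating a missing job as success makes these operations idempotent from the caller's point of view.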
#directive_prefix ⇒ Object
# File 'lib/ood_core/job/adapters/pbspro.rb', line 478
def directive_prefix
  '#PBS'
end
#hold(id) ⇒ void
This method returns an undefined value.
Put the submitted job on hold
# File 'lib/ood_core/job/adapters/pbspro.rb', line 447
def hold(id)
  @pbspro.hold_job(id.to_s)
rescue Batch::Error => e
  # assume successful job hold if can't find job id
  raise JobAdapterError, e.message unless /Unknown Job Id/ =~ e.message || /Job has finished/ =~ e.message
end
#info(id) ⇒ Info
Retrieve job info from the resource manager
# File 'lib/ood_core/job/adapters/pbspro.rb', line 407
def info(id)
  id = id.to_s

  job_infos = @pbspro.get_jobs(id: id).map do |v|
    parse_job_info(v)
  end

  if job_infos.empty?
    Info.new(id: id, status: :completed)
  elsif job_infos.length == 1
    job_infos.first
  else
    process_job_array(id, job_infos)
  end
rescue Batch::Error => e
  # set completed status if can't find job id
  if /Unknown Job Id/ =~ e.message || /Job has finished/ =~ e.message
    Info.new(
      id: id,
      status: :completed
    )
  else
    raise JobAdapterError, e.message
  end
end
#info_all(attrs: nil) ⇒ Array<Info>
Retrieve info for all jobs from the resource manager
# File 'lib/ood_core/job/adapters/pbspro.rb', line 362
def info_all(attrs: nil)
  @pbspro.get_jobs.map do |v|
    parse_job_info(v)
  end
rescue Batch::Error => e
  raise JobAdapterError, e.message
end
#info_where_owner(owner, attrs: nil) ⇒ Array<Info>
Retrieve info for all jobs for a given owner or owners from the resource manager
# File 'lib/ood_core/job/adapters/pbspro.rb', line 375
def info_where_owner(owner, attrs: nil)
  owner = Array.wrap(owner).map(&:to_s)

  usr_jobs = @pbspro.select_jobs(args: ["-u", owner.join(",")])
  all_jobs = @pbspro.select_jobs(args: ["-T"])

  # `qstat` all jobs if user has too many jobs, otherwise `qstat` each
  # individual job (default factor is 10%)
  if usr_jobs.size > (qstat_factor * all_jobs.size)
    super
  else
    begin
      user_job_infos = []
      usr_jobs.each do |id|
        job = info(id)
        user_job_infos << job

        job.tasks.each {|task| user_job_infos << job.build_child_info(task)}
      end

      user_job_infos
    rescue Batch::Error => e
      raise JobAdapterError, e.message
    end
  end
end
#ppn(script) ⇒ Object
Placeholder for when both nodes and CPUs are supported.
# File 'lib/ood_core/job/adapters/pbspro.rb', line 483
def ppn(script)
  return [] if script.cores.nil?

  ['-l', "ncpus=#{script.cores}"]
end
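The effect of this helper on the `qsub` argument list can be seen with a stand-in script object. In the sketch below, `ScriptSketch` and `ncpus_args` are hypothetical names, and the example assumes the CPU count is read from the same `cores` attribute that the nil guard checks.

```ruby
# Hypothetical stand-in for a job script that only carries a core count.
ScriptSketch = Struct.new(:cores)

# Build the resource request arguments, or nothing when no core count
# was given, so the scheduler's own default applies.
def ncpus_args(script)
  return [] if script.cores.nil?

  ['-l', "ncpus=#{script.cores}"]
end

p ncpus_args(ScriptSketch.new(4))
p ncpus_args(ScriptSketch.new(nil))
```

Returning an empty array lets the caller `concat` the result unconditionally.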
#release(id) ⇒ void
This method returns an undefined value.
Release the job that is on hold
# File 'lib/ood_core/job/adapters/pbspro.rb', line 459
def release(id)
  @pbspro.release_job(id.to_s)
rescue Batch::Error => e
  # assume successful job release if can't find job id
  raise JobAdapterError, e.message unless /Unknown Job Id/ =~ e.message || /Job has finished/ =~ e.message
end
#status(id) ⇒ Status
Retrieve job status from resource manager
# File 'lib/ood_core/job/adapters/pbspro.rb', line 438
def status(id)
  info(id.to_s).status
end
#submit(script, after: [], afterok: [], afternotok: [], afterany: []) ⇒ String
Submit a job with the attributes defined in the job template instance
# File 'lib/ood_core/job/adapters/pbspro.rb', line 293
def submit(script, after: [], afterok: [], afternotok: [], afterany: [])
  after      = Array(after).map(&:to_s)
  afterok    = Array(afterok).map(&:to_s)
  afternotok = Array(afternotok).map(&:to_s)
  afterany   = Array(afterany).map(&:to_s)

  # Set qsub options
  args = []
  # ignore args, can't use these if submitting from STDIN
  args.concat ["-h"] if script.submit_as_hold
  args.concat ["-r", script.rerunnable ? "y" : "n"] unless script.rerunnable.nil?
  args.concat ["-M", script.email.join(",")] unless script.email.nil?
  if script.email_on_started && script.email_on_terminated
    args.concat ["-m", "be"]
  elsif script.email_on_started
    args.concat ["-m", "b"]
  elsif script.email_on_terminated
    args.concat ["-m", "e"]
  end
  args.concat ["-N", script.job_name] unless script.job_name.nil?
  args.concat ["-S", script.shell_path] unless script.shell_path.nil?
  # ignore input_path (not defined in PBS Pro)
  args.concat ["-o", script.output_path] unless script.output_path.nil?
  args.concat ["-e", script.error_path] unless script.error_path.nil?
  # Reservations are actually just queues in PBS Pro
  args.concat ["-q", script.reservation_id] if !script.reservation_id.nil? && script.queue_name.nil?
  args.concat ["-q", script.queue_name] unless script.queue_name.nil?
  args.concat ["-p", script.priority] unless script.priority.nil?
  args.concat ["-a", script.start_time.localtime.strftime("%C%y%m%d%H%M.%S")] unless script.start_time.nil?
  args.concat ["-A", script.accounting_id] unless script.accounting_id.nil?
  args.concat ["-l", "walltime=#{seconds_to_duration(script.wall_time)}"] unless script.wall_time.nil?
  args.concat ppn(script)

  # Set dependencies
  depend = []
  depend << "after:#{after.join(":")}"           unless after.empty?
  depend << "afterok:#{afterok.join(":")}"       unless afterok.empty?
  depend << "afternotok:#{afternotok.join(":")}" unless afternotok.empty?
  depend << "afterany:#{afterany.join(":")}"     unless afterany.empty?
  args.concat ["-W", "depend=#{depend.join(",")}"] unless depend.empty?

  # Set environment variables
  envvars = script.job_environment.to_h
  args.concat ["-v", envvars.map{|k,v| "#{k}=#{v}"}.join(",")] unless envvars.empty?
  args.concat ["-V"] if script.copy_environment?

  # If error_path is not specified we join stdout & stderr (as this
  # mimics what the other resource managers do)
  args.concat ["-j", "oe"] if script.error_path.nil?

  args.concat ["-J", script.job_array_request] unless script.job_array_request.nil?

  # Set native options
  args.concat script.native if script.native

  # Submit job
  @pbspro.submit_string(script.content, args: args, chdir: script.workdir)
rescue Batch::Error => e
  raise JobAdapterError, e.message
end
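The dependency handling in `#submit` is easier to follow in isolation. The sketch below rebuilds only that step: the helper name `depend_args` and the job ids are hypothetical, while the `type:id:id` groups joined by commas under `-W depend=` follow the source above.

```ruby
# Hypothetical helper reproducing the "Set dependencies" step of #submit.
# Each argument accepts a single id or a list of ids.
def depend_args(after: [], afterok: [], afternotok: [], afterany: [])
  groups = {
    "after"      => after,
    "afterok"    => afterok,
    "afternotok" => afternotok,
    "afterany"   => afterany
  }

  # Build one "type:id:id" entry per non-empty dependency type.
  depend = groups.map do |type, ids|
    ids = Array(ids).map(&:to_s)
    "#{type}:#{ids.join(':')}" unless ids.empty?
  end.compact

  # No dependencies means no -W flag at all.
  depend.empty? ? [] : ["-W", "depend=#{depend.join(',')}"]
end

p depend_args(afterok: [101, 102], afterany: "103.example")
p depend_args
```

Wrapping every input with `Array(...)` is what lets callers pass either one job id or several.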