Class: MiGA::Daemon

Inherits:
MiGA
  • Object
show all
Extended by:
Common::WithDaemonClass
Includes:
Common::WithDaemon, Base
Defined in:
lib/miga/daemon.rb,
lib/miga/daemon/base.rb

Overview

MiGA Daemons handling job submissions.

Defined Under Namespace

Modules: Base

Constant Summary

Constants included from MiGA

CITATION, VERSION, VERSION_DATE, VERSION_NAME

Instance Attribute Summary collapse

Attributes included from Common::WithDaemon

#declare_alive_pid, #loop_i

Attributes included from Common::Net

#remote_connection_uri

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Common::WithDaemonClass

alive_file, last_alive, terminated_file

Methods included from Common::WithDaemon

#active?, #alive_file, #daemon, #declare_alive, #declare_alive_loop, #default_options, #in_loop, #last_alive, #launch_daemon_proc, #output_file, #pid_file, #process_alive?, #run, #start, #status, #stop, #terminate, #terminate_file, #terminated_file, #termination_file?, #write_alive_file

Methods included from Base

#bypass_maintenance?, #latency, #logfh, #maxjobs, #nodelist, #ppn, #runopts, #runopts_for, #show_log!, #show_log?, #show_summary!, #shutdown_when_done?, #skip_maintenance, #verbosity

Methods inherited from MiGA

CITATION, CITATION_ARRAY, DEBUG, DEBUG_OFF, DEBUG_ON, DEBUG_TRACE_OFF, DEBUG_TRACE_ON, FULL_VERSION, LONG_VERSION, VERSION, VERSION_DATE, #advance, debug?, debug_trace?, initialized?, #like_io?, #num_suffix, rc_path, #result_files_exist?

Methods included from Common::Path

#root_path, #script_path

Methods included from Common::Format

#clean_fasta_file, #seqs_length, #tabulate

Methods included from Common::Net

#data_server, #download_file_ftp, #http_request, #known_hosts, #main_server, #net_method, #normalize_encoding, #remote_connection

Methods included from Common::SystemCall

#run_cmd, #run_cmd_opts

Constructor Details

#initialize(project, json = nil) ⇒ Daemon

Initialize an inactive daemon for the MiGA::Project project. See #daemon to wake the daemon. If passed, json must be the path to a daemon definition in JSON format. Otherwise, the project-stored daemon definition is used. In either case, missing variables are used as defined in ~/.miga_daemon.json.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/miga/daemon.rb', line 44

# Set up a daemon for +project+ without starting it.
#
# +json+ is an optional path to a daemon definition; when omitted, the file
# +daemon/daemon.json+ under the project path is used, and it is created
# containing an empty JSON object if it does not exist yet. A file named
# +.miga_daemon.json+, resolved against ENV['MIGA_HOME'], is handed to
# MiGA::Json.parse as the +default+ when it exists. Every parsed key/value
# pair is registered through #runopts before #update_format_0 is applied.
def initialize(project, json = nil)
  @project = project
  @runopts = {}
  daemon_dir = File.join(project.path, 'daemon')
  json ||= File.join(daemon_dir, 'daemon.json')
  FileUtils.mkdir_p(daemon_dir) unless Dir.exist?(daemon_dir)
  unless File.exist?(json)
    File.open(json, 'w') { |fh| fh.puts '{}' }
  end
  fallback = File.expand_path('.miga_daemon.json', ENV['MIGA_HOME'])
  fallback = nil unless File.exist?(fallback)
  definition = MiGA::Json.parse(json, default: fallback)
  definition.each { |key, value| runopts(key, value) }
  update_format_0
  @jobs_to_run = []
  @jobs_running = []
end

Instance Attribute Details

#jobs_runningObject (readonly)

Array of jobs currently running



36
37
38
# File 'lib/miga/daemon.rb', line 36

# Reader for +@jobs_running+, the Array holding the jobs that have been
# launched and are still considered to be running.
def jobs_running
  @jobs_running
end

#jobs_to_runObject (readonly)

Array of jobs next to be executed



33
34
35
# File 'lib/miga/daemon.rb', line 33

# Reader for +@jobs_to_run+, the Array holding the jobs queued for launching.
def jobs_to_run
  @jobs_to_run
end

#optionsObject (readonly)

Options used to setup the daemon



30
31
32
# File 'lib/miga/daemon.rb', line 30

# Reader for +@options+, documented as the options used to set up the daemon.
# NOTE(review): no assignment to +@options+ is visible in this listing;
# confirm where it is populated before relying on the returned value.
def options
  @options
end

#projectObject (readonly)

MiGA::Project in which the daemon is running



27
28
29
# File 'lib/miga/daemon.rb', line 27

# Reader for +@project+, the project object received by the constructor.
def project
  @project
end

Class Method Details

.daemon_home(project) ⇒ Object

Daemon’s home inside the MiGA::Project project or a String with the full path to the project’s ‘daemon’ folder



19
20
21
22
23
# File 'lib/miga/daemon.rb', line 19

# Resolve the daemon folder for +project+.
#
# A String is taken to already be the daemon folder and is returned as is;
# any other object must respond to +path+, and the result is the +daemon+
# entry under that path.
def daemon_home(project)
  case project
  when String
    project
  else
    File.join(project.path, 'daemon')
  end
end

Instance Method Details

#check_datasetsObject

Traverses the datasets and returns a boolean indicating whether any reference datasets are incomplete



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/miga/daemon.rb', line 191

# Walk over every dataset in the project and queue a +:d+ job for each one
# that is +:incomplete+ and still has a pending preprocessing step.
#
# When the log is not being shown, progress is reported through +advance+:
# the number of datasets without a queued or running job out of the total.
# The "all done" line is printed only once until new dataset jobs appear,
# which is tracked in +@_check_datasets_reported_done+.
#
# Returns true if at least one of the datasets queued is a reference dataset.
def check_datasets
  l_say(2, 'Checking datasets')
  pending_reference = false
  project.each_dataset do |dataset|
    if dataset.status == :incomplete && !dataset.next_preprocessing(false).nil?
      pending_reference = true if dataset.ref?
      queue_job(:d, dataset)
    end
  end
  return pending_reference if show_log?

  @_check_datasets_reported_done ||= false
  total = project.dataset_names.count
  busy = (jobs_to_run + jobs_running).reject { |job| job[:ds].nil? }.size
  if busy > 0
    advance('Datasets:', total - busy, total, false)
    @_check_datasets_reported_done = false
  elsif !@_check_datasets_reported_done
    advance('Datasets:', total, total, false)
    miga_say
    @_check_datasets_reported_done = true
  end
  pending_reference
end

#check_projectObject

Check if all reference datasets are pre-processed. If yes, check the project-level tasks



220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/miga/daemon.rb', line 220

# Queue the project-level job (+:p+) when there is project-wide work to do.
#
# Nothing is queued if the project has no datasets, if preprocessing is not
# done for the project, or if +next_task+ reports no pending task.
def check_project
  l_say(2, 'Checking project')

  # Only proceed for non-empty projects whose datasets are all ready
  ready = !project.dataset_names.empty? && project.done_preprocessing?
  return unless ready

  # Queue project-level job
  pending_task = project.next_task(nil, false)
  pending_task.nil? ? nil : queue_job(:p)
end

#daemon_first_loopObject

Run only in the first loop



80
81
82
83
84
85
86
87
88
89
# File 'lib/miga/daemon.rb', line 80

# Actions performed once, on the first loop: print a launch banner and the
# current run options, announce the log location through +miga_say+ when the
# log is not being shown, restore any saved status, and force-queue
# maintenance.
def daemon_first_loop
  rule = '-----------------------------------'
  [rule, 'MiGA:%s launched' % project.name, rule].each { |line| say line }
  unless show_log?
    miga_say "Saving log to: #{output_file}"
  end
  say 'Configuration options:'
  say @runopts.to_s
  load_status
  queue_maintenance(true)
end

#daemon_homeObject

Path to the daemon home



62
63
64
# File 'lib/miga/daemon.rb', line 62

# Path to the daemon home of this daemon's project, obtained by delegating
# to the class-level +daemon_home+ with #project.
def daemon_home
  self.class.daemon_home(project)
end

#daemon_loopObject

Run one loop step. Returns a Boolean indicating if the loop should continue



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/miga/daemon.rb', line 93

# Run one iteration of the daemon loop.
#
# The project is reloaded and datasets are checked; project-level tasks are
# only checked when #check_datasets returns a falsy value. If the daemon is
# set to shut down when done and no jobs are queued or running, temporary
# files are removed and false is returned so the caller stops looping.
# Otherwise jobs are flushed, dead jobs are purged every 12th loop (with
# maintenance queued on multiples of 12 * (skip_maintenance + 1)), the status
# is saved, and the method sleeps for #latency before returning true.
def daemon_loop
  l_say(3, 'Daemon loop start')
  reload_project
  check_project unless check_datasets
  if shutdown_when_done?
    outstanding = jobs_running.size + jobs_to_run.size
    if outstanding.zero?
      say 'Nothing else to do, shutting down'
      exit_cleanup
      return false
    end
  end
  flush!
  if loop_i.modulo(12).zero?
    purge!
    if loop_i.modulo(12 * (skip_maintenance + 1)).zero?
      queue_maintenance
    end
  end
  save_status
  sleep(latency)
  l_say(3, 'Daemon loop end')
  true
end

#daemon_nameObject

Name of the daemon



68
69
70
# File 'lib/miga/daemon.rb', line 68

# Name of the daemon: the project name prefixed with "MiGA:".
def daemon_name
  "MiGA:#{project.name}"
end

#exit_cleanupObject

Remove temporary files on completion



124
125
126
# File 'lib/miga/daemon.rb', line 124

# Remove the saved status file (+status.json+ in the daemon home) on
# completion. A missing file is not an error.
def exit_cleanup
  status_file = File.join(daemon_home, 'status.json')
  FileUtils.rm_f(status_file)
end

#flush!Object

Remove finished jobs from the internal queue and launch as many as possible respecting #maxjobs or #nodelist (if set).



294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# File 'lib/miga/daemon.rb', line 294

# Remove finished jobs from the internal queue and launch as many as possible
# respecting #maxjobs or #nodelist (if set).
#
# A job in +@jobs_running+ is kept while it is still ongoing:
# - 'd' jobs: the dataset is set and still has a pending preprocessing step
# - 'p' jobs: the project still has a pending task
# - any other job: +add_result+ for that job still returns nil
#
# At most one launch attempt is made per job that was queued when launching
# starts. #launch_job pushes unsuccessful jobs back onto +@jobs_to_run+, so
# an unbounded loop would keep retrying a failing submission without ever
# returning to the caller. Jobs that fail here stay queued for the next call.
#
# Returns nil.
def flush!
  # Check for finished jobs
  l_say(2, 'Checking for finished jobs')
  @jobs_running.select! do |job|
    ongoing =
      case job[:job].to_s
      when 'd'
        !job[:ds].nil? && !job[:ds].next_preprocessing(false).nil?
      when 'p'
        !project.next_task(nil, false).nil?
      else
        (job[:ds].nil? ? project : job[:ds]).add_result(job[:job], false).nil?
      end
    say "Completed pid:#{job[:pid]} for #{job[:task_name]}" unless ongoing
    ongoing
  end

  # Avoid single datasets hogging resources
  @jobs_to_run.rotate! rand(jobs_to_run.size)

  # Prioritize: Project-wide > MiGA Online queries > Other datasets
  @jobs_to_run.sort_by! do |job|
    if job[:ds].nil?
      1
    elsif job[:ds_name] =~ /^qG_/
      2
    else
      3
    end
  end

  # Launch as many +jobs_to_run+ as possible, trying each queued job once
  jobs_to_run.size.times do
    hostk = next_host
    break unless hostk
    break if jobs_to_run.empty?

    launch_job(@jobs_to_run.shift, hostk)
  end
  nil
end

#get_job(job, ds = nil) ⇒ Object

Get the task with key symbol job in dataset ds. For project-wide tasks let ds be nil.



281
282
283
284
285
286
287
288
289
# File 'lib/miga/daemon.rb', line 281

# Find the queued or running job with key +job+ for dataset +ds+.
#
# With +ds+ nil, only project-wide entries (those whose +:ds+ is nil) are
# considered; otherwise entries are matched by dataset name. Queued jobs are
# searched before running jobs. Returns the job Hash, or nil if none matches.
def get_job(job, ds = nil)
  known_jobs = jobs_to_run + jobs_running
  known_jobs.find do |entry|
    same_target =
      if ds.nil?
        entry[:ds].nil?
      else
        !entry[:ds].nil? && entry[:ds].name == ds.name
      end
    same_target && entry[:job] == job
  end
end

#job_cmd(to_run) ⇒ Object

Construct the command for the given job definition with current daemon settings



251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
# File 'lib/miga/daemon.rb', line 251

# Build the command line for the job definition +to_run+ using the current
# daemon settings.
#
# Settings are looked up for +:project+ when the job has no dataset and for
# +:dataset+ otherwise. The log directory +daemon/<job>+ under the project
# path is created as a side effect. The variables PROJECT, RUNTYPE, CORES,
# MIGA (and DATASET for dataset jobs) are rendered with the +:var+ template
# and joined with the +:varsep+ option; the final command is the +:cmd+
# template filled through +miga_variables+.
def job_cmd(to_run)
  dataset = to_run[:ds]
  scope = dataset.nil? ? :project : :dataset
  env = {
    'PROJECT' => project.path,
    'RUNTYPE' => runopts_for(:type, scope),
    'CORES' => ppn(scope),
    'MIGA' => MiGA::MiGA.root_path
  }
  env['DATASET'] = dataset.name unless dataset.nil?

  # Make sure the per-job log folder exists before composing the command
  log_dir = File.expand_path("daemon/#{to_run[:job]}", project.path)
  FileUtils.mkdir_p(log_dir)

  script = MiGA::MiGA.script_path(
    to_run[:job], miga: env['MIGA'], project: project
  )
  assignments = env.map do |key, value|
    runopts(:var).miga_variables(key: key, value: value)
  end
  substitutions = {
    script: script,
    vars: assignments.join(runopts_for(:varsep, scope)),
    cpus: ppn(scope),
    log: File.join(log_dir, "#{to_run[:ds_name]}.log"),
    task_name: to_run[:task_name],
    task_name_simple: to_run[:task_name].gsub(/[^A-Za-z0-9_]/, '-'),
    miga: File.join(MiGA::MiGA.root_path, 'bin/miga').shellescape
  }
  runopts_for(:cmd, scope).miga_variables(substitutions)
end

#l_say(level, *msg) ⇒ Object

Send msg to say as long as level is at most verbosity



130
131
132
# File 'lib/miga/daemon.rb', line 130

# Forward +msg+ to #say only when +level+ does not exceed #verbosity.
# Returns whatever #say returns, or nil when the message is suppressed.
def l_say(level, *msg)
  return unless verbosity >= level

  say(*msg)
end

#launch_job(job, hostk = nil) ⇒ Object

Launch the job described by Hash job to hostk-th host



351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
# File 'lib/miga/daemon.rb', line 351

# Launch the job described by Hash +job+, on the +hostk+-th host for SSH
# daemons.
#
# The command is built with #job_cmd and stored in +job[:cmd]+. For the
# 'ssh' and 'bash' run types the command is spawned and the child detached;
# for 'ssh' the host index is recorded in +job[:hostk]+ and the host name is
# filled into the command. Any other run type executes the command through
# MiGA::MiGA.run_cmd and uses its chomped output as the job id.
#
# A job whose id is nil, an empty String or 0 is pushed back to
# +@jobs_to_run+; otherwise it is appended to +@jobs_running+.
def launch_job(job, hostk = nil)
  unregistered = [nil, '', 0]

  # Execute job
  job[:cmd] = job_cmd(job)
  MiGA::MiGA.DEBUG "CMD: #{job[:cmd]}"
  run_type = runopts(:type)
  if %w[ssh bash].include?(run_type)
    if run_type == 'ssh'
      # Remote job: bind it to the selected host
      job[:hostk] = hostk
      job[:cmd] = job[:cmd].miga_variables(host: nodelist[hostk])
    end
    job[:pid] = spawn job[:cmd]
    MiGA::MiGA.DEBUG "Detaching PID: #{job[:pid]}"
    Process.detach(job[:pid]) unless unregistered.include?(job[:pid])
  else
    # Schedule cluster job (qsub, msub, slurm)
    job[:pid] = MiGA::MiGA.run_cmd(job[:cmd], return: :output).chomp
  end

  # Check if registered
  if unregistered.include?(job[:pid])
    job[:pid] = nil
    @jobs_to_run << job
    say "Unsuccessful #{job[:task_name]}, rescheduling"
  else
    @jobs_running << job
    job_host = job[:hostk] ? " to #{job[:hostk]}:#{nodelist[job[:hostk]]}" : nil
    say "Spawned pid:#{job[:pid]}#{job_host} for #{job[:task_name]}"
  end
end

#load_statusObject

Load the status of a previous instance.



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/miga/daemon.rb', line 164

# Restore the job queues saved by a previous instance from +status.json+ in
# the daemon home. Does nothing if that file is missing or empty.
#
# For every saved job, the dataset is looked up again in the project by
# +:ds_name+ (unless the entry has no dataset or is named 'miga-project'),
# and the +:job+ key is converted to a Symbol. Dead jobs are purged from the
# running list before the final counts are reported.
def load_status
  f_path = File.join(daemon_home, 'status.json')
  return unless File.size? f_path

  say 'Loading previous status in daemon/status.json:'
  status = MiGA::Json.parse(f_path)
  status.each_value do |saved_jobs|
    saved_jobs.each do |saved|
      project_level = saved[:ds].nil? || saved[:ds_name] == 'miga-project'
      saved[:ds] = project.dataset(saved[:ds_name]) unless project_level
      saved[:job] = saved[:job].to_sym unless saved[:job].nil?
    end
  end
  @jobs_running = status[:jobs_running]
  @jobs_to_run  = status[:jobs_to_run]
  say "- jobs left running: #{@jobs_running.size}"
  purge!
  say "- jobs running: #{@jobs_running.size}"
  say "- jobs to run: #{@jobs_to_run.size}"
end

#miga_sayObject

Rename the original MiGA::MiGA.say as miga_say, allowing external reporting since MiGA::Daemon overwrites say



137
# File 'lib/miga/daemon.rb', line 137

# Keep the +say+ implementation that exists at this point reachable as
# +miga_say+, so it remains available once +say+ is redefined further down.
alias_method :miga_say, :say

#next_hostObject

In SSH daemons, retrieve the host index of an available node, nil if none. In any other daemons, returns true as long as #maxjobs is not reached



330
331
332
333
334
335
336
# File 'lib/miga/daemon.rb', line 330

# Pick where the next job can go.
#
# For SSH daemons, returns the lowest index in #nodelist that no running job
# is using, or nil if every node is busy. For any other run type, returns
# true while the number of running jobs is below #maxjobs, false otherwise.
def next_host
  if runopts(:type) == 'ssh'
    every_index = (0...nodelist.size).to_a
    occupied = jobs_running.map { |job| job[:hostk] }
    (every_index - occupied).first
  else
    jobs_running.size < maxjobs
  end
end

#pathObject

Alias to project.path for compatibility with lairs



74
75
76
# File 'lib/miga/daemon.rb', line 74

# Path of the project this daemon works on (same value as +project.path+).
def path
  project.path
end

#purge!Object

Remove dead jobs.



340
341
342
343
344
345
346
347
# File 'lib/miga/daemon.rb', line 340

# Drop dead jobs from +@jobs_running+.
#
# Each job is probed by running the +:alive+ command template with the job's
# pid filled in; a job is kept only when the integer value of the command
# output is 1. Returns nil when no job was dropped.
def purge!
  say 'Probing running jobs'
  @jobs_running.reject! do |job|
    probe = runopts(:alive).miga_variables(pid: job[:pid])
    reply = MiGA::MiGA.run_cmd(probe, return: :output)
    reply.chomp.to_i != 1
  end
end

#queue_job(job, ds = nil) ⇒ Object

Add the task to the internal queue with symbol key job. If the task is dataset-specific, ds specifies the dataset. To submit jobs to the scheduler (or to bash or ssh) see #flush!



238
239
240
241
242
243
244
245
246
# File 'lib/miga/daemon.rb', line 238

# Add the task with symbol key +job+ to the internal queue. If the task is
# dataset-specific, +ds+ specifies the dataset; project-wide tasks use the
# placeholder dataset name 'miga-project'.
#
# The task name is built from the first 10 characters of the project name,
# the job key and the dataset name. Nothing is queued (and nil is returned)
# when the same job is already queued or running according to #get_job;
# otherwise the updated +@jobs_to_run+ Array is returned.
#
# To submit jobs to the scheduler (or to bash or ssh) see #flush!
def queue_job(job, ds = nil)
  return nil unless get_job(job, ds).nil?

  ds_name = (ds.nil? ? 'miga-project' : ds.name)
  # Keep the task name short: only a 10-character prefix of the project name
  task_name = "#{project.name[0..9]}:#{job}:#{ds_name}"
  to_run = { ds: ds, ds_name: ds_name, job: job, task_name: task_name }
  say 'Queueing %s:%s' % [to_run[:ds_name], to_run[:job]]
  @jobs_to_run << to_run
end

#queue_maintenance(force = false) ⇒ Object

Queue maintenance tasks as an analysis job



115
116
117
118
119
120
# File 'lib/miga/daemon.rb', line 115

# Queue maintenance tasks as an analysis job (+:maintenance+).
#
# Nothing is queued when maintenance is bypassed, nor when the daemon is set
# to shut down when done unless +force+ is true.
def queue_maintenance(force = false)
  allowed = !bypass_maintenance? && (force || !shutdown_when_done?)
  return unless allowed

  say 'Queueing maintenance tasks'
  queue_job(:maintenance)
end

#reload_projectObject

Reload the project’s metadata



147
148
149
150
# File 'lib/miga/daemon.rb', line 147

# Reload the project by calling +project.load+, logging the action at
# verbosity level 2. Returns whatever +project.load+ returns.
def reload_project
  l_say(2, 'Reloading project')
  project.load
end

#save_statusObject

Report status in a JSON file.



154
155
156
157
158
159
160
# File 'lib/miga/daemon.rb', line 154

# Write the current job queues (+jobs_running+ and +jobs_to_run+) to
# +status.json+ in the daemon home through MiGA::Json.generate.
def save_status
  l_say(2, 'Saving current status')
  snapshot = { jobs_running: @jobs_running, jobs_to_run: @jobs_to_run }
  status_file = File.join(daemon_home, 'status.json')
  MiGA::Json.generate(snapshot, status_file)
end

#say(*msg) ⇒ Object

Same as l_say with level = 1



141
142
143
# File 'lib/miga/daemon.rb', line 141

# Report +msg+ through the inherited +say+, writing to #logfh, as long as
# #verbosity is at least 1. Returns nil when the message is suppressed.
def say(*msg)
  return unless verbosity >= 1

  super(logfh, *msg)
end

#update_format_0Object

Update from daemon JSON format 0 to the latest version



387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
# File 'lib/miga/daemon.rb', line 387

# Upgrade run options written in daemon JSON format 0 to the latest version.
#
# The +:cmd+, +:var+, +:alive+ and +:kill+ options are rewritten when they
# contain printf-style +%s+/+%d+ placeholders (optionally positional, such as
# +%1$s+): +%d+ is turned into +%s+ and each placeholder is filled with the
# corresponding +{{name}}+ tag, in the order listed below. Options that are
# unset or have no such placeholders are left as they are. Finally,
# +:format_version+ is set to 1.
def update_format_0
  legacy_fields = {
    cmd: %w[script vars cpus log task_name],
    var: %w[key value],
    alive: %w[pid],
    kill: %w[pid]
  }
  legacy_fields.each do |option, fields|
    template = runopts(option)
    next if template.nil?
    next unless template =~ /%(\d+\$)?[ds]/

    tags = fields.map { |field| "{{#{field}}}" }
    runopts(option, template.gsub(/%(\d+\$)?d/, '%\\1s') % tags)
  end
  runopts(:format_version, 1)
end