Class: Bricolage::JobNetRunner

Inherits:
Object
  • Object
show all
Defined in:
lib/bricolage/jobnetrunner.rb

Defined Under Namespace

Classes: Options

Constant Summary collapse

EXIT_SUCCESS =
JobResult::EXIT_SUCCESS
EXIT_FAILURE =
JobResult::EXIT_FAILURE
EXIT_ERROR =
JobResult::EXIT_ERROR

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeJobNetRunner

Returns a new instance of JobNetRunner.



30
31
32
33
34
# File 'lib/bricolage/jobnetrunner.rb', line 30

def initialize
  @hooks = ::Bricolage
  @jobnet_id = nil
  @jobnet_start_time = Time.now
end

Class Method Details

.mainObject



25
26
27
28
# File 'lib/bricolage/jobnetrunner.rb', line 25

def JobNetRunner.main
  Application.install_signal_handlers
  new.main
end

Instance Method Details

#app_nameObject



136
137
138
139
140
141
142
# File 'lib/bricolage/jobnetrunner.rb', line 136

def app_name
  path = @ctx.home_path.realpath
  while /\A(?:\d+|current|releases)\z/ =~ path.basename.to_s   # is Capistrano dirs
    path = path.dirname
  end
  path.basename.to_s
end

#check_jobs(queue) ⇒ Object



150
151
152
153
154
# File 'lib/bricolage/jobnetrunner.rb', line 150

def check_jobs(queue)
  queue.each do |job|
    Job.load_ref(job, @ctx).compile
  end
end

#error_exit(msg) ⇒ Object



207
208
209
210
# File 'lib/bricolage/jobnetrunner.rb', line 207

def error_exit(msg)
  print_error msg
  exit 1
end

#execute_job(ref, queue) ⇒ Object



174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/bricolage/jobnetrunner.rb', line 174

def execute_job(ref, queue)
  logger.info "job #{ref}"
  job_start_time = Time.now
  job = Job.load_ref(ref, @ctx)
  job.compile
  @hooks.run_before_job_hooks(BeforeJobEvent.new(ref))
  result = job.execute_in_process(log_locator: make_log_locator(ref, job_start_time))
  @hooks.run_after_job_hooks(AfterJobEvent.new(result))
  result
rescue Exception => ex
  logger.exception ex
  logger.error "unexpected error: #{ref} (#{ex.class}: #{ex.message})"
  JobResult.error(ex)
end

#get_executor_id(executor_type) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/bricolage/jobnetrunner.rb', line 113

def get_executor_id(executor_type)
  # executor_id is 'TaskID:PID' or 'Hostname:PID'
  if executor_type == 'ecs'
    uri = URI.parse("#{ENV['ECS_CONTAINER_METADATA_URI']}/task")
    response = Net::HTTP.get_response(uri)
    task_id = JSON.parse(response.body)['TaskARN'].split('/').last
    "#{task_id}:#{$$}"
  else
    hostname = Socket.gethostname
    "#{hostname}:#{$$}"
  end
end

#get_queue_file_path(opts) ⇒ Object



126
127
128
129
130
131
132
133
134
# File 'lib/bricolage/jobnetrunner.rb', line 126

def get_queue_file_path(opts)
  if opts.queue_path
    opts.queue_path
  elsif opts.enable_queue?
    opts.local_state_dir + 'queue' + "#{app_name}.#{@jobnet_id.tr('/', '.')}"
  else
    nil
  end
end

#list_jobs(queue) ⇒ Object



144
145
146
147
148
# File 'lib/bricolage/jobnetrunner.rb', line 144

def list_jobs(queue)
  queue.each do |job|
    puts job
  end
end

#loggerObject



95
96
97
# File 'lib/bricolage/jobnetrunner.rb', line 95

def logger
  @ctx.logger
end

#mainObject



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/bricolage/jobnetrunner.rb', line 40

def main
  opts = Options.new(self)
  @hooks.run_before_option_parsing_hooks(opts)
  opts.parse!(ARGV)

  @ctx = Context.for_application(job_path: opts.jobnet_files.first, environment: opts.environment, option_variables: opts.option_variables)
  opts.merge_saved_options(@ctx.load_system_options)

  jobnet = RootJobNet.load_auto(@ctx, opts.jobnet_files)
  @jobnet_id = jobnet.id

  if opts.dump_options?
    puts "jobnet-id=#{@jobnet_id}"
    puts "jobnet-file=#{opts.jobnet_files.first}"
    opts.option_pairs.each do |key, value|
      puts "#{key}=#{value.inspect}"
    end
    exit EXIT_SUCCESS
  end

  queue = make_queue(opts)
  if queue.locked?(jobnet)
    raise ParameterError, "Job queue is still locked. If you are sure to restart jobnet, #{queue.unlock_help(jobnet)}"
  end
  if opts.clear_queue?
    queue.cancel_jobnet(jobnet, 'cancelled by --clear-queue')
    logger.info "queue is unlocked and cleared"
    exit EXIT_SUCCESS
  end
  queue.restore_jobnet(jobnet)
  if queue.empty?
    queue.enqueue_jobnet(jobnet)
  end

  if opts.list_jobs?
    list_jobs queue
    exit EXIT_SUCCESS
  end
  check_jobs queue
  if opts.check_only?
    puts "OK"
    exit EXIT_SUCCESS
  end

  @log_locator_builder = LogLocatorBuilder.for_options(@ctx, opts.log_path_format, opts.log_s3_ds, opts.log_s3_key_format)
  run_queue queue
  exit EXIT_SUCCESS
rescue OptionError => ex
  raise if $DEBUG
  usage_exit ex.message, opts.help
rescue ApplicationError => ex
  raise if $DEBUG
  error_exit ex.message
end

#make_log_locator(ref, job_start_time) ⇒ Object



189
190
191
192
193
194
195
196
# File 'lib/bricolage/jobnetrunner.rb', line 189

def make_log_locator(ref, job_start_time)
  @log_locator_builder.build(
    job_ref: ref,
    jobnet_id: @jobnet_id,
    job_start_time: job_start_time,
    jobnet_start_time: @jobnet_start_time
  )
end

#make_queue(opts) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/bricolage/jobnetrunner.rb', line 99

def make_queue(opts)
  if opts.db_name
    logger.info "Enables DB queue: datasource=#{opts.db_name}"
    datasource = @ctx.get_data_source('psql', opts.db_name)
    executor_id = get_executor_id(opts.executor_type)
    DatabaseTaskQueue.new(datasource: datasource, executor_id: executor_id, enable_lock: false)
  elsif path = get_queue_file_path(opts)
    logger.info "Enables file queue: #{path}"
    FileTaskQueue.new(path: path)
  else
    MemoryTaskQueue.new
  end
end


212
213
214
# File 'lib/bricolage/jobnetrunner.rb', line 212

def print_error(msg)
  $stderr.puts "#{program_name}: error: #{msg}"
end

#program_nameObject



216
217
218
# File 'lib/bricolage/jobnetrunner.rb', line 216

def program_name
  File.basename($PROGRAM_NAME, '.*')
end

#run_queue(queue) ⇒ Object



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/bricolage/jobnetrunner.rb', line 156

def run_queue(queue)
  result = nil
  job = nil
  @hooks.run_before_all_jobs_hooks(BeforeAllJobsEvent.new(@jobnet_id, queue))
  queue.consume_each do |job|
    result = execute_job(job, queue)
  end
  @hooks.run_after_all_jobs_hooks(AfterAllJobsEvent.new(result.success?, queue))
  logger.elapsed_time 'jobnet total: ', (Time.now - @jobnet_start_time)

  if result.success?
    logger.info "status all green"
  else
    logger.error "[job #{job}] #{result.message}"
    exit result.status
  end
end

#usage_exit(msg, usage) ⇒ Object



201
202
203
204
205
# File 'lib/bricolage/jobnetrunner.rb', line 201

def usage_exit(msg, usage)
  print_error msg
  $stderr.puts usage
  exit 1
end