Class: Bricolage::DatabaseTaskQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/bricolage/taskqueue.rb

Defined Under Namespace

Classes: Task

Instance Method Summary collapse

Constructor Details

#initialize(datasource:, executor_id:, enable_lock: false) ⇒ DatabaseTaskQueue

Returns a new instance of DatabaseTaskQueue.



177
178
179
180
181
182
183
184
185
186
187
# File 'lib/bricolage/taskqueue.rb', line 177

# Sets up an empty queue that talks to the database through +datasource+.
#
# @param datasource [Object] kept as @ds and handed to every DAO constructor
# @param executor_id [Object] kept as @executor_id
# @param enable_lock [Boolean] kept as @enable_lock (false unless given)
def initialize(datasource:, executor_id:, enable_lock: false)
  @ds = datasource
  @executor_id = executor_id
  @enable_lock = enable_lock

  # One DAO per table-like concept, all sharing the same datasource.
  @jobnet_dao = DAO::JobNet.new(datasource)
  @job_dao = DAO::Job.new(datasource)
  @jobexecution_dao = DAO::JobExecution.new(datasource)

  # Initial state: nothing queued, no jobnet bound.
  @queue = []
  @jobnet = nil
end

Instance Method Details

#cancel_jobnet(jobnet, message) ⇒ Object



286
287
288
289
290
291
# File 'lib/bricolage/taskqueue.rb', line 286

# Cancels +jobnet+ and clears the locks recorded for it.
#
# The job-execution DAO is asked to cancel the jobnet first (with +message+
# passed through); afterwards the lock on the jobnet record and the locks on
# all of its jobs are cleared.
#
# @param jobnet [#ref] jobnet to cancel
# @param message [Object] forwarded to the job-execution DAO
# @return [Object] whatever @job_dao.clear_lock_all returns
def cancel_jobnet(jobnet, message)
  @jobexecution_dao.cancel_jobnet(jobnet.ref, message)

  record_id = find_or_create_jobnet(jobnet.ref).id
  @jobnet_dao.clear_lock(record_id)
  @job_dao.clear_lock_all(record_id)
end

#consume_each ⇒ Object



231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/bricolage/taskqueue.rb', line 231

# Runs the queued tasks one by one, yielding each task's job to the block.
#
# For every task at the head of the queue the job execution is moved to
# "running", the block is called with the job, and the execution is then
# moved to "succeeded" or "failed" depending on the block's result.
# A successful task is removed from the queue; the first failed task stops
# the loop and stays at the head of the queue. When locking is enabled the
# jobnet and each job are locked with @executor_id for the duration of their
# processing and unlocked afterwards.
#
# @yieldparam job [Object] the job of the task currently at the head of the queue
# @yieldreturn [#success?, #message] result of running the job
# @raise [RuntimeError] if no jobnet is bound to the queue
# @return [nil]
def consume_each
  raise "jobnet is not bound to queue" unless @jobnet

  jobnet_rec = find_or_create_jobnet(@jobnet.ref)
  @jobnet_dao.lock(jobnet_rec.id, @executor_id) if @enable_lock
  while (task = @queue.first)
    @job_dao.lock(task.job_id, @executor_id) if @enable_lock
    begin
      @jobexecution_dao.transition_to_running(task.job_execution_id)

      # Note: fork(2) breaks current connections,
      # we must close current connections before fork.
      # (psql datasource forks process)
      @ds.clear_connection_pool

      # Stays false if the block (or a state transition) raises, so the
      # ensure clause below can record the execution as failed.
      job_completed = false
      begin
        task_result = yield task.job

        if task_result.success?
          @jobexecution_dao.transition_to_succeeded(task.job_execution_id)
          job_completed = true
          @queue.shift
        else
          @jobexecution_dao.transition_to_failed(task.job_execution_id, task_result.message)
          job_completed = true
          break
        end
      ensure
        unless job_completed
          begin
            @jobexecution_dao.transition_to_failed(task.job_execution_id, 'unexpected error')
          rescue => ex
            # Best effort only: the original exception is the one that matters.
            $stderr.puts "warning: could not write job state: #{ex.class}: #{ex.message} (this error is ignored)"
          end
        end
      end
    ensure
      @job_dao.unlock(task.job_id, @executor_id) if @enable_lock
    end
  end
ensure
  # jobnet_rec is still nil when we failed before the jobnet record was
  # obtained (e.g. no jobnet bound). Calling #id on it here would raise
  # NoMethodError and hide the real error, so only unlock when it exists.
  @jobnet_dao.unlock(jobnet_rec.id, @executor_id) if @enable_lock && jobnet_rec
end

#each ⇒ Object



225
226
227
228
229
# File 'lib/bricolage/taskqueue.rb', line 225

# Yields the job of every queued task, in queue order.
#
# @yieldparam job [Object] the +job+ of a queued task
# @return [Array] the underlying queue, as returned by Array#each
def each
  @queue.each { |entry| yield entry.job }
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


189
190
191
# File 'lib/bricolage/taskqueue.rb', line 189

# Tells whether the in-memory task queue currently holds no tasks.
#
# @return [Boolean] result of calling +empty?+ on the internal queue
def empty?
  @queue.empty?
end

#enqueue_jobnet(jobnet) ⇒ Object



213
214
215
216
217
218
219
220
221
222
223
# File 'lib/bricolage/taskqueue.rb', line 213

# Registers every job of +jobnet+ as a job execution and binds the jobnet
# to this queue.
#
# Each entry of +jobnet.sequential_jobs+ is looked up (or created) through
# the job DAO, enqueued through the job-execution DAO with a 1-based
# sequence number, and appended to the in-memory queue as a Task.
#
# @param jobnet [#ref, #sequential_jobs] jobnet to enqueue
# @raise [RuntimeError] if a jobnet is already bound to this queue
# @return [Object] the given jobnet
def enqueue_jobnet(jobnet)
  raise "jobnet is already bound to queue" if @jobnet

  record = find_or_create_jobnet(jobnet.ref)
  jobnet.sequential_jobs.each_with_index do |ref, position|
    # position is 0-based; job executions are numbered starting at 1.
    execution = @jobexecution_dao.enqueue_job(@job_dao.find_or_create(record.id, ref), position + 1)
    @queue << Task.for_job_execution(execution)
  end
  @jobnet = jobnet
end

#locked?(jobnet) ⇒ Boolean

Returns:

  • (Boolean)


276
277
278
# File 'lib/bricolage/taskqueue.rb', line 276

# Asks the jobnet DAO whether the jobnet identified by +jobnet.ref+ is locked.
#
# @param jobnet [#ref] jobnet to check
# @return [Boolean] whatever @jobnet_dao.locked? reports (presumably a
#   boolean; the DAO is not visible here)
def locked?(jobnet)
  @jobnet_dao.locked?(jobnet.ref)
end

#restore_jobnet(jobnet) ⇒ Object



201
202
203
204
205
206
207
208
209
210
211
# File 'lib/bricolage/taskqueue.rb', line 201

# Reloads the already-enqueued job executions of +jobnet+ into this queue.
#
# When the job-execution DAO reports no enqueued jobs for the jobnet, the
# queue is left untouched and the jobnet is NOT bound.
#
# @param jobnet [#ref] jobnet whose enqueued job executions are restored
# @raise [RuntimeError] if a jobnet is already bound to this queue
# @return [Object, nil] the given jobnet when something was restored, else nil
def restore_jobnet(jobnet)
  raise "jobnet is already bound to queue" if @jobnet

  pending = @jobexecution_dao.enqueued_jobs(jobnet.ref)
  return if pending.empty?

  pending.each { |execution| @queue << Task.for_job_execution(execution) }
  @jobnet = jobnet
end

#size ⇒ Object



193
194
195
# File 'lib/bricolage/taskqueue.rb', line 193

# Number of tasks currently held in the in-memory queue.
#
# @return [Integer] result of calling +size+ on the internal queue
def size
  @queue.size
end

#unlock_help(jobnet) ⇒ Object



280
281
282
283
284
# File 'lib/bricolage/taskqueue.rb', line 280

# Builds a hint message naming the jobnet record and the locked jobs whose
# executor_id would have to be cleared.
#
# @param jobnet [#ref] jobnet to describe
# @return [String] message containing the jobnet record id and the
#   comma-separated ids of its locked jobs
def unlock_help(jobnet)
  record = find_or_create_jobnet(jobnet.ref)
  job_ids = @job_dao.locked_jobs(record.id).map { |job| job.id }.join(', ')
  "clear executor_id of the jobnet (id: #{record.id}) and/or the jobs (id: #{job_ids})"
end