Class: Bricolage::DatabaseTaskQueue
- Inherits: Object
- Inheritance chain: Object → Bricolage::DatabaseTaskQueue
- Defined in:
- lib/bricolage/taskqueue.rb
Defined Under Namespace
Classes: Task
Instance Method Summary
- #cancel_jobnet(jobnet, message) ⇒ Object
- #consume_each ⇒ Object
- #each ⇒ Object
- #empty? ⇒ Boolean
- #enqueue_jobnet(jobnet) ⇒ Object
- #initialize(datasource:, executor_id:, enable_lock: false) ⇒ DatabaseTaskQueue (constructor): returns a new instance of DatabaseTaskQueue.
- #locked?(jobnet) ⇒ Boolean
- #restore_jobnet(jobnet) ⇒ Object
- #size ⇒ Object
- #unlock_help(jobnet) ⇒ Object
Constructor Details
#initialize(datasource:, executor_id:, enable_lock: false) ⇒ DatabaseTaskQueue
Returns a new instance of DatabaseTaskQueue.
177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/bricolage/taskqueue.rb', line 177
#
# Builds an empty task queue that is not yet bound to any jobnet.
#
# @param datasource data source; kept as @ds and handed to each DAO constructor
# @param executor_id identifier kept as @executor_id
# @param enable_lock [Boolean] kept as @enable_lock (defaults to false)
# @return [DatabaseTaskQueue] a new instance of DatabaseTaskQueue
def initialize(datasource:, executor_id:, enable_lock: false)
  @ds = datasource
  @executor_id = executor_id
  @enable_lock = enable_lock

  # In-memory list of pending tasks; starts out empty.
  @queue = []

  # One DAO per record kind, all sharing the same data source.
  @jobnet_dao = DAO::JobNet.new(@ds)
  @job_dao = DAO::Job.new(@ds)
  @jobexecution_dao = DAO::JobExecution.new(@ds)

  # No jobnet is bound until enqueue_jobnet / restore_jobnet assigns one.
  @jobnet = nil
end
Instance Method Details
#cancel_jobnet(jobnet, message) ⇒ Object
286 287 288 289 290 291 |
# File 'lib/bricolage/taskqueue.rb', line 286
#
# Cancels the job executions of +jobnet+ and clears the jobnet lock and
# all of its job locks.
#
# @param jobnet object responding to #ref
# @param message message forwarded to the job-execution DAO's cancel_jobnet
# @return [Object] whatever @job_dao.clear_lock_all returns
def cancel_jobnet(jobnet, message)
  @jobexecution_dao.cancel_jobnet(jobnet.ref, message)

  # Locks are cleared by jobnet record id, so resolve the record first.
  jobnet_rec = find_or_create_jobnet(jobnet.ref)
  @jobnet_dao.clear_lock(jobnet_rec.id)
  @job_dao.clear_lock_all(jobnet_rec.id)
end
#consume_each ⇒ Object
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 |
# File 'lib/bricolage/taskqueue.rb', line 231
#
# Runs the queued tasks in order, yielding each task's job to the block.
# The block must return an object responding to #success? and #message.
#
# A task is removed from the queue only after its job succeeds. The loop
# stops at the first failed job, which stays at the head of the queue.
# When locking is enabled, the jobnet is locked for the whole run and each
# job is locked while it executes; both are unlocked via +ensure+.
#
# @yield [job] the job of the task at the head of the queue
# @yieldreturn object responding to #success? and #message
# @raise [RuntimeError] if no jobnet has been bound to the queue
# @return [nil]
def consume_each
  raise "jobnet is not bound to queue" unless @jobnet

  jobnet_rec = find_or_create_jobnet(@jobnet.ref)
  @jobnet_dao.lock(jobnet_rec.id, @executor_id) if @enable_lock
  while (task = @queue.first)
    @job_dao.lock(task.job_id, @executor_id) if @enable_lock
    begin
      @jobexecution_dao.transition_to_running(task.job_execution_id)

      # Note: fork(2) breaks current connections,
      # we must close current connections before fork.
      # (psql datasource forks process)
      @ds.clear_connection_pool

      job_completed = false
      begin
        task_result = yield task.job

        if task_result.success?
          @jobexecution_dao.transition_to_succeeded(task.job_execution_id)
          job_completed = true
          @queue.shift
        else
          @jobexecution_dao.transition_to_failed(task.job_execution_id, task_result.message)
          job_completed = true
          break
        end
      ensure
        # Reached with job_completed == false only when something raised
        # before a final state was recorded; mark the execution as failed.
        unless job_completed
          begin
            @jobexecution_dao.transition_to_failed(task.job_execution_id, 'unexpected error')
          rescue => ex
            # Best effort: do not let a state-write failure hide the original error.
            $stderr.puts "warning: could not write job state: #{ex.class}: #{ex.message} (this error is ignored)"
          end
        end
      end
    ensure
      @job_dao.unlock(task.job_id, @executor_id) if @enable_lock
    end
  end
ensure
  # jobnet_rec is nil when we failed before the jobnet record was resolved
  # (e.g. the queue is unbound). Skip the unlock then, so the original
  # error is not replaced by a NoMethodError on nil.
  @jobnet_dao.unlock(jobnet_rec.id, @executor_id) if @enable_lock && jobnet_rec
end
#each ⇒ Object
225 226 227 228 229 |
# File 'lib/bricolage/taskqueue.rb', line 225
#
# Yields the job of every queued task, in queue order, without removing
# anything from the queue.
#
# @yield [job] the job of each queued task
# @return [Array, Enumerator] the underlying queue when a block is given;
#   an Enumerator over the jobs when called without a block
def each
  # Without a block, `yield` would raise LocalJumpError; hand back an
  # Enumerator instead, as Ruby iterators conventionally do.
  return enum_for(:each) unless block_given?

  @queue.each do |task|
    yield task.job
  end
end
#empty? ⇒ Boolean
189 190 191 |
# File 'lib/bricolage/taskqueue.rb', line 189
#
# Tells whether the in-memory queue holds no tasks.
#
# @return [Boolean] true when @queue is empty
def empty?
  @queue.empty?
end
#enqueue_jobnet(jobnet) ⇒ Object
213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/bricolage/taskqueue.rb', line 213
#
# Binds +jobnet+ to this queue and enqueues one task per job of
# jobnet.sequential_jobs, in that order.
#
# @param jobnet object responding to #ref and #sequential_jobs
# @raise [RuntimeError] if a jobnet is already bound to the queue
# @return [Object] the given jobnet
def enqueue_jobnet(jobnet)
  raise "jobnet is already bound to queue" if @jobnet

  jobnet_rec = find_or_create_jobnet(jobnet.ref)
  jobnet.sequential_jobs.each_with_index do |job_ref, index|
    job = @job_dao.find_or_create(jobnet_rec.id, job_ref)
    # The second argument is 1-based: the first job is enqueued with 1.
    job_execution = @jobexecution_dao.enqueue_job(job, index + 1)
    @queue.push Task.for_job_execution(job_execution)
  end

  # Bind only after every job has been enqueued.
  @jobnet = jobnet
end
#locked?(jobnet) ⇒ Boolean
276 277 278 |
# File 'lib/bricolage/taskqueue.rb', line 276
#
# Asks the jobnet DAO whether +jobnet+ is locked.
#
# @param jobnet object responding to #ref
# @return [Boolean] the answer of @jobnet_dao.locked? for jobnet.ref
def locked?(jobnet)
  @jobnet_dao.locked?(jobnet.ref)
end
#restore_jobnet(jobnet) ⇒ Object
201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/bricolage/taskqueue.rb', line 201
#
# Rebuilds the queue from the job executions the job-execution DAO reports
# as enqueued for +jobnet+, and binds the jobnet to this queue.
# When there is nothing enqueued, the queue stays empty and unbound.
#
# @param jobnet object responding to #ref
# @raise [RuntimeError] if a jobnet is already bound to the queue
# @return [Object, nil] the given jobnet when tasks were restored, else nil
def restore_jobnet(jobnet)
  raise "jobnet is already bound to queue" if @jobnet

  job_executions = @jobexecution_dao.enqueued_jobs(jobnet.ref)
  # Nothing to restore: leave @jobnet unset.
  return if job_executions.empty?

  job_executions.each do |job_execution|
    @queue.push Task.for_job_execution(job_execution)
  end
  @jobnet = jobnet
end
#size ⇒ Object
193 194 195 |
# File 'lib/bricolage/taskqueue.rb', line 193
#
# Number of tasks currently held in the in-memory queue.
#
# @return [Integer] the size of @queue
def size
  @queue.size
end
#unlock_help(jobnet) ⇒ Object
280 281 282 283 284 |
# File 'lib/bricolage/taskqueue.rb', line 280
#
# Builds a hint telling an operator which records to clear in order to
# release the locks of +jobnet+: the jobnet record id and the ids of the
# jobs the job DAO reports as locked.
#
# @param jobnet object responding to #ref
# @return [String] the hint message
def unlock_help(jobnet)
  jobnet_rec = find_or_create_jobnet(jobnet.ref)
  locked_jobs = @job_dao.locked_jobs(jobnet_rec.id)
  "clear executor_id of the jobnet (id: #{jobnet_rec.id}) and/or the jobs (id: #{locked_jobs.map(&:id).join(', ')})"
end