Class: Bricolage::DAO::Job
- Inherits:
-
Object
- Object
- Bricolage::DAO::Job
- Includes:
- SQLUtils
- Defined in:
- lib/bricolage/dao/job.rb
Defined Under Namespace
Classes: Attributes
Class Method Summary collapse
- .for_record(r) ⇒ Object
Instance Method Summary collapse
- #clear_lock_all(jobnet_id) ⇒ Object
- #find_or_create(jobnet_id, job_ref) ⇒ Object
-
#initialize(datasource) ⇒ Job
constructor
A new instance of Job.
- #lock(job_id, executor_id) ⇒ Object
- #locked?(job_ids) ⇒ Boolean
- #locked_jobs(jobnet_id) ⇒ Object
-
#unlock(job_id, executor_id) ⇒ Object
Unlock the job.
Constructor Details
#initialize(datasource) ⇒ Job
Returns a new instance of Job.
21 22 23 |
# File 'lib/bricolage/dao/job.rb', line 21

# Builds a new Job DAO.
#
# @param datasource [Object] stored as-is in @datasource; this method does
#   not call anything on it
def initialize(datasource)
  @datasource = datasource
end
Class Method Details
.for_record(r) ⇒ Object
11 12 13 14 15 16 17 18 19 |
# File 'lib/bricolage/dao/job.rb', line 11

# Converts one result row into an Attributes value.
#
# The 'job_id' and 'jobnet_id' columns are converted with #to_i when present
# and left as nil when absent; the remaining columns are passed through
# unchanged.
#
# @param r [#[]] row indexed by column-name strings ('job_id', 'subsystem',
#   'job_name', 'jobnet_id', 'executor_id')
# @return [Attributes]
def self.for_record(r)
  Attributes.new(
    id: r['job_id']&.to_i,
    subsystem: r['subsystem'],
    job_name: r['job_name'],
    jobnet_id: r['jobnet_id']&.to_i,
    executor_id: r['executor_id']
  )
end
Instance Method Details
#clear_lock_all(jobnet_id) ⇒ Object
168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/bricolage/dao/job.rb', line 168

# Clears the lock on every job of a jobnet by setting executor_id to null
# for all rows of `jobs` whose jobnet_id matches.
#
# @param jobnet_id [Object] interpolated into the SQL text as-is
# @return [Object] whatever the `connect` block evaluates to (the result of
#   conn.execute_update) -- presumably passed through by `connect`; confirm
def clear_lock_all(jobnet_id)
  connect do |conn|
    sql = <<~EndSQL
      update jobs
      set
          executor_id = null
      where
          jobnet_id = #{jobnet_id}
      ;
    EndSQL
    conn.execute_update(sql)
  end
end
#find_or_create(jobnet_id, job_ref) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/bricolage/dao/job.rb', line 29

# Returns the job record for (jobnet_id, job_ref), creating it when missing.
#
# The lookup runs first because an existing record is the most frequent
# case. If creation fails with UniqueViolationException, the lookup is
# retried once.
#
# @param jobnet_id [Object] forwarded to `find` / `create`
# @param job_ref [Object] forwarded to `find` / `create`
# @return [Object] the value returned by `find` or `create`
# @raise [RuntimeError] when the record can be neither created nor found
def find_or_create(jobnet_id, job_ref)
  connect do |conn|
    # optimize the most frequent case
    find(conn, jobnet_id, job_ref) || begin
      create(conn, jobnet_id, job_ref)
    rescue UniqueViolationException
      find(conn, jobnet_id, job_ref) ||
        raise("[BUG] Could not find/create job record: jobnet_id=#{jobnet_id}, ref=#{job_ref}")
    end
  end
end
#lock(job_id, executor_id) ⇒ Object
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/bricolage/dao/job.rb', line 129

# Locks a job for an executor.
#
# The update only matches a row whose executor_id is currently null, so an
# empty `returning` result means the job was not locked by this call.
#
# @param job_id [Object] interpolated into the SQL text as-is
# @param executor_id [Object] passed through the `s` helper before
#   interpolation
# @return [nil]
# @raise [DoubleLockError] when the update returns no rows
def lock(job_id, executor_id)
  updated = connect do |conn|
    sql = <<~EndSQL
      update jobs
      set
          executor_id = #{s executor_id}
      where
          job_id = #{job_id}
          and executor_id is null
      returning job_id
      ;
    EndSQL
    conn.execute_update(sql)
  end

  raise DoubleLockError, "Could not lock job: job_id=#{job_id}" if updated.empty?
end
#locked?(job_ids) ⇒ Boolean
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/bricolage/dao/job.rb', line 87

# Tells whether at least one of the given jobs is locked, i.e. has a
# non-null executor_id.
#
# @param job_ids [Array] job ids; joined with ',' into the SQL `in (...)`
#   list
# @return [Boolean] true when the count of locked rows is positive; false
#   for an empty job_ids
def locked?(job_ids)
  # An empty list would render `job_id in ()`, which is not valid SQL
  # (e.g. in PostgreSQL); no jobs means nothing can be locked.
  return false if job_ids.empty?

  count = connect { |conn|
    conn.query_value(<<~EndSQL)
      select
          count(job_id)
      from
          jobs
      where
          job_id in (#{job_ids.join(',')})
          and executor_id is not null
      ;
    EndSQL
  }

  count.to_i > 0
end
#locked_jobs(jobnet_id) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/bricolage/dao/job.rb', line 104

# Lists the locked jobs of a jobnet: rows of `jobs` with the given
# jobnet_id and a non-null executor_id.
#
# @param jobnet_id [Object] interpolated into the SQL text as-is
# @return [Array] one Job.for_record result per matching row; empty when no
#   job is locked
def locked_jobs(jobnet_id)
  records = connect { |conn|
    conn.query_rows(<<~EndSQL)
      select
          "job_id"
          , "subsystem"
          , "job_name"
          , jobnet_id
          , "executor_id"
      from
          jobs
      where
          jobnet_id = #{jobnet_id}
          and executor_id is not null
      ;
    EndSQL
  }

  # Mapping over zero rows already yields [], so no separate empty branch.
  records.map { |r| Job.for_record(r) }
end
#unlock(job_id, executor_id) ⇒ Object
Unlock the job. Returns true if successfully unlocked, otherwise false. FIXME: raise an exception on failing unlock?
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/bricolage/dao/job.rb', line 151

# Unlocks the job.
#
# The update only matches a row that is locked by the given executor_id.
#
# FIXME: raise an exception on failing unlock?
#
# @param job_id [Object] interpolated into the SQL text as-is
# @param executor_id [Object] passed through the `s` helper before
#   interpolation
# @return [Boolean] true if the update returned at least one row, otherwise
#   false
def unlock(job_id, executor_id)
  released = connect do |conn|
    sql = <<~EndSQL
      update jobs
      set
          executor_id = null
      where
          job_id = #{job_id}
          and executor_id = #{s executor_id}
      returning job_id
      ;
    EndSQL
    conn.execute_update(sql)
  end

  !released.empty?
end