Class: Bricolage::DAO::Job

Inherits:
Object
  • Object
show all
Includes:
SQLUtils
Defined in:
lib/bricolage/dao/job.rb

Defined Under Namespace

Classes: Attributes

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(datasource) ⇒ Job

Returns a new instance of Job.



21
22
23
# File 'lib/bricolage/dao/job.rb', line 21

# Creates a Job DAO and remembers the data source it was given.
#
# @param datasource the object stored in @datasource; presumably what
#   `connect` uses to obtain connections (not visible here — TODO confirm)
def initialize(datasource)
  @datasource = datasource
end

Class Method Details

.for_record(r) ⇒ Object



11
12
13
14
15
16
17
18
19
# File 'lib/bricolage/dao/job.rb', line 11

# Builds an Attributes value from a single record of the jobs table.
#
# @param r a record indexed by column-name strings ('job_id', 'subsystem',
#   'job_name', 'jobnet_id', 'executor_id')
# @return [Attributes] the record's columns wrapped in an Attributes object
def Job.for_record(r)
  # The two id columns go through #to_i unless the column value is nil;
  # the remaining columns are passed along untouched.
  job_id = r['job_id']&.to_i
  subsystem = r['subsystem']
  job_name = r['job_name']
  jobnet_id = r['jobnet_id']&.to_i
  executor_id = r['executor_id']

  Attributes.new(
    id: job_id,
    subsystem: subsystem,
    job_name: job_name,
    jobnet_id: jobnet_id,
    executor_id: executor_id
  )
end

Instance Method Details

#clear_lock_all(jobnet_id) ⇒ Object



168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/bricolage/dao/job.rb', line 168

# Releases every job lock in a jobnet by setting executor_id to null on
# all jobs rows whose jobnet_id matches.
#
# @param jobnet_id id of the jobnet; interpolated verbatim into the SQL
#   NOTE(review): no quoting is applied, so callers must pass a trusted
#   numeric id — confirm against callers.
# @return the value of the `connect` block, i.e. the result of
#   conn.execute_update (presumably; `connect` is not visible here)
def clear_lock_all(jobnet_id)
  connect do |conn|
    release_sql = <<~EndSQL
      update jobs
      set
          executor_id = null
      where
          jobnet_id = #{jobnet_id}
      ;
    EndSQL
    conn.execute_update(release_sql)
  end
end

#find_or_create(jobnet_id, job_ref) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/bricolage/dao/job.rb', line 29

# Returns the job record for (jobnet_id, job_ref), creating it when it does
# not exist yet.
#
# @param jobnet_id id of the jobnet the job belongs to
# @param job_ref reference identifying the job; passed through to
#   `find` and `create`
# @return the record returned by `find` or `create`
# @raise [RuntimeError] when `create` reports a unique violation but the
#   record still cannot be found afterwards
def find_or_create(jobnet_id, job_ref)
  connect do |conn|
    # Look the record up first: this is the most frequent case.
    existing = find(conn, jobnet_id, job_ref)
    next existing if existing

    begin
      create(conn, jobnet_id, job_ref)
    rescue UniqueViolationException
      # A unique violation means the row exists after all, so read it back.
      raced = find(conn, jobnet_id, job_ref)
      raise "[BUG] Could not find/create job record: jobnet_id=#{jobnet_id}, ref=#{job_ref}" unless raced

      raced
    end
  end
end

#lock(job_id, executor_id) ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/bricolage/dao/job.rb', line 129

# Locks a job for an executor by writing executor_id onto the jobs row,
# but only when the row's executor_id is currently null.
#
# @param job_id id of the job to lock; interpolated verbatim into the SQL
# @param executor_id id of the locking executor; rendered through `s`
#   (presumably the SQLUtils string-literal helper — confirm)
# @return [nil] when the lock was taken
# @raise [DoubleLockError] when the update returned no rows, i.e. the job
#   does not exist or already has an executor_id
def lock(job_id, executor_id)
  claimed = connect do |conn|
    lock_sql = <<~EndSQL
      update jobs
      set
          executor_id = #{s(executor_id)}
      where
          job_id = #{job_id}
          and executor_id is null
      returning job_id
      ;
    EndSQL
    conn.execute_update(lock_sql)
  end

  raise DoubleLockError, "Could not lock job: job_id=#{job_id}" if claimed.empty?
end

#locked?(job_ids) ⇒ Boolean

Returns:

  • (Boolean)


87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/bricolage/dao/job.rb', line 87

# Tells whether at least one of the given jobs currently has a non-null
# executor_id, i.e. is locked.
#
# @param job_ids [Array] job ids; joined with commas and interpolated
#   verbatim into the SQL `in (...)` list
# @return [Boolean] true when the count of matching locked rows is
#   positive; false otherwise, including when job_ids is empty
def locked?(job_ids)
  # An empty list would render as `job_id in ()`, which is not valid SQL.
  # No jobs means nothing can be locked, so answer without querying.
  return false if job_ids.empty?

  count = connect {|conn|
    conn.query_value(<<~EndSQL)
        select
            count(job_id)
        from
            jobs
        where
            job_id in (#{job_ids.join(',')})
            and executor_id is not null
        ;
    EndSQL
  }

  count.to_i.positive?
end

#locked_jobs(jobnet_id) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/bricolage/dao/job.rb', line 104

# Lists the jobs of a jobnet that are currently locked, i.e. whose
# executor_id is not null.
#
# @param jobnet_id id of the jobnet; interpolated verbatim into the SQL
# @return [Array] one Job.for_record result per matching row; empty when
#   no job of the jobnet is locked
def locked_jobs(jobnet_id)
  records = connect {|conn|
    conn.query_rows(<<~EndSQL)
        select
            "job_id"
            , "subsystem"
            , "job_name"
            , jobnet_id
            , "executor_id"
        from
            jobs
        where
            jobnet_id = #{jobnet_id}
            and executor_id is not null
        ;
    EndSQL
  }

  # Map over `records` (the original referenced an undefined `record`,
  # raising NameError whenever rows were found). Mapping an empty result
  # already yields [], so no separate empty branch is needed.
  records.map {|r| Job.for_record(r) }
end

#unlock(job_id, executor_id) ⇒ Object

Unlock the job. Returns true if successfully unlocked, otherwise false. FIXME: raise an exception on failing unlock?



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/bricolage/dao/job.rb', line 151

# Unlocks a job by resetting executor_id to null, but only on the row
# whose executor_id equals the given executor.
# FIXME: raise an exception on failing unlock?
#
# @param job_id id of the job to unlock; interpolated verbatim into the SQL
# @param executor_id id of the executor expected to hold the lock; rendered
#   through `s` (presumably the SQLUtils string-literal helper — confirm)
# @return [Boolean] true when the update returned at least one row,
#   false otherwise
def unlock(job_id, executor_id)
  released = connect do |conn|
    unlock_sql = <<~EndSQL
      update jobs
      set
          executor_id = null
      where
          job_id = #{job_id}
          and executor_id = #{s(executor_id)}
      returning job_id
      ;
    EndSQL
    conn.execute_update(unlock_sql)
  end

  !released.empty?
end