Class: Tep::Job

Inherits:
Object
  • Object
show all
Defined in:
lib/tep/job.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.enqueue(name, arg, db_path) ⇒ Object

Append a ‘queued` row. Returns the new row id (0 on DB error).



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/tep/job.rb', line 106

def self.enqueue(name, arg, db_path)
  db = Tep::SQLite.new
  if !db.open(db_path)
    return 0
  end
  db.prepare("INSERT INTO tep_jobs (job_name, arg, status, created_at) VALUES (?, ?, ?, ?)")
  db.bind_str(1, name)
  db.bind_str(2, arg)
  db.bind_str(3, "queued")
  db.bind_int(4, Time.now.to_i)
  db.step
  db.finalize
  id = db.last_rowid
  db.close
  id
end

.fetch_next(db_path) ⇒ Object

Claim the oldest ‘queued` row and mark it `running`. Returns “row_id|name|arg” packed into one string (the caller splits on “|” with limit 3), or “” if the queue is empty / errored. The row_id is needed for the matching `mark_done` call. Caller is responsible for dispatching to the right subclass and then writing the result back via `mark_done`.



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/tep/job.rb', line 129

def self.fetch_next(db_path)
  db = Tep::SQLite.new
  if !db.open(db_path)
    return ""
  end
  db.prepare("SELECT id, job_name, arg FROM tep_jobs WHERE status = 'queued' ORDER BY id ASC LIMIT 1")
  out = ""
  if db.step == 1
    row_id   = db.col_int(0)
    job_name = db.col_str(1)
    arg      = db.col_str(2)
    out = row_id.to_s + "|" + job_name + "|" + arg
  end
  db.finalize
  if out.length > 0
    db.prepare("UPDATE tep_jobs SET status = 'running' WHERE id = ?")
    db.bind_int(1, row_id)
    db.step
    db.finalize
  end
  db.close
  out
end

.init_schema(db_path) ⇒ Object

Idempotent. Creates the queue table if missing. Pass the same SQLite path to enqueue / fetch_next / mark_done.



93
94
95
96
97
98
99
100
101
102
103
# File 'lib/tep/job.rb', line 93

def self.init_schema(db_path)
  db = Tep::SQLite.new
  if db.open(db_path)
    db.exec("CREATE TABLE IF NOT EXISTS tep_jobs (" +
            "id INTEGER PRIMARY KEY, " +
            "job_name TEXT, arg TEXT, status TEXT, " +
            "created_at INTEGER, finished_at INTEGER, result TEXT)")
    db.close
  end
  0
end

.mark_done(db_path, row_id, result) ⇒ Object

Mark the row ‘done` and store the result string.



154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/tep/job.rb', line 154

def self.mark_done(db_path, row_id, result)
  db = Tep::SQLite.new
  if !db.open(db_path)
    return 0
  end
  db.prepare("UPDATE tep_jobs SET status = 'done', finished_at = ?, result = ? WHERE id = ?")
  db.bind_int(1, Time.now.to_i)
  db.bind_str(2, result)
  db.bind_int(3, row_id)
  db.step
  db.finalize
  db.close
  1
end

.mark_failed(db_path, row_id) ⇒ Object

Mark the row ‘failed`. The error message is not stored by this method; the user writes it via their own SQLite calls if they want it persisted.



172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/tep/job.rb', line 172

def self.mark_failed(db_path, row_id)
  db = Tep::SQLite.new
  if !db.open(db_path)
    return 0
  end
  db.prepare("UPDATE tep_jobs SET status = 'failed', finished_at = ? WHERE id = ?")
  db.bind_int(1, Time.now.to_i)
  db.bind_int(2, row_id)
  db.step
  db.finalize
  db.close
  1
end

Instance Method Details

#perform(arg) ⇒ Object

Subclasses override. The default uses ‘arg` as :str so spinel’s analyzer pins the param type rather than defaulting to :int for an unused parameter – otherwise subclass ‘arg.upcase` calls fail to resolve against an int-typed slot.



87
88
89
# File 'lib/tep/job.rb', line 87

def perform(arg)
  "" + arg
end