Class: Tep::Job
- Inherits:
-
Object
- Object
- Tep::Job
- Defined in:
- lib/tep/job.rb
Class Method Summary collapse
-
.enqueue(name, arg, db_path) ⇒ Object
Append a ‘queued` row.
-
.fetch_next(db_path) ⇒ Object
Claim the oldest ‘queued` row and mark it `running`.
-
.init_schema(db_path) ⇒ Object
Idempotent.
-
.mark_done(db_path, row_id, result) ⇒ Object
Mark the row ‘done` and store the result string.
-
.mark_failed(db_path, row_id) ⇒ Object
Mark the row ‘failed`.
Instance Method Summary collapse
-
#perform(arg) ⇒ Object
Subclasses override.
Class Method Details
.enqueue(name, arg, db_path) ⇒ Object
Append a ‘queued` row. Returns the new row id (0 on DB error).
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/tep/job.rb', line 106 def self.enqueue(name, arg, db_path) db = Tep::SQLite.new if !db.open(db_path) return 0 end db.prepare("INSERT INTO tep_jobs (job_name, arg, status, created_at) VALUES (?, ?, ?, ?)") db.bind_str(1, name) db.bind_str(2, arg) db.bind_str(3, "queued") db.bind_int(4, Time.now.to_i) db.step db.finalize id = db.last_rowid db.close id end |
.fetch_next(db_path) ⇒ Object
Claim the oldest ‘queued` row and mark it `running`. Returns “row_id|name|arg” packed into one string (the caller splits on “|” with limit 3), or “” if the queue is empty / errored. The row_id is needed for the matching `mark_done` call. Caller is responsible for dispatching to the right subclass and then writing the result back via `mark_done`.
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/tep/job.rb', line 129 def self.fetch_next(db_path) db = Tep::SQLite.new if !db.open(db_path) return "" end db.prepare("SELECT id, job_name, arg FROM tep_jobs WHERE status = 'queued' ORDER BY id ASC LIMIT 1") out = "" if db.step == 1 row_id = db.col_int(0) job_name = db.col_str(1) arg = db.col_str(2) out = row_id.to_s + "|" + job_name + "|" + arg end db.finalize if out.length > 0 db.prepare("UPDATE tep_jobs SET status = 'running' WHERE id = ?") db.bind_int(1, row_id) db.step db.finalize end db.close out end |
.init_schema(db_path) ⇒ Object
Idempotent. Creates the queue table if missing. Pass the same SQLite path to enqueue / fetch_next / mark_done.
93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/tep/job.rb', line 93 def self.init_schema(db_path) db = Tep::SQLite.new if db.open(db_path) db.exec("CREATE TABLE IF NOT EXISTS tep_jobs (" + "id INTEGER PRIMARY KEY, " + "job_name TEXT, arg TEXT, status TEXT, " + "created_at INTEGER, finished_at INTEGER, result TEXT)") db.close end 0 end |
.mark_done(db_path, row_id, result) ⇒ Object
Mark the row ‘done` and store the result string.
154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/tep/job.rb', line 154 def self.mark_done(db_path, row_id, result) db = Tep::SQLite.new if !db.open(db_path) return 0 end db.prepare("UPDATE tep_jobs SET status = 'done', finished_at = ?, result = ? WHERE id = ?") db.bind_int(1, Time.now.to_i) db.bind_str(2, result) db.bind_int(3, row_id) db.step db.finalize db.close 1 end |
.mark_failed(db_path, row_id) ⇒ Object
Mark the row ‘failed`. The error message is not stored by this method; the user writes it via their own SQLite calls if they want it persisted.
172 173 174 175 176 177 178 179 180 181 182 183 184 |
# File 'lib/tep/job.rb', line 172 def self.mark_failed(db_path, row_id) db = Tep::SQLite.new if !db.open(db_path) return 0 end db.prepare("UPDATE tep_jobs SET status = 'failed', finished_at = ? WHERE id = ?") db.bind_int(1, Time.now.to_i) db.bind_int(2, row_id) db.step db.finalize db.close 1 end |
Instance Method Details
#perform(arg) ⇒ Object
Subclasses override. The default uses ‘arg` as :str so spinel’s analyzer pins the param type rather than defaulting to :int for an unused parameter – otherwise subclass ‘arg.upcase` calls fail to resolve against an int-typed slot.
87 88 89 |
# File 'lib/tep/job.rb', line 87 def perform(arg) "" + arg end |