Class: Verity::Manifest
- Inherits: Object
- Defined in: lib/verity/manifest.rb
Overview
Public: SQLite-backed manifest that coordinates test distribution across workers. Each row tracks a single test’s fingerprint, metadata, and execution status. Workers atomically claim pending rows to run.
Defined Under Namespace
Classes: ClaimedRow
Constant Summary
- SCHEMA_VERSION = 3
Instance Attribute Summary
- #db ⇒ Object (readonly)
  Internal: Raw SQLite3::Database handle for tests and introspection.
Class Method Summary
- .open(path) ⇒ Object
  Public: Open (or create) a manifest database at the given path.
Instance Method Summary
- #claim_next(worker_id, exclude: []) ⇒ Object
  Public: Atomically claim the next pending test for a worker.
- #close ⇒ Object
  Public: Close the underlying SQLite connection.
- #count_by_status ⇒ Object
  Public: Aggregate test counts grouped by status.
- #each_parallel_replay_result ⇒ Object
  Public: After parallel workers finish, yield Verity::Runner::Result once per finished row (passed, failed, or errored) in dispatch (queue_index) order so the parent can replay reporter output (dots, documentation lines, etc.).
- #example_count ⇒ Object
  Public: Total number of test rows in the manifest.
- #failures_for_report ⇒ Object
  Public: Fetch details for all failed and errored tests, ordered by fingerprint, for the final summary report.
- #initialize(path, busy_timeout_ms: 5000) ⇒ Manifest (constructor)
  A new instance of Manifest.
- #migrate! ⇒ Object
  Public: Create or upgrade the tests table to the current schema version.
- #reclaim_abandoned_running! ⇒ Object
  Public: Mark every row still in running status as errored with a coordinator-level message.
- #record_error(fingerprint, error) ⇒ Object
  Public: Mark a test as errored (unexpected exception) and store details.
- #record_failure(fingerprint, error) ⇒ Object
  Public: Mark a test as failed and store the failure details.
- #record_pass(fingerprint) ⇒ Object
  Public: Mark a test as passed.
- #replace_tests(tests) ⇒ Object
  Public: Atomically clear the tests table and insert the given tests as pending rows.
- #running_resources ⇒ Object
  Public: Return the resources hash for every test currently marked running.
Constructor Details
#initialize(path, busy_timeout_ms: 5000) ⇒ Manifest
Returns a new instance of Manifest.
# File 'lib/verity/manifest.rb', line 45
def initialize(path, busy_timeout_ms: 5000)
  @memory = (path == ":memory:")
  @busy_timeout_ms = busy_timeout_ms
  FileUtils.mkdir_p(File.dirname(File.(path))) unless @memory
  @db = SQLite3::Database.new(path)
  configure_connection!
end
Instance Attribute Details
#db ⇒ Object (readonly)
Internal: Raw SQLite3::Database handle for tests and introspection.
# File 'lib/verity/manifest.rb', line 59
def db
  @db
end
Class Method Details
.open(path) ⇒ Object
Public: Open (or create) a manifest database at the given path.
path - String file path, or ":memory:" for an in-process database.
Returns a new Manifest instance.
# File 'lib/verity/manifest.rb', line 41
def self.open(path, **)
  new(path, **)
end
Instance Method Details
#claim_next(worker_id, exclude: []) ⇒ Object
Public: Atomically claim the next pending test for a worker. Marks the row as “running” and returns its data.
worker_id - Integer identifying the claiming worker.
exclude - Array of fingerprint Strings to skip (default []).
Returns a ClaimedRow, or nil when no claimable pending tests remain.
# File 'lib/verity/manifest.rb', line 160
def claim_next(worker_id, exclude: [])
  if exclude.empty?
    rows = @db.execute2(<<~SQL, worker_id)
      UPDATE tests
      SET status = 'running', worker_id = ?
      WHERE fingerprint = (
        SELECT fingerprint FROM tests
        WHERE status = 'pending'
        ORDER BY queue_index ASC, fingerprint ASC
        LIMIT 1
      )
      RETURNING fingerprint, file, line, description, method,
                tags, requires, resources, timeout, status, worker_id, failure, queue_index
    SQL
  else
    placeholders = (["?"] * exclude.size).join(", ")
    rows = @db.execute2(<<~SQL, [worker_id] + exclude)
      UPDATE tests
      SET status = 'running', worker_id = ?
      WHERE fingerprint = (
        SELECT fingerprint FROM tests
        WHERE status = 'pending'
          AND fingerprint NOT IN (#{placeholders})
        ORDER BY queue_index ASC, fingerprint ASC
        LIMIT 1
      )
      RETURNING fingerprint, file, line, description, method,
                tags, requires, resources, timeout, status, worker_id, failure, queue_index
    SQL
  end

  return nil if rows.size < 2

  headers = rows[0]
  values = rows[1]
  hash = headers.zip(values).to_h
  hydrate_row(hash)
end
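The two mechanical steps in claim_next, expanding the exclude list into bind placeholders and turning the execute2 result into a hash, can be sketched in plain Ruby without a database. The helper names build_exclusion and first_row_as_hash are hypothetical and are not part of Verity::Manifest; the sketch only mirrors the shape of the source above.

```ruby
# Hypothetical helper: builds the NOT IN clause and the bind list the way
# claim_next does when exclude is non-empty.
def build_exclusion(worker_id, exclude)
  placeholders = (["?"] * exclude.size).join(", ")
  clause = "AND fingerprint NOT IN (#{placeholders})"
  binds = [worker_id] + exclude
  [clause, binds]
end

# Hypothetical helper: execute2 returns the column names as its first row,
# so a result with fewer than two rows means no test was claimed.
def first_row_as_hash(rows)
  return nil if rows.size < 2

  rows[0].zip(rows[1]).to_h
end

clause, binds = build_exclusion(7, ["fp-a", "fp-b"])
puts clause
puts binds.inspect

puts first_row_as_hash([["fingerprint", "status"], ["fp-c", "running"]]).inspect
puts first_row_as_hash([["fingerprint", "status"]]).inspect
```

One placeholder is emitted per excluded fingerprint, so the fingerprints travel as bound parameters rather than being interpolated into the SQL text.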
#close ⇒ Object
Public: Close the underlying SQLite connection.
Returns nothing.
# File 'lib/verity/manifest.rb', line 56
def close = @db.close
#count_by_status ⇒ Object
Public: Aggregate test counts grouped by status. Used by ParallelSummaryReporter after all workers finish.
Returns a Hash with String status keys and Integer counts.
# File 'lib/verity/manifest.rb', line 286
def count_by_status
  @db.execute("SELECT status, COUNT(*) FROM tests GROUP BY status").to_h.transform_values(&:to_i)
end
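Because the query uses GROUP BY, a status with no rows does not appear in the returned Hash at all. A minimal sketch of how a caller might consume the result follows; the summarize helper and the sample rows are hypothetical, with the rows shaped like the [status, count] pairs the query returns.

```ruby
# Hypothetical consumer of a count_by_status-style result.
def summarize(rows)
  # Same conversion as the source: pairs to Hash, counts coerced to Integer.
  counts = rows.to_h.transform_values(&:to_i)
  {
    total: counts.values.sum,
    passed: counts.fetch("passed", 0),
    failed: counts.fetch("failed", 0),
    errored: counts.fetch("errored", 0),
    # Statuses without rows are missing from the Hash, hence the default.
    pending: counts.fetch("pending", 0)
  }
end

summary = summarize([["passed", 12], ["failed", 2], ["errored", 1]])
puts summary.inspect
```

Using fetch with a default of 0 avoids nil arithmetic when, for example, no test failed.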
#each_parallel_replay_result ⇒ Object
Public: After parallel workers finish, yield Verity::Runner::Result once per finished row (passed, failed, or errored) in dispatch (queue_index) order so the parent can replay reporter output (dots, documentation lines, etc.). Workers use NullReporter during execution; child processes do not invoke the user’s reporter.
Yields Verity::Runner::Result.
Returns Enumerator when no block is given.
# File 'lib/verity/manifest.rb', line 320
def each_parallel_replay_result
  return enum_for(:each_parallel_replay_result) unless block_given?

  data = @db.execute2(<<~SQL)
    SELECT fingerprint, file, line, description, method, tags, requires, resources, timeout,
           status, failure, queue_index
    FROM tests
    WHERE status IN ('passed', 'failed', 'errored')
    ORDER BY queue_index ASC, fingerprint ASC
  SQL
  headers = data[0]
  Array(data[1..]).each do |vals|
    hash = headers.zip(vals).to_h
    row = hydrate_row(hash)
    result_status =
      case row.status
      when :passed then :pass
      when :failed then :fail
      when :errored then :error
      else row.status
      end
    err = replay_exception(result_status, row.failure)
    resources = normalize_resource_keys(row.resources)
    test = Verity::Test.new(
      fingerprint: row.fingerprint,
      description: row.description.to_s,
      tags: Array(row.tags).map(&:to_sym),
      timeout: row.timeout,
      requires: Array(row.requires).map(&:to_sym),
      resources: resources,
      file: row.file,
      line: row.line,
      fn: -> { raise "parallel replay stub" },
      group_path: [].freeze,
      inherited_group_tags: [].freeze,
      group_scopes: [].freeze
    )
    yield Verity::Runner::Result.new(test: test, status: result_status, error: err)
  end
end
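The replay method combines three ideas: return an Enumerator when no block is given, visit only finished rows in queue_index order, and translate stored statuses into result statuses. The sketch below reproduces that pattern over an in-memory array. ReplaySketch and its row hashes are hypothetical stand-ins for the manifest and its SQLite rows.

```ruby
# Hypothetical in-memory stand-in for the manifest's replay iteration.
class ReplaySketch
  # Stored row status => result status, as in the case expression above.
  STATUS_MAP = { passed: :pass, failed: :fail, errored: :error }.freeze

  def initialize(rows)
    @rows = rows
  end

  def each_result
    # Without a block, hand back an Enumerator over the same method.
    return enum_for(:each_result) unless block_given?

    finished = @rows.select { |row| STATUS_MAP.key?(row[:status]) }
    finished.sort_by { |row| [row[:queue_index], row[:fingerprint]] }.each do |row|
      yield [row[:fingerprint], STATUS_MAP.fetch(row[:status])]
    end
  end
end

rows = [
  { fingerprint: "b", queue_index: 1, status: :failed },
  { fingerprint: "a", queue_index: 0, status: :passed },
  { fingerprint: "c", queue_index: 2, status: :running }
]
replay = ReplaySketch.new(rows)
replay.each_result { |fingerprint, status| puts "#{fingerprint}: #{status}" }
```

The row still marked running is skipped, which matches the WHERE clause restricting replay to passed, failed, and errored rows.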
#example_count ⇒ Object
Public: Total number of test rows in the manifest.
Returns an Integer.
# File 'lib/verity/manifest.rb', line 278
def example_count
  @db.get_first_value("SELECT COUNT(*) FROM tests").to_i
end
#failures_for_report ⇒ Object
Public: Fetch details for all failed and errored tests, ordered by fingerprint, for the final summary report.
Returns an Array of Hashes with :fingerprint, :description, :status, and :failure keys.
# File 'lib/verity/manifest.rb', line 295
def failures_for_report
  @db.execute(<<~SQL).map do |fingerprint, description, status, failure|
    SELECT fingerprint, description, status, failure
    FROM tests
    WHERE status IN ('failed', 'errored')
    ORDER BY fingerprint ASC
  SQL
    {
      fingerprint: fingerprint,
      description: description,
      status: status.to_sym,
      failure: parse_failure(failure)
    }
  end
end
#migrate! ⇒ Object
Public: Create or upgrade the tests table to the current schema version. Safe to call multiple times; no-ops when already at SCHEMA_VERSION.
Returns nothing.
# File 'lib/verity/manifest.rb', line 65
def migrate!
  loop do
    version = @db.get_first_value("PRAGMA user_version").to_i
    break if version >= SCHEMA_VERSION

    if version.zero?
      @db.transaction do
        @db.execute_batch(<<~SQL)
          CREATE TABLE IF NOT EXISTS tests (
            fingerprint TEXT PRIMARY KEY,
            file        TEXT NOT NULL,
            line        INTEGER NOT NULL,
            description TEXT,
            method      TEXT NOT NULL,
            tags        TEXT,
            requires    TEXT,
            resources   TEXT,
            timeout     REAL,
            queue_index INTEGER NOT NULL DEFAULT 0,
            status      TEXT NOT NULL DEFAULT 'pending',
            worker_id   INTEGER,
            failure     TEXT,
            CHECK (status IN ('pending', 'running', 'passed', 'failed', 'errored'))
          );
          CREATE INDEX IF NOT EXISTS idx_tests_pending
            ON tests (status, fingerprint);
        SQL
        @db.execute("PRAGMA user_version = #{SCHEMA_VERSION}")
      end
    elsif version == 1
      @db.transaction do
        @db.execute("DROP INDEX IF EXISTS idx_tests_pending_duration")
        @db.execute("ALTER TABLE tests DROP COLUMN duration_p50")
        @db.execute(<<~SQL)
          CREATE INDEX IF NOT EXISTS idx_tests_pending
            ON tests (status, fingerprint);
        SQL
        @db.execute("PRAGMA user_version = 2")
      end
    elsif version == 2
      @db.transaction do
        @db.execute("ALTER TABLE tests ADD COLUMN queue_index INTEGER NOT NULL DEFAULT 0")
        @db.execute("PRAGMA user_version = #{SCHEMA_VERSION}")
      end
    else
      raise ArgumentError, "unsupported manifest schema user_version #{version}"
    end
  end
end
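The loop in migrate! applies one step per iteration until the stored version reaches SCHEMA_VERSION, which is why repeated calls are no-ops. The control flow can be sketched with a plain Hash standing in for PRAGMA user_version. The migrate_sketch method, the TARGET_VERSION constant, and the step symbols are hypothetical; no SQL is executed.

```ruby
TARGET_VERSION = 3 # mirrors SCHEMA_VERSION = 3 from the constant summary

# Hypothetical stepwise migrator. store[:user_version] plays the role of
# PRAGMA user_version, and each branch records the step it would run.
def migrate_sketch(store)
  applied = []
  loop do
    version = store[:user_version]
    break if version >= TARGET_VERSION

    case version
    when 0
      applied << :create_schema       # fresh database jumps straight to target
      store[:user_version] = TARGET_VERSION
    when 1
      applied << :drop_duration_p50   # 1 -> 2
      store[:user_version] = 2
    when 2
      applied << :add_queue_index     # 2 -> 3
      store[:user_version] = TARGET_VERSION
    else
      raise ArgumentError, "unsupported manifest schema user_version #{version}"
    end
  end
  applied
end

store = { user_version: 1 }
puts migrate_sketch(store).inspect
puts migrate_sketch(store).inspect
```

Starting from version 1, the first call walks through both upgrade steps, and the second call finds the store already at the target and applies nothing.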
#reclaim_abandoned_running! ⇒ Object
Public: Mark every row still in running status as errored with a coordinator-level message. Call this after worker processes exit when a worker may have terminated without recording a result (crash, kill), so replay and status counts stay consistent.
Returns the number of rows updated.
# File 'lib/verity/manifest.rb', line 249
def reclaim_abandoned_running!
  n = 0
  @db.transaction do
    n = @db.get_first_value("SELECT COUNT(*) FROM tests WHERE status = 'running'").to_i
    if n > 0
      err = RuntimeError.new("test abandoned: worker exited before recording a result")
      payload = encode_failure(err)
      @db.execute(<<~SQL, [payload])
        UPDATE tests
        SET status = 'errored', failure = ?, worker_id = NULL
        WHERE status = 'running'
      SQL
    end
  end
  n
end
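The effect of reclaiming can be illustrated on in-memory rows: anything a crashed worker left in running becomes errored, and the method reports how many rows it touched. The reclaim_abandoned helper and the row hashes below are hypothetical, and the failure is stored as a plain message rather than the encoded payload the real method writes.

```ruby
# Hypothetical in-memory version of the reclaim step.
def reclaim_abandoned(rows, message)
  abandoned = rows.select { |row| row[:status] == "running" }
  abandoned.each do |row|
    row[:status] = "errored"
    row[:failure] = message
    row[:worker_id] = nil
  end
  abandoned.size
end

rows = [
  { fingerprint: "a", status: "passed", worker_id: nil, failure: nil },
  { fingerprint: "b", status: "running", worker_id: 2, failure: nil }
]
count = reclaim_abandoned(rows, "test abandoned: worker exited before recording a result")
puts count
puts rows.map { |row| row[:status] }.inspect
```

After this step no row remains in running, so replay and status counts see every claimed test as finished.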
#record_error(fingerprint, error) ⇒ Object
Public: Mark a test as errored (unexpected exception) and store details.
fingerprint - String test fingerprint.
error - Exception that caused the error.
Returns nothing.
# File 'lib/verity/manifest.rb', line 235
def record_error(fingerprint, error)
  @db.execute(<<~SQL, [encode_failure(error), fingerprint])
    UPDATE tests
    SET status = 'errored', failure = ?, worker_id = NULL
    WHERE fingerprint = ?
  SQL
end
#record_failure(fingerprint, error) ⇒ Object
Public: Mark a test as failed and store the failure details.
fingerprint - String test fingerprint.
error - Exception that caused the failure.
Returns nothing.
# File 'lib/verity/manifest.rb', line 221
def record_failure(fingerprint, error)
  @db.execute(<<~SQL, [encode_failure(error), fingerprint])
    UPDATE tests
    SET status = 'failed', failure = ?, worker_id = NULL
    WHERE fingerprint = ?
  SQL
end
#record_pass(fingerprint) ⇒ Object
Public: Mark a test as passed.
fingerprint - String test fingerprint.
Returns nothing.
# File 'lib/verity/manifest.rb', line 207
def record_pass(fingerprint)
  @db.execute(<<~SQL, [fingerprint])
    UPDATE tests
    SET status = 'passed', failure = NULL, worker_id = NULL
    WHERE fingerprint = ?
  SQL
end
#replace_tests(tests) ⇒ Object
Public: Atomically clear the tests table and insert the given tests as pending rows. Called once per run before workers begin claiming.
tests - Array of Verity::Test instances in coordinator dispatch order.
Each element's index becomes queue_index (claim order).
Returns nothing.
# File 'lib/verity/manifest.rb', line 122
def replace_tests(tests)
  @db.transaction do
    @db.execute("DELETE FROM tests")
    stmt = @db.prepare(<<~SQL)
      INSERT INTO tests (
        fingerprint, file, line, description, method,
        tags, requires, resources, timeout, queue_index, status, worker_id, failure
      ) VALUES (
        ?, ?, ?, ?, ?,
        ?, ?, ?, ?, ?, 'pending', NULL, NULL
      )
    SQL
    tests.each_with_index do |t, queue_index|
      stmt.execute!(
        t.fingerprint,
        t.file,
        t.line,
        t.description,
        derive_method(t.fingerprint),
        dump_json(t.tags),
        dump_json(t.requires),
        dump_json(t.resources),
        t.timeout,
        queue_index
      )
    end
    stmt.close
  end
end
#running_resources ⇒ Object
Public: Return the resources hash for every test currently marked running. Used by Runner to build a conflict exclusion list before claiming.
Returns an Array of Hashes (string keys, string values via JSON).
# File 'lib/verity/manifest.rb', line 270
def running_resources
  @db.execute("SELECT resources FROM tests WHERE status = 'running'")
    .map { |r| parse_json_object(r[0]) }
end
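The page does not show how Runner turns these hashes into an exclusion list, so the following is only a sketch under a stated assumption: two tests conflict when they name the same resource key. The conflicting_fingerprints helper and the sample data are hypothetical, and the real conflict rule may differ.

```ruby
# Hypothetical exclusion-list builder.
# ASSUMPTION: a pending test conflicts with a running one when they share
# at least one resource key; values are ignored in this sketch.
def conflicting_fingerprints(running_resources, pending)
  busy = running_resources.flat_map(&:keys).uniq
  pending
    .select { |test| (test[:resources].keys & busy).any? }
    .map { |test| test[:fingerprint] }
end

running = [{ "db" => "primary" }, { "port" => "8080" }]
pending = [
  { fingerprint: "fp-1", resources: { "db" => "primary" } },
  { fingerprint: "fp-2", resources: {} },
  { fingerprint: "fp-3", resources: { "cache" => "local" } }
]
exclude = conflicting_fingerprints(running, pending)
puts exclude.inspect
```

A list built this way has the Array-of-fingerprint-Strings shape that claim_next accepts through its exclude keyword.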