Class: Verity::Manifest

Inherits:
Object
  • Object
show all
Defined in:
lib/verity/manifest.rb

Overview

Public: SQLite-backed manifest that coordinates test distribution across workers. Each row tracks a single test’s fingerprint, metadata, and execution status. Workers atomically claim pending rows to run.

Defined Under Namespace

Classes: ClaimedRow

Constant Summary collapse

SCHEMA_VERSION =
3

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path, busy_timeout_ms: 5000) ⇒ Manifest

Returns a new instance of Manifest.



45
46
47
48
49
50
51
# File 'lib/verity/manifest.rb', line 45

def initialize(path, busy_timeout_ms: 5000)
  @memory = (path == ":memory:")
  @busy_timeout_ms = busy_timeout_ms
  FileUtils.mkdir_p(File.dirname(File.expand_path(path))) unless @memory
  @db = SQLite3::Database.new(path)
  configure_connection!
end

Instance Attribute Details

#dbObject (readonly)

Internal: Raw SQLite3::Database handle for tests and introspection.



59
60
61
# File 'lib/verity/manifest.rb', line 59

def db
  @db
end

Class Method Details

.open(path) ⇒ Object

Public: Open (or create) a manifest database at the given path.

path - String file path, or “:memory:” for an in-process database.

Returns a new Manifest instance.



41
42
43
# File 'lib/verity/manifest.rb', line 41

def self.open(path, **)
  new(path, **)
end

Instance Method Details

#claim_next(worker_id, exclude: []) ⇒ Object

Public: Atomically claim the next pending test for a worker. Marks the row as “running” and returns its data.

worker_id - Integer identifying the claiming worker. exclude - Array of fingerprint Strings to skip (default []).

Returns a ClaimedRow, or nil when no claimable pending tests remain.



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/verity/manifest.rb', line 160

def claim_next(worker_id, exclude: [])
  if exclude.empty?
    rows = @db.execute2(<<~SQL, worker_id)
      UPDATE tests
      SET status = 'running', worker_id = ?
      WHERE fingerprint = (
        SELECT fingerprint FROM tests
        WHERE status = 'pending'
        ORDER BY queue_index ASC, fingerprint ASC
        LIMIT 1
      )
      RETURNING
        fingerprint, file, line, description, method,
        tags, requires, resources, timeout,
        status, worker_id, failure, queue_index
    SQL
  else
    placeholders = (["?"] * exclude.size).join(", ")
    rows = @db.execute2(<<~SQL, [worker_id] + exclude)
      UPDATE tests
      SET status = 'running', worker_id = ?
      WHERE fingerprint = (
        SELECT fingerprint FROM tests
        WHERE status = 'pending'
          AND fingerprint NOT IN (#{placeholders})
        ORDER BY queue_index ASC, fingerprint ASC
        LIMIT 1
      )
      RETURNING
        fingerprint, file, line, description, method,
        tags, requires, resources, timeout,
        status, worker_id, failure, queue_index
    SQL
  end
  return nil if rows.size < 2

  headers = rows[0]
  values = rows[1]
  hash = headers.zip(values).to_h
  hydrate_row(hash)
end

#closeObject

Public: Close the underlying SQLite connection.

Returns nothing.



56
# File 'lib/verity/manifest.rb', line 56

def close = @db.close

#count_by_statusObject

Public: Aggregate test counts grouped by status. Used by ParallelSummaryReporter after all workers finish.

Returns a Hash with String status keys and Integer counts.



286
287
288
# File 'lib/verity/manifest.rb', line 286

def count_by_status
  @db.execute("SELECT status, COUNT(*) FROM tests GROUP BY status").to_h.transform_values(&:to_i)
end

#each_parallel_replay_resultObject

Public: After parallel workers finish, yield Verity::Runner::Result once per finished row (passed, failed, or errored) in dispatch (queue_index) order so the parent can replay reporter output (dots, documentation lines, etc.). Workers use NullReporter during execution; child processes do not invoke the user’s reporter.

Yields Verity::Runner::Result.

Returns Enumerator when no block is given.



320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
# File 'lib/verity/manifest.rb', line 320

def each_parallel_replay_result
  return enum_for(:each_parallel_replay_result) unless block_given?

  data = @db.execute2(<<~SQL)
    SELECT fingerprint, file, line, description, method,
           tags, requires, resources, timeout,
           status, failure, queue_index
    FROM tests
    WHERE status IN ('passed', 'failed', 'errored')
    ORDER BY queue_index ASC, fingerprint ASC
  SQL
  headers = data[0]
  Array(data[1..]).each do |vals|
    hash = headers.zip(vals).to_h
    row = hydrate_row(hash)
    result_status =
      case row.status
      when :passed then :pass
      when :failed then :fail
      when :errored then :error
      else row.status
      end
    err = replay_exception(result_status, row.failure)
    resources = normalize_resource_keys(row.resources)
    test = Verity::Test.new(
      fingerprint: row.fingerprint,
      description: row.description.to_s,
      tags: Array(row.tags).map(&:to_sym),
      timeout: row.timeout,
      requires: Array(row.requires).map(&:to_sym),
      resources: resources,
      file: row.file,
      line: row.line,
      fn: -> { raise "parallel replay stub" },
      group_path: [].freeze,
      inherited_group_tags: [].freeze,
      group_scopes: [].freeze
    )
    yield Verity::Runner::Result.new(test: test, status: result_status, error: err)
  end
end

#example_countObject

Public: Total number of test rows in the manifest.

Returns an Integer.



278
279
280
# File 'lib/verity/manifest.rb', line 278

def example_count
  @db.get_first_value("SELECT COUNT(*) FROM tests").to_i
end

#failures_for_reportObject

Public: Fetch details for all failed and errored tests, ordered by fingerprint, for the final summary report.

Returns an Array of Hashes with :fingerprint, :description, :status, and :failure keys.



295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/verity/manifest.rb', line 295

def failures_for_report
  @db.execute(<<~SQL).map do |fingerprint, description, status, failure|
      SELECT fingerprint, description, status, failure
      FROM tests
      WHERE status IN ('failed', 'errored')
      ORDER BY fingerprint ASC
    SQL
    {
      fingerprint: fingerprint,
      description: description,
      status: status.to_sym,
      failure: parse_failure(failure)
    }
  end
end

#migrate!Object

Public: Create or upgrade the tests table to the current schema version. Safe to call multiple times; no-ops when already at SCHEMA_VERSION.

Returns nothing.



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/verity/manifest.rb', line 65

def migrate!
  loop do
    version = @db.get_first_value("PRAGMA user_version").to_i
    break if version >= SCHEMA_VERSION

    if version.zero?
      @db.transaction do
        @db.execute_batch(<<~SQL)
          CREATE TABLE IF NOT EXISTS tests (
            fingerprint     TEXT PRIMARY KEY,
            file            TEXT NOT NULL,
            line            INTEGER NOT NULL,
            description     TEXT,
            method          TEXT NOT NULL,
            tags            TEXT,
            requires        TEXT,
            resources       TEXT,
            timeout         REAL,
            queue_index     INTEGER NOT NULL DEFAULT 0,
            status          TEXT NOT NULL DEFAULT 'pending',
            worker_id       INTEGER,
            failure         TEXT,
            CHECK (status IN ('pending', 'running', 'passed', 'failed', 'errored'))
          );
          CREATE INDEX IF NOT EXISTS idx_tests_pending
            ON tests (status, fingerprint);
        SQL
        @db.execute("PRAGMA user_version = #{SCHEMA_VERSION}")
      end
    elsif version == 1
      @db.transaction do
        @db.execute("DROP INDEX IF EXISTS idx_tests_pending_duration")
        @db.execute("ALTER TABLE tests DROP COLUMN duration_p50")
        @db.execute(<<~SQL)
          CREATE INDEX IF NOT EXISTS idx_tests_pending
            ON tests (status, fingerprint);
        SQL
        @db.execute("PRAGMA user_version = 2")
      end
    elsif version == 2
      @db.transaction do
        @db.execute("ALTER TABLE tests ADD COLUMN queue_index INTEGER NOT NULL DEFAULT 0")
        @db.execute("PRAGMA user_version = #{SCHEMA_VERSION}")
      end
    else
      raise ArgumentError, "unsupported manifest schema user_version #{version}"
    end
  end
end

#reclaim_abandoned_running!Object

Public: Mark every row still in running status as errored with a coordinator-level message. Call this after worker processes exit when a worker may have terminated without recording a result (crash, kill), so replay and status counts stay consistent.

Returns the number of rows updated.



249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/verity/manifest.rb', line 249

def reclaim_abandoned_running!
  n = 0
  @db.transaction do
    n = @db.get_first_value("SELECT COUNT(*) FROM tests WHERE status = 'running'").to_i
    if n > 0
      err = RuntimeError.new("test abandoned: worker exited before recording a result")
      payload = encode_failure(err)
      @db.execute(<<~SQL, [payload])
        UPDATE tests
        SET status = 'errored', failure = ?, worker_id = NULL
        WHERE status = 'running'
      SQL
    end
  end
  n
end

#record_error(fingerprint, error) ⇒ Object

Public: Mark a test as errored (unexpected exception) and store details.

fingerprint - String test fingerprint. error - Exception that caused the error.

Returns nothing.



235
236
237
238
239
240
241
# File 'lib/verity/manifest.rb', line 235

def record_error(fingerprint, error)
  @db.execute(<<~SQL, [encode_failure(error), fingerprint])
    UPDATE tests
    SET status = 'errored', failure = ?, worker_id = NULL
    WHERE fingerprint = ?
  SQL
end

#record_failure(fingerprint, error) ⇒ Object

Public: Mark a test as failed and store the failure details.

fingerprint - String test fingerprint. error - Exception that caused the failure.

Returns nothing.



221
222
223
224
225
226
227
# File 'lib/verity/manifest.rb', line 221

def record_failure(fingerprint, error)
  @db.execute(<<~SQL, [encode_failure(error), fingerprint])
    UPDATE tests
    SET status = 'failed', failure = ?, worker_id = NULL
    WHERE fingerprint = ?
  SQL
end

#record_pass(fingerprint) ⇒ Object

Public: Mark a test as passed.

fingerprint - String test fingerprint.

Returns nothing.



207
208
209
210
211
212
213
# File 'lib/verity/manifest.rb', line 207

def record_pass(fingerprint)
  @db.execute(<<~SQL, [fingerprint])
    UPDATE tests
    SET status = 'passed', failure = NULL, worker_id = NULL
    WHERE fingerprint = ?
  SQL
end

#replace_tests(tests) ⇒ Object

Public: Atomically clear the tests table and insert the given tests as pending rows. Called once per run before workers begin claiming.

tests - Array of Verity::Test instances in coordinator dispatch order.

Each element's index becomes queue_index (claim order).

Returns nothing.



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/verity/manifest.rb', line 122

def replace_tests(tests)
  @db.transaction do
    @db.execute("DELETE FROM tests")
    stmt = @db.prepare(<<~SQL)
      INSERT INTO tests (
        fingerprint, file, line, description, method,
        tags, requires, resources, timeout, queue_index,
        status, worker_id, failure
      ) VALUES (
        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
        'pending', NULL, NULL
      )
    SQL
    tests.each_with_index do |t, queue_index|
      stmt.execute!(
        t.fingerprint,
        t.file,
        t.line,
        t.description,
        derive_method(t.fingerprint),
        dump_json(t.tags),
        dump_json(t.requires),
        dump_json(t.resources),
        t.timeout,
        queue_index
      )
    end
    stmt.close
  end
end

#running_resourcesObject

Public: Return the resources hash for every test currently marked running. Used by Runner to build a conflict exclusion list before claiming.

Returns an Array of Hashes (string keys, string values via JSON).



270
271
272
273
# File 'lib/verity/manifest.rb', line 270

def running_resources
  @db.execute("SELECT resources FROM tests WHERE status = 'running'")
     .map { |r| parse_json_object(r[0]) }
end