Class: Iriq::Storage::Sqlite

Inherits:
Object
  • Object
show all
Defined in:
lib/iriq/storage/sqlite.rb

Overview

Sqlite is the incremental-write backend. Each observation translates to a handful of UPSERTs against a long-lived connection; nothing is materialized in memory beyond what reads explicitly ask for.

WAL journaling lets multiple processes observe against the same file concurrently — the writer is serialized, readers are not blocked, and the existing `iriq --corpus c.db <url>` pattern works without a flock at the application layer.

Constant Summary collapse

# Schema revision number. setup! writes it (as a string) into the meta
# table under the 'schema_version' key when that key is not present yet.
SCHEMA_VERSION =
4
# DDL for every table this backend reads or writes. setup! runs the whole
# string through execute_batch. Every statement is CREATE TABLE IF NOT
# EXISTS, so executing it against a database that already has these
# tables leaves them as they are.
SCHEMA =
<<~SQL.freeze
  CREATE TABLE IF NOT EXISTS meta (
    key   TEXT PRIMARY KEY,
    value TEXT
  );
  CREATE TABLE IF NOT EXISTS host_counts (
    host  TEXT PRIMARY KEY,
    count INTEGER NOT NULL
  );
  CREATE TABLE IF NOT EXISTS path_length_counts (
    length INTEGER PRIMARY KEY,
    count  INTEGER NOT NULL
  );
  CREATE TABLE IF NOT EXISTS raw_shape_counts (
    shape TEXT PRIMARY KEY,
    count INTEGER NOT NULL
  );
  CREATE TABLE IF NOT EXISTS fingerprint_counts (
    shape TEXT PRIMARY KEY,
    count INTEGER NOT NULL
  );
  -- Position is (host, scope, locator). For scope='path' the locator
  -- is the typed prefix; for scope='query' it's the param name.
  -- Today only 'path' is observed here (query params live on the
  -- cluster_* tables) — scope is in the schema so future commits
  -- can fold query positions in without another migration.
  CREATE TABLE IF NOT EXISTS position_stats (
    host    TEXT NOT NULL,
    scope   TEXT NOT NULL,
    locator TEXT NOT NULL,
    total   INTEGER NOT NULL DEFAULT 0,
    PRIMARY KEY (host, scope, locator)
  );
  CREATE TABLE IF NOT EXISTS position_values (
    host    TEXT NOT NULL,
    scope   TEXT NOT NULL,
    locator TEXT NOT NULL,
    value   TEXT NOT NULL,
    count   INTEGER NOT NULL,
    PRIMARY KEY (host, scope, locator, value)
  );
  CREATE TABLE IF NOT EXISTS position_types (
    host    TEXT NOT NULL,
    scope   TEXT NOT NULL,
    locator TEXT NOT NULL,
    type    TEXT NOT NULL,
    count   INTEGER NOT NULL,
    PRIMARY KEY (host, scope, locator, type)
  );
  CREATE TABLE IF NOT EXISTS clusters (
    key    TEXT PRIMARY KEY,
    host   TEXT,
    scheme TEXT,
    shape  TEXT,
    count  INTEGER NOT NULL DEFAULT 0,
    ord    INTEGER NOT NULL
  );
  CREATE TABLE IF NOT EXISTS cluster_examples (
    cluster_key TEXT NOT NULL,
    position    INTEGER NOT NULL,
    canonical   TEXT NOT NULL,
    PRIMARY KEY (cluster_key, position)
  );
  CREATE TABLE IF NOT EXISTS cluster_segments (
    cluster_key TEXT NOT NULL,
    position    INTEGER NOT NULL,
    value       TEXT NOT NULL,
    count       INTEGER NOT NULL,
    PRIMARY KEY (cluster_key, position, value)
  );
  CREATE TABLE IF NOT EXISTS cluster_params (
    cluster_key TEXT NOT NULL,
    name        TEXT NOT NULL,
    total       INTEGER NOT NULL DEFAULT 0,
    PRIMARY KEY (cluster_key, name)
  );
  CREATE TABLE IF NOT EXISTS cluster_param_values (
    cluster_key TEXT NOT NULL,
    name        TEXT NOT NULL,
    value       TEXT NOT NULL,
    count       INTEGER NOT NULL,
    PRIMARY KEY (cluster_key, name, value)
  );
  CREATE TABLE IF NOT EXISTS cluster_param_types (
    cluster_key TEXT NOT NULL,
    name        TEXT NOT NULL,
    type        TEXT NOT NULL,
    count       INTEGER NOT NULL,
    PRIMARY KEY (cluster_key, name, type)
  );
  -- Source-IRI log. The materialized views above are derived from
  -- this log via events + reducers. Corpus#reinfer drops the views
  -- and replays the log to rebuild them. id is monotonic so
  -- iteration order is observation order.
  CREATE TABLE IF NOT EXISTS observed_iris (
    id        INTEGER PRIMARY KEY AUTOINCREMENT,
    canonical TEXT NOT NULL
  );
  -- Recognizers promoted from RecognizerProposal via
  -- Corpus#activate_proposal. Re-applied to the corpus's
  -- classifier on Corpus.open so a reopen picks up its learned
  -- patterns. Keyed by prefix; activating the same prefix twice
  -- is a no-op.
  CREATE TABLE IF NOT EXISTS activated_recognizers (
    prefix      TEXT PRIMARY KEY,
    type        TEXT NOT NULL,
    specificity REAL NOT NULL DEFAULT 1.0
  );
SQL

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path:, classifier: SegmentClassifier::DEFAULT, max_values_per_position: PositionStats::DEFAULT_MAX_VALUES) ⇒ Sqlite

Returns a new instance of Sqlite.



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/iriq/storage/sqlite.rb', line 133

# Opens the database file at +path+ and applies the connection PRAGMAs.
# No tables are created here; that is setup!'s job.
#
# @param path [String] location handed to SQLite3::Database.new
# @param classifier [#classify] stored for later use when classifying
#   query-parameter values
# @param max_values_per_position [Integer] cap on distinct values stored
#   per position / per cluster parameter
def initialize(path:, classifier: SegmentClassifier::DEFAULT,
               max_values_per_position: PositionStats::DEFAULT_MAX_VALUES)
  @path                    = path
  @classifier              = classifier
  @max_values_per_position = max_values_per_position
  @db                      = SQLite3::Database.new(path)

  # Order matters. busy_timeout MUST be applied first: the later PRAGMAs
  # (journal_mode in particular) can block on the write lock when several
  # connections open at once, and without a busy timeout they fail
  # immediately with SQLITE_BUSY.
  connection_pragmas = [
    "PRAGMA busy_timeout = 30000",
    "PRAGMA journal_mode = WAL",
    "PRAGMA synchronous = NORMAL",
    "PRAGMA foreign_keys = ON",
  ]
  connection_pragmas.each { |pragma| @db.execute(pragma) }

  @in_batch = false
end

Instance Attribute Details

#max_values_per_positionObject (readonly)

Returns the value of attribute max_values_per_position.



126
127
128
# File 'lib/iriq/storage/sqlite.rb', line 126

# Read-only accessor for the per-position value cap. The value may differ
# from the constructor argument: setup! overwrites it with the number
# stored in the meta table when the database already carries one.
def max_values_per_position
  @max_values_per_position
end

#pathObject (readonly)

Returns the value of attribute path.



126
127
128
# File 'lib/iriq/storage/sqlite.rb', line 126

# Read-only accessor for the database path given to the constructor.
def path
  @path
end

Class Method Details

.open(path, classifier: SegmentClassifier::DEFAULT, max_values_per_position: PositionStats::DEFAULT_MAX_VALUES) ⇒ Object



128
129
130
131
# File 'lib/iriq/storage/sqlite.rb', line 128

# Builds a storage instance for +path+ and runs setup! on it before
# handing it back.
#
# @param path [String] database file location
# @param classifier [Object] forwarded unchanged to the constructor
# @param max_values_per_position [Integer] forwarded unchanged to the constructor
# @return [Sqlite] the newly created instance (not setup!'s return value)
def self.open(path, classifier: SegmentClassifier::DEFAULT,
                    max_values_per_position: PositionStats::DEFAULT_MAX_VALUES)
  storage = new(path: path, classifier: classifier, max_values_per_position: max_values_per_position)
  storage.setup!
  storage
end

Instance Method Details

#activated_recognizer_countObject



371
372
373
# File 'lib/iriq/storage/sqlite.rb', line 371

# Number of rows in activated_recognizers.
#
# @return [Integer] the COUNT(*) result, or 0 when the query yields nothing
def activated_recognizer_count
  row_count = @db.get_first_value("SELECT COUNT(*) FROM activated_recognizers")
  row_count || 0
end

#add_to_cluster(key, host, scheme, shape, identifier) ⇒ Object



274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
# File 'lib/iriq/storage/sqlite.rb', line 274

# Folds one observed identifier into the cluster stored under +key+ and
# returns the freshly loaded cluster.
#
# Writes, in order: the cluster row, an example (while under
# Cluster::MAX_EXAMPLES), one counter per path segment, and per-query-param
# presence / type / value counters.
#
# @param key [String] cluster primary key
# @param host [String, nil] stored on the cluster row when it is first created
# @param scheme [String, nil] stored on the cluster row when it is first created
# @param shape [String, nil] stored on the cluster row when it is first created
# @param identifier [#canonical, #path_segments, #query_params] the observation
# @return [Object] whatever load_cluster(key) returns
def add_to_cluster(key, host, scheme, shape, identifier)
  # New clusters get the next ord (max + 1) so iteration order is stable;
  # existing clusters only have their count bumped.
  @db.execute(<<~SQL, [key, host, scheme, shape])
    INSERT INTO clusters (key, host, scheme, shape, count, ord)
    VALUES (?, ?, ?, ?, 1, (SELECT COALESCE(MAX(ord), 0) + 1 FROM clusters))
    ON CONFLICT(key) DO UPDATE SET count = count + 1
  SQL

  # Examples are capped at Cluster::MAX_EXAMPLES; the current number of
  # stored examples doubles as the next example's position.
  stored_examples = @db.get_first_value(
    "SELECT COUNT(*) FROM cluster_examples WHERE cluster_key = ?", [key],
  )
  if stored_examples < Cluster::MAX_EXAMPLES
    @db.execute(<<~SQL, [key, stored_examples, identifier.canonical])
      INSERT INTO cluster_examples (cluster_key, position, canonical)
      VALUES (?, ?, ?)
    SQL
  end

  # Segment counters have no cap.
  segment_upsert = <<~SQL
    INSERT INTO cluster_segments (cluster_key, position, value, count) VALUES (?, ?, ?, 1)
    ON CONFLICT(cluster_key, position, value) DO UPDATE SET count = count + 1
  SQL
  identifier.path_segments.each_with_index do |segment, index|
    @db.execute(segment_upsert, [key, index, segment])
  end

  presence_upsert = <<~SQL
    INSERT INTO cluster_params (cluster_key, name, total) VALUES (?, ?, 1)
    ON CONFLICT(cluster_key, name) DO UPDATE SET total = total + 1
  SQL
  type_upsert = <<~SQL
    INSERT INTO cluster_param_types (cluster_key, name, type, count) VALUES (?, ?, ?, 1)
    ON CONFLICT(cluster_key, name, type) DO UPDATE SET count = count + 1
  SQL
  value_bump = <<~SQL
    UPDATE cluster_param_values SET count = count + 1
    WHERE cluster_key = ? AND name = ? AND value = ?
  SQL

  # Query-param statistics: presence and type are always counted; distinct
  # values are only added while the per-param cardinality is below
  # @max_values_per_position.
  params = identifier.query_params || {}
  params.each do |name, raw_value|
    text = raw_value.to_s
    kind = @classifier.classify(text).to_s

    @db.execute(presence_upsert, [key, name])
    @db.execute(type_upsert, [key, name, kind])

    # Try to bump an existing value row first; @db.changes reports whether
    # the UPDATE touched anything.
    @db.execute(value_bump, [key, name, text])
    next unless @db.changes.zero?

    distinct_values = @db.get_first_value(
      "SELECT COUNT(*) FROM cluster_param_values WHERE cluster_key = ? AND name = ?",
      [key, name],
    )
    next unless distinct_values < @max_values_per_position

    @db.execute(
      "INSERT INTO cluster_param_values (cluster_key, name, value, count) VALUES (?, ?, ?, 1)",
      [key, name, text],
    )
  end

  load_cluster(key)
end

#batchObject

Wrap many observations in a single transaction. Cuts SQLite write overhead from O(observations) fsyncs to O(1).



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/iriq/storage/sqlite.rb', line 180

# Runs the block inside a single database transaction so many observations
# share one commit.
#
# Nested calls (when a batch is already open) simply yield; the outermost
# batch owns the transaction. While the block runs, @in_batch is true so
# #transaction skips opening its own transaction.
#
# @yield the work to perform inside the transaction
# @return [Object] the block's value when nested, otherwise the result of
#   @db.commit
# @raise [StandardError] anything raised by the block, by opening the
#   transaction or by the commit; a rollback is attempted first when the
#   block or the commit fails
def batch
  return yield if @in_batch

  @in_batch = true
  begin
    # Opening the transaction sits inside the ensure-protected region: if
    # BEGIN itself fails (e.g. the database is locked), the flag must still
    # be reset, otherwise every later batch/transaction call would skip
    # opening a transaction for the lifetime of this object.
    @db.transaction
    begin
      yield
      @db.commit
    rescue StandardError
      # Best effort: the rollback can itself fail (for instance when the
      # transaction is already gone); the original error is what matters.
      begin
        @db.rollback
      rescue StandardError
        nil
      end
      raise
    end
  ensure
    @in_batch = false
  end
end

#clear_materialized_viewsObject

Drop every materialized view without touching the source-IRI log. Corpus#reinfer calls this before replaying the log.



377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
# File 'lib/iriq/storage/sqlite.rb', line 377

# Empties every derived table in one execute_batch call. observed_iris,
# activated_recognizers and meta are not in the list and are left alone.
#
# @return [Object] whatever @db.execute_batch returns
def clear_materialized_views
  # Deletion order is kept exactly as listed.
  derived_tables = %w[
    host_counts
    path_length_counts
    raw_shape_counts
    fingerprint_counts
    position_stats
    position_values
    position_types
    clusters
    cluster_examples
    cluster_segments
    cluster_params
    cluster_param_values
    cluster_param_types
  ]
  statements = derived_tables.map { |table| "DELETE FROM #{table};\n" }
  @db.execute_batch(statements.join)
end

#closeObject



204
205
206
207
208
209
# File 'lib/iriq/storage/sqlite.rb', line 204

# Checkpoints the WAL and closes the connection.
#
# @return [Object] whatever @db.close returns
def close
  # Checkpoint + truncate so the .db-wal sidecar doesn't keep growing over
  # long-lived `iriq --corpus c.db` sessions. A failed checkpoint is
  # deliberately ignored; the connection is closed regardless.
  begin
    @db.execute("PRAGMA wal_checkpoint(TRUNCATE)")
  rescue StandardError
    nil
  end
  @db.close
end

#cluster_for(key) ⇒ Object



468
469
470
# File 'lib/iriq/storage/sqlite.rb', line 468

# Public lookup of a single cluster by key. Thin wrapper around
# load_cluster, which is defined elsewhere in this file.
# NOTE(review): behaviour for an unknown key depends on load_cluster —
# confirm there.
#
# @param key [String] cluster key
# @return [Object] whatever load_cluster(key) returns
def cluster_for(key)
  load_cluster(key)
end

#cluster_sizeObject



464
465
466
# File 'lib/iriq/storage/sqlite.rb', line 464

# Number of rows in the clusters table.
#
# @return [Integer] the COUNT(*) result
def cluster_size
  count_sql = "SELECT COUNT(*) FROM clusters"
  @db.get_first_value(count_sql)
end

#clustersObject



456
457
458
459
460
461
462
# File 'lib/iriq/storage/sqlite.rb', line 456

# Loads every cluster, ordered by the ord column (assigned in increasing
# order when a cluster row is first inserted).
#
# @return [Array] one load_cluster result per row of the clusters table
def clusters
  [].tap do |loaded|
    @db.execute("SELECT key FROM clusters ORDER BY ord") do |row|
      loaded << load_cluster(row.first)
    end
  end
end

#each_activated_recognizerObject



365
366
367
368
369
# File 'lib/iriq/storage/sqlite.rb', line 365

# Yields every stored recognizer, ordered by prefix, as a Hash with the
# string keys "prefix", "type" and "specificity".
#
# @yieldparam recognizer [Hash{String => Object}]
# @return [Object] whatever @db.execute returns
def each_activated_recognizer
  columns = %w[prefix type specificity]
  select_sql = "SELECT prefix, type, specificity FROM activated_recognizers ORDER BY prefix"
  @db.execute(select_sql) do |row|
    yield columns.zip(row).to_h
  end
end

#each_observed_iriObject



346
347
348
349
350
# File 'lib/iriq/storage/sqlite.rb', line 346

# Yields each logged canonical IRI in ascending id order.
#
# @yieldparam canonical [String] one entry of observed_iris
# @return [Object] whatever @db.execute returns
def each_observed_iri
  log_sql = "SELECT canonical FROM observed_iris ORDER BY id"
  @db.execute(log_sql) { |row| yield row.first }
end

#each_position_statsObject



445
446
447
448
449
450
451
452
453
454
# File 'lib/iriq/storage/sqlite.rb', line 445

# Yields every stored position together with its statistics.
#
# The position keys are collected first and only then iterated, so the
# per-position queries issued by #position_stats run after the listing
# query has finished.
#
# @yieldparam position [Position] rebuilt from the host, scope, locator columns
# @yieldparam stats [Object] result of position_stats(position)
# @return [Array] the collected key rows
def each_position_stats
  key_rows = []
  @db.execute("SELECT DISTINCT host, scope, locator FROM position_stats ORDER BY ROWID") do |row|
    key_rows.push(row)
  end

  key_rows.each do |row|
    host, scope, locator = row
    position = Position.new(host: host, scope: scope.to_sym, locator: locator)
    yield position, position_stats(position)
  end
end

#fingerprint_countsObject



411
412
413
# File 'lib/iriq/storage/sqlite.rb', line 411

# Reads the fingerprint_counts table, keyed by its shape column, via
# rows_to_count_hash (defined elsewhere in this file).
# NOTE(review): presumably a shape => count Hash — confirm against
# rows_to_count_hash.
def fingerprint_counts
  rows_to_count_hash("fingerprint_counts", "shape")
end

#flushObject

Saving is automatic — incremental UPSERTs hit disk on commit. flush makes that explicit; close releases the connection.



198
# File 'lib/iriq/storage/sqlite.rb', line 198

# Intentionally empty: this method issues no database calls and always
# returns nil.
def flush; end

#host_countsObject

— Reads ————————————————————



397
398
399
# File 'lib/iriq/storage/sqlite.rb', line 397

# Reads the host_counts table, keyed by its host column, via
# rows_to_count_hash (defined elsewhere in this file).
# NOTE(review): presumably a host => count Hash — confirm against
# rows_to_count_hash.
def host_counts
  rows_to_count_hash("host_counts", "host")
end

#increment_fingerprint(shape) ⇒ Object



233
234
235
# File 'lib/iriq/storage/sqlite.rb', line 233

# Records one occurrence of +shape+ in fingerprint_counts by delegating to
# upsert_shape (defined elsewhere in this file).
# NOTE(review): presumably an insert-or-increment like the other
# increment_* methods — confirm against upsert_shape.
#
# @param shape [String] fingerprint shape
def increment_fingerprint(shape)
  upsert_shape("fingerprint_counts", shape)
end

#increment_host(host) ⇒ Object

— Increments ——————————————————-



213
214
215
216
217
218
219
220
# File 'lib/iriq/storage/sqlite.rb', line 213

# Adds one to the counter for +host+, creating the row at 1 if it is new.
# Does nothing when +host+ is nil or false.
#
# @param host [String, nil]
# @return [Object, nil] result of @db.execute, or nil when skipped
def increment_host(host)
  return unless host

  host_upsert = <<~SQL
    INSERT INTO host_counts (host, count) VALUES (?, 1)
    ON CONFLICT(host) DO UPDATE SET count = count + 1
  SQL
  @db.execute(host_upsert, host)
end

#increment_path_length(length) ⇒ Object



222
223
224
225
226
227
# File 'lib/iriq/storage/sqlite.rb', line 222

# Adds one to the counter for the given path length, creating the row at 1
# if it is new.
#
# @param length [Integer] bound to the INTEGER primary key column
# @return [Object] result of @db.execute
def increment_path_length(length)
  length_upsert = <<~SQL
    INSERT INTO path_length_counts (length, count) VALUES (?, 1)
    ON CONFLICT(length) DO UPDATE SET count = count + 1
  SQL
  @db.execute(length_upsert, length)
end

#increment_raw_shape(shape) ⇒ Object



229
230
231
# File 'lib/iriq/storage/sqlite.rb', line 229

# Records one occurrence of +shape+ in raw_shape_counts by delegating to
# upsert_shape (defined elsewhere in this file).
# NOTE(review): presumably an insert-or-increment like the other
# increment_* methods — confirm against upsert_shape.
#
# @param shape [String] raw shape
def increment_raw_shape(shape)
  upsert_shape("raw_shape_counts", shape)
end

#observe_position(position, value, type) ⇒ Object



237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/iriq/storage/sqlite.rb', line 237

# Records one observation of +value+ (classified as +type+) at +position+.
#
# Bumps the position's total and its per-type counter unconditionally.
# The per-value counter is bumped if the value is already stored, and a
# new value row is only added while the position holds fewer than
# @max_values_per_position distinct values.
#
# @param position [#host, #scope, #locator] a nil host is stored as ""
# @param value [String] observed value
# @param type [#to_s] classification, stored as a string
# @return [Object, nil] result of the final INSERT when one happens, else nil
def observe_position(position, value, type)
  position_key = [position.host || "", position.scope.to_s, position.locator]

  @db.execute(<<~SQL, position_key.dup)
    INSERT INTO position_stats (host, scope, locator, total) VALUES (?, ?, ?, 1)
    ON CONFLICT(host, scope, locator) DO UPDATE SET total = total + 1
  SQL

  # Type counters have no cap, so a plain upsert is enough.
  @db.execute(<<~SQL, position_key + [type.to_s])
    INSERT INTO position_types (host, scope, locator, type, count) VALUES (?, ?, ?, ?, 1)
    ON CONFLICT(host, scope, locator, type) DO UPDATE SET count = count + 1
  SQL

  # Value counters are capped, which is why this is UPDATE-then-maybe-INSERT
  # rather than ON CONFLICT: a brand-new value may only be inserted while
  # the cardinality is below the cap.
  @db.execute(<<~SQL, position_key + [value])
    UPDATE position_values SET count = count + 1
    WHERE host = ? AND scope = ? AND locator = ? AND value = ?
  SQL
  return unless @db.changes.zero?

  distinct_values = @db.get_first_value(
    "SELECT COUNT(*) FROM position_values WHERE host = ? AND scope = ? AND locator = ?",
    position_key.dup,
  )
  return unless distinct_values < @max_values_per_position

  @db.execute(
    "INSERT INTO position_values (host, scope, locator, value, count) VALUES (?, ?, ?, ?, 1)",
    position_key + [value],
  )
end

#observed_iri_countObject



352
353
354
# File 'lib/iriq/storage/sqlite.rb', line 352

# Number of rows in the observed_iris log.
#
# @return [Integer] the COUNT(*) result, or 0 when the query yields nothing
def observed_iri_count
  logged = @db.get_first_value("SELECT COUNT(*) FROM observed_iris")
  logged || 0
end

#path_length_countsObject



401
402
403
404
405
# File 'lib/iriq/storage/sqlite.rb', line 401

# Histogram of observed path lengths.
#
# @return [Hash{Integer => Integer}] length => count; lookups of lengths
#   that are not stored return 0
def path_length_counts
  Hash.new(0).tap do |histogram|
    @db.execute("SELECT length, count FROM path_length_counts") do |row|
      histogram[row.first] = row[1]
    end
  end
end

#position_stats(position) ⇒ Object



415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
# File 'lib/iriq/storage/sqlite.rb', line 415

# Rebuilds a PositionStats object for +position+ from the three position_*
# tables.
#
# The object is created through its normal constructor and its @total,
# @value_counts and @type_counts instance variables are then set directly.
#
# @param position [#host, #scope, #locator] a nil host is looked up as ""
# @return [PositionStats, nil] nil when the position has no position_stats row
def position_stats(position)
  where   = "host = ? AND scope = ? AND locator = ?"
  host    = position.host || ""
  scope   = position.scope.to_s
  locator = position.locator

  observed_total = @db.get_first_value(
    "SELECT total FROM position_stats WHERE #{where}", [host, scope, locator]
  )
  return if observed_total.nil?

  stats = PositionStats.new(max_values: @max_values_per_position)
  stats.instance_variable_set(:@total, observed_total)

  # Both hashes default to 0 for keys that are not stored.
  by_value = Hash.new(0)
  @db.execute("SELECT value, count FROM position_values WHERE #{where}", [host, scope, locator]) do |row|
    by_value[row.first] = row[1]
  end
  stats.instance_variable_set(:@value_counts, by_value)

  # Types are stored as text and surfaced as symbols.
  by_type = Hash.new(0)
  @db.execute("SELECT type, count FROM position_types WHERE #{where}", [host, scope, locator]) do |row|
    by_type[row.first.to_sym] = row[1]
  end
  stats.instance_variable_set(:@type_counts, by_type)

  stats
end

#raw_shape_countsObject



407
408
409
# File 'lib/iriq/storage/sqlite.rb', line 407

# Reads the raw_shape_counts table, keyed by its shape column, via
# rows_to_count_hash (defined elsewhere in this file).
# NOTE(review): presumably a shape => count Hash — confirm against
# rows_to_count_hash.
def raw_shape_counts
  rows_to_count_hash("raw_shape_counts", "shape")
end

#record_activated_recognizer(dump) ⇒ Object

— Activated recognizers ——————————————–



358
359
360
361
362
363
# File 'lib/iriq/storage/sqlite.rb', line 358

# Stores an activated recognizer, keyed by prefix. Recording a prefix that
# already exists overwrites its type and specificity.
#
# @param dump [Hash{String => Object}] reads "prefix" and "type";
#   "specificity" falls back to 1.0 when the key is absent
# @return [Object] result of @db.execute
def record_activated_recognizer(dump)
  prefix      = dump["prefix"]
  type        = dump["type"]
  specificity = dump.fetch("specificity", 1.0)

  recognizer_upsert = <<~SQL
    INSERT INTO activated_recognizers (prefix, type, specificity) VALUES (?, ?, ?)
    ON CONFLICT(prefix) DO UPDATE SET type = excluded.type, specificity = excluded.specificity
  SQL
  @db.execute(recognizer_upsert, [prefix, type, specificity])
end

#record_observation(canonical) ⇒ Object

Append a canonical IRI to the source-IRI log. Inside the same transaction as the event reducers, so the log and views stay consistent.



342
343
344
# File 'lib/iriq/storage/sqlite.rb', line 342

# Appends one canonical IRI to the observed_iris log.
#
# @param canonical [String] canonical form of the observed IRI
# @return [Object] result of @db.execute
def record_observation(canonical)
  append_sql = "INSERT INTO observed_iris (canonical) VALUES (?)"
  @db.execute(append_sql, [canonical])
end

#save(_path = nil) ⇒ Object



200
201
202
# File 'lib/iriq/storage/sqlite.rb', line 200

# No-op kept so this backend exposes the same method as the JSON backend.
# The optional path argument is accepted and ignored; always returns nil.
#
# @param _path [String, nil] unused
def save(_path = nil)
  # Already persisted. Provided for parity with the JSON backend.
end

#setup!Object



150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/iriq/storage/sqlite.rb', line 150

# Creates any missing tables and reconciles the meta table.
#
# On a database without a 'schema_version' row, stamps SCHEMA_VERSION and
# the configured max_values_per_position into meta. On a database that
# already has one, adopts the stored max_values_per_position (falling back
# to the configured value when that row is missing) and writes nothing.
#
# @return [self]
def setup!
  @db.execute_batch(SCHEMA)

  stamped_version = @db.get_first_value("SELECT value FROM meta WHERE key = 'schema_version'")
  unless stamped_version.nil?
    # The stored cap wins over the constructor argument.
    stored_cap = @db.get_first_value("SELECT value FROM meta WHERE key = 'max_values_per_position'")
    @max_values_per_position = (stored_cap || @max_values_per_position).to_i
    return self
  end

  @db.execute("INSERT INTO meta (key, value) VALUES ('schema_version', ?)", SCHEMA_VERSION.to_s)
  @db.execute("INSERT INTO meta (key, value) VALUES ('max_values_per_position', ?)",
              @max_values_per_position.to_s)
  self
end

#transactionObject



165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/iriq/storage/sqlite.rb', line 165

# Runs the block inside its own database transaction, yielding this
# storage object.
#
# While an outer #batch is open (@in_batch is true) this is a pure
# pass-through: the outer batch owns the one transaction, so nothing is
# begun, committed or rolled back here. In particular an error raised by
# the block must not roll back the batch's transaction from the inside —
# the batch decides that if the error reaches it. Rolling back here would
# leave the rest of the batch running outside any transaction and make the
# batch's final commit fail.
#
# @yieldparam storage [Sqlite] self
# @return [Object] the block's value inside a batch, otherwise the result
#   of @db.commit
# @raise [StandardError] anything raised by the block, by opening the
#   transaction or by the commit; outside a batch a rollback is attempted
#   before re-raising
def transaction
  return yield(self) if @in_batch

  begin
    @db.transaction
    yield self
    @db.commit
  rescue StandardError
    # Best effort: the rollback can itself fail (for instance when no
    # transaction was opened); the original error is what matters.
    begin
      @db.rollback
    rescue StandardError
      nil
    end
    raise
  end
end