Class: Iriq::Storage::Sqlite
- Inherits:
-
Object
- Object
- Iriq::Storage::Sqlite
- Defined in:
- lib/iriq/storage/sqlite.rb
Overview
Sqlite is the incremental-write backend. Each observation translates to a handful of UPSERTs against a long-lived connection; nothing is materialized in memory beyond what reads explicitly ask for.
WAL journaling lets multiple processes observe against the same file concurrently — the writer is serialized, readers are not blocked, and the existing `iriq --corpus c.db <url>` pattern works without a flock at the application layer.
Constant Summary collapse
# Schema version stamp; #setup! stores it in the meta table under
# the 'schema_version' key.
SCHEMA_VERSION = 4

# DDL for every table used by this backend. Each statement and each SQL
# comment sits on its own line: a `--` comment runs to the end of the line,
# so collapsing the text onto one line would comment out all following DDL.
SCHEMA = <<~SQL.freeze
  CREATE TABLE IF NOT EXISTS meta (
    key TEXT PRIMARY KEY,
    value TEXT
  );

  CREATE TABLE IF NOT EXISTS host_counts (
    host TEXT PRIMARY KEY,
    count INTEGER NOT NULL
  );

  CREATE TABLE IF NOT EXISTS path_length_counts (
    length INTEGER PRIMARY KEY,
    count INTEGER NOT NULL
  );

  CREATE TABLE IF NOT EXISTS raw_shape_counts (
    shape TEXT PRIMARY KEY,
    count INTEGER NOT NULL
  );

  CREATE TABLE IF NOT EXISTS fingerprint_counts (
    shape TEXT PRIMARY KEY,
    count INTEGER NOT NULL
  );

  -- Position is (host, scope, locator). For scope='path' the locator
  -- is the typed prefix; for scope='query' it's the param name.
  -- Today only 'path' is observed here (query params live on the
  -- cluster_* tables) — scope is in the schema so future commits
  -- can fold query positions in without another migration.
  CREATE TABLE IF NOT EXISTS position_stats (
    host TEXT NOT NULL,
    scope TEXT NOT NULL,
    locator TEXT NOT NULL,
    total INTEGER NOT NULL DEFAULT 0,
    PRIMARY KEY (host, scope, locator)
  );

  CREATE TABLE IF NOT EXISTS position_values (
    host TEXT NOT NULL,
    scope TEXT NOT NULL,
    locator TEXT NOT NULL,
    value TEXT NOT NULL,
    count INTEGER NOT NULL,
    PRIMARY KEY (host, scope, locator, value)
  );

  CREATE TABLE IF NOT EXISTS position_types (
    host TEXT NOT NULL,
    scope TEXT NOT NULL,
    locator TEXT NOT NULL,
    type TEXT NOT NULL,
    count INTEGER NOT NULL,
    PRIMARY KEY (host, scope, locator, type)
  );

  CREATE TABLE IF NOT EXISTS clusters (
    key TEXT PRIMARY KEY,
    host TEXT,
    scheme TEXT,
    shape TEXT,
    count INTEGER NOT NULL DEFAULT 0,
    ord INTEGER NOT NULL
  );

  CREATE TABLE IF NOT EXISTS cluster_examples (
    cluster_key TEXT NOT NULL,
    position INTEGER NOT NULL,
    canonical TEXT NOT NULL,
    PRIMARY KEY (cluster_key, position)
  );

  CREATE TABLE IF NOT EXISTS cluster_segments (
    cluster_key TEXT NOT NULL,
    position INTEGER NOT NULL,
    value TEXT NOT NULL,
    count INTEGER NOT NULL,
    PRIMARY KEY (cluster_key, position, value)
  );

  CREATE TABLE IF NOT EXISTS cluster_params (
    cluster_key TEXT NOT NULL,
    name TEXT NOT NULL,
    total INTEGER NOT NULL DEFAULT 0,
    PRIMARY KEY (cluster_key, name)
  );

  CREATE TABLE IF NOT EXISTS cluster_param_values (
    cluster_key TEXT NOT NULL,
    name TEXT NOT NULL,
    value TEXT NOT NULL,
    count INTEGER NOT NULL,
    PRIMARY KEY (cluster_key, name, value)
  );

  CREATE TABLE IF NOT EXISTS cluster_param_types (
    cluster_key TEXT NOT NULL,
    name TEXT NOT NULL,
    type TEXT NOT NULL,
    count INTEGER NOT NULL,
    PRIMARY KEY (cluster_key, name, type)
  );

  -- Source-IRI log. The materialized views above are derived from
  -- this log via events + reducers. Corpus#reinfer drops the views
  -- and replays the log to rebuild them. id is monotonic so
  -- iteration order is observation order.
  CREATE TABLE IF NOT EXISTS observed_iris (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    canonical TEXT NOT NULL
  );

  -- Recognizers promoted from RecognizerProposal via
  -- Corpus#activate_proposal. Re-applied to the corpus's
  -- classifier on Corpus.open so a reopen picks up its learned
  -- patterns. Keyed by prefix; activating the same prefix twice
  -- is a no-op.
  CREATE TABLE IF NOT EXISTS activated_recognizers (
    prefix TEXT PRIMARY KEY,
    type TEXT NOT NULL,
    specificity REAL NOT NULL DEFAULT 1.0
  );
SQL
Instance Attribute Summary collapse
-
#max_values_per_position ⇒ Object
readonly
Returns the value of attribute max_values_per_position.
-
#path ⇒ Object
readonly
Returns the value of attribute path.
Class Method Summary collapse
Instance Method Summary collapse
- #activated_recognizer_count ⇒ Object
- #add_to_cluster(key, host, scheme, shape, identifier) ⇒ Object
-
#batch ⇒ Object
Wrap many observations in a single transaction.
-
#clear_materialized_views ⇒ Object
Drop every materialized view without touching the source-IRI log.
- #close ⇒ Object
- #cluster_for(key) ⇒ Object
- #cluster_size ⇒ Object
- #clusters ⇒ Object
- #each_activated_recognizer ⇒ Object
- #each_observed_iri ⇒ Object
- #each_position_stats ⇒ Object
- #fingerprint_counts ⇒ Object
-
#flush ⇒ Object
Saving is automatic — incremental UPSERTs hit disk on commit.
-
#host_counts ⇒ Object
— Reads ————————————————————.
- #increment_fingerprint(shape) ⇒ Object
-
#increment_host(host) ⇒ Object
— Increments ——————————————————-.
- #increment_path_length(length) ⇒ Object
- #increment_raw_shape(shape) ⇒ Object
-
#initialize(path:, classifier: SegmentClassifier::DEFAULT, max_values_per_position: PositionStats::DEFAULT_MAX_VALUES) ⇒ Sqlite
constructor
A new instance of Sqlite.
- #observe_position(position, value, type) ⇒ Object
- #observed_iri_count ⇒ Object
- #path_length_counts ⇒ Object
- #position_stats(position) ⇒ Object
- #raw_shape_counts ⇒ Object
-
#record_activated_recognizer(dump) ⇒ Object
— Activated recognizers ——————————————–.
-
#record_observation(canonical) ⇒ Object
Append a canonical IRI to the source-IRI log.
- #save(_path = nil) ⇒ Object
- #setup! ⇒ Object
- #transaction ⇒ Object
Constructor Details
#initialize(path:, classifier: SegmentClassifier::DEFAULT, max_values_per_position: PositionStats::DEFAULT_MAX_VALUES) ⇒ Sqlite
Returns a new instance of Sqlite.
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/iriq/storage/sqlite.rb', line 133
# Opens a SQLite connection to `path` and configures it via PRAGMAs.
# Does not create the schema; see #setup!.
#
# @param path [String] database file handed to SQLite3::Database.new
# @param classifier [#classify] used to type query-param values in #add_to_cluster
# @param max_values_per_position [Integer] cap on distinct values stored per key
def initialize(path:, classifier: SegmentClassifier::DEFAULT,
               max_values_per_position: PositionStats::DEFAULT_MAX_VALUES)
  @path = path
  @classifier = classifier
  @max_values_per_position = max_values_per_position
  @in_batch = false

  @db = SQLite3::Database.new(path)

  # Order matters. busy_timeout MUST be applied first: other PRAGMAs
  # (journal_mode in particular) can themselves block on the write lock
  # under concurrent open, and without busy_timeout set they fail
  # immediately with SQLITE_BUSY.
  [
    "PRAGMA busy_timeout = 30000",
    "PRAGMA journal_mode = WAL",
    "PRAGMA synchronous = NORMAL",
    "PRAGMA foreign_keys = ON",
  ].each { |pragma| @db.execute(pragma) }
end
Instance Attribute Details
#max_values_per_position ⇒ Object (readonly)
Returns the value of attribute max_values_per_position.
126 127 128 |
# File 'lib/iriq/storage/sqlite.rb', line 126 def max_values_per_position @max_values_per_position end |
#path ⇒ Object (readonly)
Returns the value of attribute path.
126 127 128 |
# File 'lib/iriq/storage/sqlite.rb', line 126 def path @path end |
Class Method Details
.open(path, classifier: SegmentClassifier::DEFAULT, max_values_per_position: PositionStats::DEFAULT_MAX_VALUES) ⇒ Object
128 129 130 131 |
# File 'lib/iriq/storage/sqlite.rb', line 128
# Builds a storage object for `path` and runs #setup! on it before
# handing it back.
#
# @param path [String] forwarded to the constructor as `path:`
# @return the newly constructed, set-up instance
def self.open(path, classifier: SegmentClassifier::DEFAULT, max_values_per_position: PositionStats::DEFAULT_MAX_VALUES)
  storage = new(
    path: path,
    classifier: classifier,
    max_values_per_position: max_values_per_position
  )
  storage.setup!
  storage
end
Instance Method Details
#activated_recognizer_count ⇒ Object
371 372 373 |
# File 'lib/iriq/storage/sqlite.rb', line 371
# Number of rows in activated_recognizers; 0 when the query yields nothing.
def activated_recognizer_count
  row_count = @db.get_first_value("SELECT COUNT(*) FROM activated_recognizers")
  return 0 unless row_count

  row_count
end
#add_to_cluster(key, host, scheme, shape, identifier) ⇒ Object
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 |
# File 'lib/iriq/storage/sqlite.rb', line 274
# Folds one observed identifier into the cluster stored under `key`:
# bumps the cluster row, keeps a capped list of examples, counts every
# path segment per position, and updates per-query-param statistics.
#
# @param key [String] cluster primary key
# @param identifier [#canonical, #path_segments, #query_params]
# @return whatever load_cluster(key) returns (helper defined outside this view)
def add_to_cluster(key, host, scheme, shape, identifier)
  # A brand-new cluster takes the next ord (current max + 1) so iteration
  # order stays stable; an existing one only has its count bumped.
  upsert_cluster_sql = <<~SQL
    INSERT INTO clusters (key, host, scheme, shape, count, ord)
    VALUES (?, ?, ?, ?, 1, (SELECT COALESCE(MAX(ord), 0) + 1 FROM clusters))
    ON CONFLICT(key) DO UPDATE SET count = count + 1
  SQL
  @db.execute(upsert_cluster_sql, [key, host, scheme, shape])

  # Examples stop at Cluster::MAX_EXAMPLES; the current row count doubles
  # as the position of the next example.
  stored_examples = @db.get_first_value(
    "SELECT COUNT(*) FROM cluster_examples WHERE cluster_key = ?",
    [key],
  )
  if stored_examples < Cluster::MAX_EXAMPLES
    insert_example_sql = <<~SQL
      INSERT INTO cluster_examples (cluster_key, position, canonical)
      VALUES (?, ?, ?)
    SQL
    @db.execute(insert_example_sql, [key, stored_examples, identifier.canonical])
  end

  # Segment counts per path position have no cap.
  identifier.path_segments.each_with_index do |segment, index|
    @db.execute(<<~SQL, [key, index, segment])
      INSERT INTO cluster_segments (cluster_key, position, value, count)
      VALUES (?, ?, ?, 1)
      ON CONFLICT(cluster_key, position, value) DO UPDATE SET count = count + 1
    SQL
  end

  # Query params: presence total, type histogram, and a value histogram
  # whose distinct-value count is limited by @max_values_per_position.
  (identifier.query_params || {}).each do |name, value|
    text = value.to_s
    type_name = @classifier.classify(text).to_s

    @db.execute(<<~SQL, [key, name])
      INSERT INTO cluster_params (cluster_key, name, total)
      VALUES (?, ?, 1)
      ON CONFLICT(cluster_key, name) DO UPDATE SET total = total + 1
    SQL

    @db.execute(<<~SQL, [key, name, type_name])
      INSERT INTO cluster_param_types (cluster_key, name, type, count)
      VALUES (?, ?, ?, 1)
      ON CONFLICT(cluster_key, name, type) DO UPDATE SET count = count + 1
    SQL

    # Try to bump an existing value row first; only a miss may insert.
    @db.execute(<<~SQL, [key, name, text])
      UPDATE cluster_param_values SET count = count + 1
      WHERE cluster_key = ? AND name = ? AND value = ?
    SQL
    next unless @db.changes.zero?

    distinct_values = @db.get_first_value(
      "SELECT COUNT(*) FROM cluster_param_values WHERE cluster_key = ? AND name = ?",
      [key, name],
    )
    next unless distinct_values < @max_values_per_position

    @db.execute(
      "INSERT INTO cluster_param_values (cluster_key, name, value, count) VALUES (?, ?, ?, 1)",
      [key, name, text],
    )
  end

  load_cluster(key)
end
#batch ⇒ Object
Wrap many observations in a single transaction. Cuts SQLite write overhead from O(observations) fsyncs to O(1).
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
# File 'lib/iriq/storage/sqlite.rb', line 180
# Wrap many observations in a single transaction. Cuts SQLite write
# overhead from O(observations) fsyncs to O(1).
#
# A nested call (while @in_batch is set) just runs the block inside the
# already-open transaction and returns the block's value.
#
# @yield the work to run inside the transaction
# @return the result of @db.commit for the outermost call
# @raise re-raises any StandardError from the block after attempting a rollback
def batch
  return yield if @in_batch

  # Begin BEFORE flipping the flag. If BEGIN itself raises (e.g. the
  # database is busy), @in_batch must stay false; otherwise every later
  # #batch / #transaction call would silently skip opening a transaction.
  @db.transaction
  @in_batch = true
  begin
    yield
    @db.commit
  rescue
    # Best-effort rollback: a failure here must not mask the original error.
    begin
      @db.rollback
    rescue StandardError
      nil
    end
    raise
  ensure
    @in_batch = false
  end
end
#clear_materialized_views ⇒ Object
Drop every materialized view without touching the source-IRI log. Corpus#reinfer calls this before replaying the log.
377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 |
# File 'lib/iriq/storage/sqlite.rb', line 377
# Empties every derived table in one execute_batch call. observed_iris,
# activated_recognizers and meta are not in the list and stay untouched.
def clear_materialized_views
  view_tables = %w[
    host_counts path_length_counts raw_shape_counts fingerprint_counts
    position_stats position_values position_types
    clusters cluster_examples cluster_segments
    cluster_params cluster_param_values cluster_param_types
  ]
  # One "DELETE FROM <table>;" per line, in the order listed above.
  @db.execute_batch(view_tables.map { |table| "DELETE FROM #{table};\n" }.join)
end
#close ⇒ Object
204 205 206 207 208 209 |
# File 'lib/iriq/storage/sqlite.rb', line 204
# Closes the connection after a best-effort WAL checkpoint.
def close
  # Checkpoint + truncate the WAL so the .db-wal sidecar doesn't grow
  # unbounded across long-lived `iriq --corpus c.db` sessions. Any
  # StandardError from the checkpoint is deliberately ignored so the
  # connection still gets closed.
  begin
    @db.execute("PRAGMA wal_checkpoint(TRUNCATE)")
  rescue StandardError
    nil
  end
  @db.close
end
#cluster_for(key) ⇒ Object
468 469 470 |
# File 'lib/iriq/storage/sqlite.rb', line 468 def cluster_for(key) load_cluster(key) end |
#cluster_size ⇒ Object
464 465 466 |
# File 'lib/iriq/storage/sqlite.rb', line 464 def cluster_size @db.get_first_value("SELECT COUNT(*) FROM clusters") end |
#clusters ⇒ Object
456 457 458 459 460 461 462 |
# File 'lib/iriq/storage/sqlite.rb', line 456
# Every stored cluster, ordered by the ord column.
#
# @return [Array] one load_cluster result per row (helper defined outside this view)
def clusters
  ordered_keys = []
  @db.execute("SELECT key FROM clusters ORDER BY ord") { |row| ordered_keys << row[0] }
  ordered_keys.map { |cluster_key| load_cluster(cluster_key) }
end
#each_activated_recognizer ⇒ Object
365 366 367 368 369 |
# File 'lib/iriq/storage/sqlite.rb', line 365
# Yields each activated recognizer, ordered by prefix, as a Hash with
# string keys "prefix", "type" and "specificity".
def each_activated_recognizer
  sql = "SELECT prefix, type, specificity FROM activated_recognizers ORDER BY prefix"
  @db.execute(sql) do |row|
    prefix, type, specificity = row
    yield({ "prefix" => prefix, "type" => type, "specificity" => specificity })
  end
end
#each_observed_iri ⇒ Object
346 347 348 349 350 |
# File 'lib/iriq/storage/sqlite.rb', line 346
# Yields the canonical string of every logged IRI, ordered by id.
def each_observed_iri
  sql = "SELECT canonical FROM observed_iris ORDER BY id"
  @db.execute(sql) { |row| yield row.first }
end
#each_position_stats ⇒ Object
445 446 447 448 449 450 451 452 453 454 |
# File 'lib/iriq/storage/sqlite.rb', line 445
# Yields (Position, stats) for every row of position_stats in ROWID order.
# All keys are read first; the per-position lookups run only after that
# query has finished.
def each_position_stats
  position_keys = []
  @db.execute("SELECT DISTINCT host, scope, locator FROM position_stats ORDER BY ROWID") do |row|
    position_keys.push(row)
  end

  position_keys.each do |row|
    host, scope, locator = row
    position = Position.new(host: host, scope: scope.to_sym, locator: locator)
    yield position, position_stats(position)
  end
end
#fingerprint_counts ⇒ Object
411 412 413 |
# File 'lib/iriq/storage/sqlite.rb', line 411 def fingerprint_counts rows_to_count_hash("fingerprint_counts", "shape") end |
#flush ⇒ Object
Saving is automatic — incremental UPSERTs hit disk on commit. flush makes that explicit; close releases the connection.
198 |
# File 'lib/iriq/storage/sqlite.rb', line 198 def flush; end |
#host_counts ⇒ Object
— Reads ————————————————————
397 398 399 |
# File 'lib/iriq/storage/sqlite.rb', line 397 def host_counts rows_to_count_hash("host_counts", "host") end |
#increment_fingerprint(shape) ⇒ Object
233 234 235 |
# File 'lib/iriq/storage/sqlite.rb', line 233 def increment_fingerprint(shape) upsert_shape("fingerprint_counts", shape) end |
#increment_host(host) ⇒ Object
— Increments ——————————————————-
213 214 215 216 217 218 219 220 |
# File 'lib/iriq/storage/sqlite.rb', line 213
# Adds one to the counter for `host`, inserting the row with count 1 on
# first sight. A nil (or false) host is ignored and nil is returned.
def increment_host(host)
  if host
    upsert_sql = <<~SQL
      INSERT INTO host_counts (host, count) VALUES (?, 1)
      ON CONFLICT(host) DO UPDATE SET count = count + 1
    SQL
    @db.execute(upsert_sql, host)
  end
end
#increment_path_length(length) ⇒ Object
222 223 224 225 226 227 |
# File 'lib/iriq/storage/sqlite.rb', line 222
# Adds one to the counter for path length `length`, inserting the row
# with count 1 on first sight.
def increment_path_length(length)
  upsert_sql = <<~SQL
    INSERT INTO path_length_counts (length, count) VALUES (?, 1)
    ON CONFLICT(length) DO UPDATE SET count = count + 1
  SQL
  @db.execute(upsert_sql, length)
end
#increment_raw_shape(shape) ⇒ Object
229 230 231 |
# File 'lib/iriq/storage/sqlite.rb', line 229 def increment_raw_shape(shape) upsert_shape("raw_shape_counts", shape) end |
#observe_position(position, value, type) ⇒ Object
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 |
# File 'lib/iriq/storage/sqlite.rb', line 237
# Records one observation of `value` (classified as `type`) at `position`:
# bumps the position total, the per-type count, and the per-value count
# (the latter subject to @max_values_per_position).
#
# @param position [#host, #scope, #locator] a nil host is stored as ""
# @param value bound as-is into position_values.value
# @param type [#to_s] stored via to_s
def observe_position(position, value, type)
  host = position.host || ""
  scope = position.scope.to_s
  locator = position.locator
  position_key = [host, scope, locator]

  @db.execute(<<~SQL, position_key)
    INSERT INTO position_stats (host, scope, locator, total) VALUES (?, ?, ?, 1)
    ON CONFLICT(host, scope, locator) DO UPDATE SET total = total + 1
  SQL

  # Type counts are unbounded, so a plain upsert is enough.
  @db.execute(<<~SQL, [host, scope, locator, type.to_s])
    INSERT INTO position_types (host, scope, locator, type, count) VALUES (?, ?, ?, ?, 1)
    ON CONFLICT(host, scope, locator, type) DO UPDATE SET count = count + 1
  SQL

  # Value counts are capped. Step 1: bump the row if it already exists.
  # Step 2: on a miss, insert only while the number of distinct values is
  # still below the cap. ON CONFLICT cannot express that insert-time cap.
  @db.execute(<<~SQL, [host, scope, locator, value])
    UPDATE position_values SET count = count + 1
    WHERE host = ? AND scope = ? AND locator = ? AND value = ?
  SQL
  return unless @db.changes.zero?

  distinct_values = @db.get_first_value(
    "SELECT COUNT(*) FROM position_values WHERE host = ? AND scope = ? AND locator = ?",
    position_key,
  )
  return unless distinct_values < @max_values_per_position

  @db.execute(
    "INSERT INTO position_values (host, scope, locator, value, count) VALUES (?, ?, ?, ?, 1)",
    [host, scope, locator, value],
  )
end
#observed_iri_count ⇒ Object
352 353 354 |
# File 'lib/iriq/storage/sqlite.rb', line 352
# Number of rows in the observed_iris log; 0 when the query yields nothing.
def observed_iri_count
  logged = @db.get_first_value("SELECT COUNT(*) FROM observed_iris")
  return 0 unless logged

  logged
end
#path_length_counts ⇒ Object
401 402 403 404 405 |
# File 'lib/iriq/storage/sqlite.rb', line 401
# Loads path_length_counts into a Hash of length => count. Lookups of
# lengths that were never stored return 0 (Hash default).
def path_length_counts
  counts_by_length = Hash.new(0)
  @db.execute("SELECT length, count FROM path_length_counts") do |row|
    length, count = row
    counts_by_length[length] = count
  end
  counts_by_length
end
#position_stats(position) ⇒ Object
415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 |
# File 'lib/iriq/storage/sqlite.rb', line 415
# Rebuilds a PositionStats object for `position` from the three position_*
# tables, or returns nil when position_stats has no row for it.
#
# The PositionStats internals (@total, @value_counts, @type_counts) are
# written directly with instance_variable_set.
#
# @param position [#host, #scope, #locator] a nil host is looked up as ""
# @return [PositionStats, nil]
def position_stats(position)
  position_key = [position.host || "", position.scope.to_s, position.locator]

  observed_total = @db.get_first_value(
    "SELECT total FROM position_stats WHERE host = ? AND scope = ? AND locator = ?",
    position_key,
  )
  return if observed_total.nil?

  rebuilt = PositionStats.new(max_values: @max_values_per_position)
  rebuilt.instance_variable_set(:@total, observed_total)

  value_counts = Hash.new(0)
  @db.execute(
    "SELECT value, count FROM position_values WHERE host = ? AND scope = ? AND locator = ?",
    position_key,
  ) do |row|
    value_counts[row[0]] = row[1]
  end
  rebuilt.instance_variable_set(:@value_counts, value_counts)

  # Types are stored as text and come back as symbols.
  type_counts = Hash.new(0)
  @db.execute(
    "SELECT type, count FROM position_types WHERE host = ? AND scope = ? AND locator = ?",
    position_key,
  ) do |row|
    type_counts[row[0].to_sym] = row[1]
  end
  rebuilt.instance_variable_set(:@type_counts, type_counts)

  rebuilt
end
#raw_shape_counts ⇒ Object
407 408 409 |
# File 'lib/iriq/storage/sqlite.rb', line 407 def raw_shape_counts rows_to_count_hash("raw_shape_counts", "shape") end |
#record_activated_recognizer(dump) ⇒ Object
— Activated recognizers ——————————————–
358 359 360 361 362 363 |
# File 'lib/iriq/storage/sqlite.rb', line 358
# Persists an activated recognizer keyed by its prefix. Recording an
# existing prefix overwrites that row's type and specificity.
#
# @param dump [Hash] string-keyed: "prefix", "type", optional "specificity"
#   (defaults to 1.0 when the key is absent)
def record_activated_recognizer(dump)
  prefix = dump["prefix"]
  type = dump["type"]
  specificity = dump.fetch("specificity", 1.0)

  upsert_sql = <<~SQL
    INSERT INTO activated_recognizers (prefix, type, specificity) VALUES (?, ?, ?)
    ON CONFLICT(prefix) DO UPDATE SET type = excluded.type, specificity = excluded.specificity
  SQL
  @db.execute(upsert_sql, [prefix, type, specificity])
end
#record_observation(canonical) ⇒ Object
Append a canonical IRI to the source-IRI log. Inside the same transaction as the event reducers, so the log and views stay consistent.
342 343 344 |
# File 'lib/iriq/storage/sqlite.rb', line 342 def record_observation(canonical) @db.execute("INSERT INTO observed_iris (canonical) VALUES (?)", [canonical]) end |
#save(_path = nil) ⇒ Object
200 201 202 |
# File 'lib/iriq/storage/sqlite.rb', line 200 def save(_path = nil) # Already persisted. Provided for parity with the JSON backend. end |
#setup! ⇒ Object
150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/iriq/storage/sqlite.rb', line 150
# Creates the schema if needed and reconciles the per-position value cap
# with what the database file already records.
#
# On a file with no 'schema_version' row, the version and the constructor's
# cap are written to meta. In every case the cap stored in meta (when
# present) replaces @max_values_per_position, so all openers of one file
# agree on it.
#
# @return [self]
def setup!
  @db.execute_batch(SCHEMA)

  existing = @db.get_first_value("SELECT value FROM meta WHERE key = 'schema_version'")
  if existing.nil?
    # INSERT OR IGNORE rather than plain INSERT: another process may seed
    # the same fresh file between our SELECT and these writes, and a plain
    # INSERT would then fail on the meta primary key.
    @db.execute(
      "INSERT OR IGNORE INTO meta (key, value) VALUES ('schema_version', ?)",
      SCHEMA_VERSION.to_s,
    )
    @db.execute(
      "INSERT OR IGNORE INTO meta (key, value) VALUES ('max_values_per_position', ?)",
      @max_values_per_position.to_s,
    )
  end

  # Always read the cap back: if a concurrent opener won the race above,
  # its stored value is the authoritative one.
  stored_cap = @db.get_first_value(
    "SELECT value FROM meta WHERE key = 'max_values_per_position'"
  )
  @max_values_per_position = (stored_cap || @max_values_per_position).to_i
  self
end
#transaction ⇒ Object
165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/iriq/storage/sqlite.rb', line 165
# Runs the block inside a database transaction, yielding this storage
# object. Commits when the block returns; rolls back and re-raises when it
# raises a StandardError.
#
# @yieldparam storage [self]
# @return the block's value when nested in a batch, otherwise the result
#   of @db.commit
def transaction
  # While inside an outer batch, observe()-time transactions become
  # no-ops — the outer batch wraps everything in one txn for speed.
  # This path must stay outside the rescue below: the outer batch owns the
  # open transaction, so rolling back here on a block error would discard
  # the batch's earlier writes even if the caller handles that error.
  return yield(self) if @in_batch

  # BEGIN is also outside the rescue: if it fails there is nothing to roll back.
  @db.transaction
  begin
    yield self
    @db.commit
  rescue
    # Best-effort rollback: a failure here must not mask the original error.
    begin
      @db.rollback
    rescue StandardError
      nil
    end
    raise
  end
end