Class: Iriq::Storage::Sqlite
- Inherits:
-
Object
- Object
- Iriq::Storage::Sqlite
- Defined in:
- lib/iriq/storage/sqlite.rb
Overview
Sqlite is the incremental-write backend. Each observation translates to a handful of UPSERTs against a long-lived connection; nothing is materialized in memory beyond what reads explicitly ask for.
WAL journaling lets multiple processes observe against the same file concurrently — the writer is serialized, readers are not blocked, and the existing `iriq --corpus c.db <url>` pattern works without a flock at the application layer.
Constant Summary collapse
# Version stamp written to the `meta` table under the 'schema_version' key
# by #setup!.
SCHEMA_VERSION = 1

# DDL run through `execute_batch` by #setup! on every open. Every statement
# uses IF NOT EXISTS, so re-running it against an existing file is a no-op.
#
# The index on clusters(ord) exists because #add_to_cluster evaluates
# `MAX(ord)` on every call and #clusters iterates `ORDER BY ord`; without an
# index both are full scans/sorts of the clusters table.
SCHEMA = <<~SQL.freeze
  CREATE TABLE IF NOT EXISTS meta (
    key TEXT PRIMARY KEY,
    value TEXT
  );
  CREATE TABLE IF NOT EXISTS host_counts (
    host TEXT PRIMARY KEY,
    count INTEGER NOT NULL
  );
  CREATE TABLE IF NOT EXISTS path_length_counts (
    length INTEGER PRIMARY KEY,
    count INTEGER NOT NULL
  );
  CREATE TABLE IF NOT EXISTS raw_shape_counts (
    shape TEXT PRIMARY KEY,
    count INTEGER NOT NULL
  );
  CREATE TABLE IF NOT EXISTS fingerprint_counts (
    shape TEXT PRIMARY KEY,
    count INTEGER NOT NULL
  );
  CREATE TABLE IF NOT EXISTS position_stats (
    host TEXT NOT NULL,
    prefix TEXT NOT NULL,
    total INTEGER NOT NULL DEFAULT 0,
    PRIMARY KEY (host, prefix)
  );
  CREATE TABLE IF NOT EXISTS position_values (
    host TEXT NOT NULL,
    prefix TEXT NOT NULL,
    value TEXT NOT NULL,
    count INTEGER NOT NULL,
    PRIMARY KEY (host, prefix, value)
  );
  CREATE TABLE IF NOT EXISTS position_types (
    host TEXT NOT NULL,
    prefix TEXT NOT NULL,
    type TEXT NOT NULL,
    count INTEGER NOT NULL,
    PRIMARY KEY (host, prefix, type)
  );
  CREATE TABLE IF NOT EXISTS clusters (
    key TEXT PRIMARY KEY,
    host TEXT,
    scheme TEXT,
    shape TEXT,
    count INTEGER NOT NULL DEFAULT 0,
    ord INTEGER NOT NULL
  );
  CREATE INDEX IF NOT EXISTS idx_clusters_ord ON clusters (ord);
  CREATE TABLE IF NOT EXISTS cluster_examples (
    cluster_key TEXT NOT NULL,
    position INTEGER NOT NULL,
    canonical TEXT NOT NULL,
    PRIMARY KEY (cluster_key, position)
  );
  CREATE TABLE IF NOT EXISTS cluster_segments (
    cluster_key TEXT NOT NULL,
    position INTEGER NOT NULL,
    value TEXT NOT NULL,
    count INTEGER NOT NULL,
    PRIMARY KEY (cluster_key, position, value)
  );
SQL
Instance Attribute Summary collapse
-
#max_values_per_position ⇒ Object
readonly
Returns the value of attribute max_values_per_position.
-
#path ⇒ Object
readonly
Returns the value of attribute path.
Class Method Summary collapse
Instance Method Summary collapse
- #add_to_cluster(key, host, scheme, shape, identifier) ⇒ Object
-
#batch ⇒ Object
Wrap many observations in a single transaction.
- #close ⇒ Object
- #cluster_size ⇒ Object
- #clusters ⇒ Object
- #each_position_stats ⇒ Object
- #fingerprint_counts ⇒ Object
-
#flush ⇒ Object
Saving is automatic — incremental UPSERTs hit disk on commit.
-
#host_counts ⇒ Object
— Reads ————————————————————.
- #increment_fingerprint(shape) ⇒ Object
-
#increment_host(host) ⇒ Object
— Increments ——————————————————-.
- #increment_path_length(length) ⇒ Object
- #increment_raw_shape(shape) ⇒ Object
-
#initialize(path:, classifier: SegmentClassifier::DEFAULT, max_values_per_position: PositionStats::DEFAULT_MAX_VALUES) ⇒ Sqlite
constructor
A new instance of Sqlite.
- #observe_position(host, prefix, value, type) ⇒ Object
- #path_length_counts ⇒ Object
- #position_stats(host, prefix) ⇒ Object
- #raw_shape_counts ⇒ Object
- #save(_path = nil) ⇒ Object
- #setup! ⇒ Object
- #transaction ⇒ Object
Constructor Details
#initialize(path:, classifier: SegmentClassifier::DEFAULT, max_values_per_position: PositionStats::DEFAULT_MAX_VALUES) ⇒ Sqlite
Returns a new instance of Sqlite.
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/iriq/storage/sqlite.rb', line 87
#
# Opens a SQLite connection on +path+ and configures it via PRAGMAs. Does
# not create any tables; see #setup! / .open for that.
#
# @param path [String] database file handed to SQLite3::Database.new
# @param classifier [Object] stored in @classifier
# @param max_values_per_position [Integer] stored in @max_values_per_position
def initialize(path:, classifier: SegmentClassifier::DEFAULT,
               max_values_per_position: PositionStats::DEFAULT_MAX_VALUES)
  @path = path
  @classifier = classifier
  @max_values_per_position = max_values_per_position
  @in_batch = false

  @db = SQLite3::Database.new(path)
  # busy_timeout MUST come first: other PRAGMAs (journal_mode in
  # particular) can themselves block on the write lock under
  # concurrent open, and without busy_timeout set they fail
  # immediately with SQLITE_BUSY.
  [
    "PRAGMA busy_timeout = 30000",
    "PRAGMA journal_mode = WAL",
    "PRAGMA synchronous = NORMAL",
    "PRAGMA foreign_keys = ON"
  ].each { |pragma| @db.execute(pragma) }
end
Instance Attribute Details
#max_values_per_position ⇒ Object (readonly)
Returns the value of attribute max_values_per_position.
80 81 82 |
# File 'lib/iriq/storage/sqlite.rb', line 80
#
# Read-only accessor.
#
# @return [Object] the current value of @max_values_per_position
def max_values_per_position
  @max_values_per_position
end
#path ⇒ Object (readonly)
Returns the value of attribute path.
80 81 82 |
# File 'lib/iriq/storage/sqlite.rb', line 80
#
# Read-only accessor.
#
# @return [Object] the current value of @path
def path
  @path
end
Class Method Details
.open(path, classifier: SegmentClassifier::DEFAULT, max_values_per_position: PositionStats::DEFAULT_MAX_VALUES) ⇒ Object
82 83 84 85 |
# File 'lib/iriq/storage/sqlite.rb', line 82
#
# Builds a store for +path+ and runs #setup! on it.
#
# If #setup! raises, the store is closed (best-effort) before the error is
# re-raised, so a failed open does not leave a connection behind that the
# caller has no handle to release.
#
# @param path [String] forwarded to the constructor as +path:+
# @param classifier [Object] forwarded to the constructor
# @param max_values_per_position [Integer] forwarded to the constructor
# @return [Sqlite] the store, after #setup! has run
# @raise whatever the constructor or #setup! raises
def self.open(path, classifier: SegmentClassifier::DEFAULT,
              max_values_per_position: PositionStats::DEFAULT_MAX_VALUES)
  store = new(path: path, classifier: classifier,
              max_values_per_position: max_values_per_position)
  begin
    store.setup!
  rescue StandardError
    # Cleanup must never mask the original setup failure.
    begin
      store.close
    rescue StandardError
      nil
    end
    raise
  end
  store
end
Instance Method Details
#add_to_cluster(key, host, scheme, shape, identifier) ⇒ Object
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 |
# File 'lib/iriq/storage/sqlite.rb', line 226
#
# Records one observation of +identifier+ in the cluster named +key+:
# upserts the cluster row, stores an example while under the cap, and
# bumps the per-position segment counters.
#
# @param key [String] cluster primary key
# @param host [String, nil] written only when the cluster row is created
# @param scheme [String, nil] written only when the cluster row is created
# @param shape [String, nil] written only when the cluster row is created
# @param identifier [#canonical, #path_segments] the observed identifier
# @return [Object] the result of load_cluster(key)
def add_to_cluster(key, host, scheme, shape, identifier)
  # A new cluster gets the next monotonic ord (for stable iteration);
  # an existing one only has its count bumped.
  cluster_upsert = <<~SQL
    INSERT INTO clusters (key, host, scheme, shape, count, ord)
    VALUES (?, ?, ?, ?, 1, (SELECT COALESCE(MAX(ord), 0) + 1 FROM clusters))
    ON CONFLICT(key) DO UPDATE SET count = count + 1
  SQL
  @db.execute(cluster_upsert, [key, host, scheme, shape])

  # Examples are capped at Cluster::MAX_EXAMPLES; the current row count
  # doubles as the next free position.
  stored = @db.get_first_value(
    "SELECT COUNT(*) FROM cluster_examples WHERE cluster_key = ?",
    [key]
  )
  if stored < Cluster::MAX_EXAMPLES
    example_insert = <<~SQL
      INSERT INTO cluster_examples (cluster_key, position, canonical)
      VALUES (?, ?, ?)
    SQL
    @db.execute(example_insert, [key, stored, identifier.canonical])
  end

  # Per-position segment counts are uncapped.
  segment_upsert = <<~SQL
    INSERT INTO cluster_segments (cluster_key, position, value, count)
    VALUES (?, ?, ?, 1)
    ON CONFLICT(cluster_key, position, value) DO UPDATE SET count = count + 1
  SQL
  identifier.path_segments.each_with_index do |segment, index|
    @db.execute(segment_upsert, [key, index, segment])
  end

  load_cluster(key)
end
#batch ⇒ Object
Wrap many observations in a single transaction. Cuts SQLite write overhead from O(observations) fsyncs to O(1).
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/iriq/storage/sqlite.rb', line 134
#
# Wraps the block in a single database transaction. Calls made while a
# batch is already active simply yield (and return the block's value).
#
# The transaction is opened BEFORE @in_batch is set: if BEGIN itself raises
# (e.g. the database is locked), the flag must not stay true, otherwise
# every later #batch / #transaction call would silently run without a
# transaction.
#
# @yield the work to run inside the transaction
# @return [Object] the result of @db.commit for the outermost call
# @raise re-raises any StandardError from the block or the commit after a
#   best-effort rollback
def batch
  return yield if @in_batch

  @db.transaction
  @in_batch = true
  begin
    yield
    @db.commit
  rescue StandardError
    # Best-effort: a failing rollback must not mask the original error.
    begin
      @db.rollback
    rescue StandardError
      nil
    end
    raise
  ensure
    @in_batch = false
  end
end
#close ⇒ Object
158 159 160 161 162 163 |
# File 'lib/iriq/storage/sqlite.rb', line 158
#
# Checkpoints the WAL (best-effort) and closes the connection.
#
# @return [Object] whatever @db.close returns
def close
  # Checkpoint + truncate the WAL so the .db-wal sidecar doesn't grow
  # unbounded across long-lived `iriq --corpus c.db` sessions. A failed
  # checkpoint is ignored on purpose; the close below still runs.
  begin
    @db.execute("PRAGMA wal_checkpoint(TRUNCATE)")
  rescue StandardError
    nil
  end
  @db.close
end
#cluster_size ⇒ Object
318 319 320 |
# File 'lib/iriq/storage/sqlite.rb', line 318
#
# @return [Integer] number of rows in the clusters table
def cluster_size
  count_sql = "SELECT COUNT(*) FROM clusters"
  @db.get_first_value(count_sql)
end
#clusters ⇒ Object
310 311 312 313 314 315 316 |
# File 'lib/iriq/storage/sqlite.rb', line 310
#
# Loads every cluster, ordered by its `ord` column.
#
# @return [Array] one load_cluster(key) result per row of the clusters table
def clusters
  key_rows = @db.execute("SELECT key FROM clusters ORDER BY ord")
  key_rows.map { |row| load_cluster(row[0]) }
end
#each_position_stats ⇒ Object
302 303 304 305 306 307 308 |
# File 'lib/iriq/storage/sqlite.rb', line 302
#
# Iterates over every (host, prefix) pair in position_stats, in ROWID
# order, yielding the pair and its #position_stats result.
#
# The pairs are fetched in full before the first yield, so the per-pair
# lookups do not run while the listing query is still being stepped.
#
# Without a block an Enumerator is returned (previously this raised
# LocalJumpError as soon as the table had a row).
#
# @yieldparam key [Array(String, String)] the [host, prefix] pair
# @yieldparam stats [Object] result of position_stats(host, prefix)
# @return [Array, Enumerator] the fetched rows when a block is given,
#   otherwise an Enumerator over the same pairs
def each_position_stats
  return enum_for(:each_position_stats) unless block_given?

  pairs = @db.execute("SELECT DISTINCT host, prefix FROM position_stats ORDER BY ROWID")
  pairs.each { |host, prefix| yield [host, prefix], position_stats(host, prefix) }
end
#fingerprint_counts ⇒ Object
273 274 275 |
# File 'lib/iriq/storage/sqlite.rb', line 273
#
# @return [Object] rows_to_count_hash result for the fingerprint_counts
#   table, keyed by its `shape` column
def fingerprint_counts
  table = "fingerprint_counts"
  rows_to_count_hash(table, "shape")
end
#flush ⇒ Object
Saving is automatic — incremental UPSERTs hit disk on commit. flush makes that explicit; close releases the connection.
152 |
# File 'lib/iriq/storage/sqlite.rb', line 152
#
# Intentional no-op: this method performs no work of its own.
#
# @return [nil]
def flush
  nil
end
#host_counts ⇒ Object
— Reads ————————————————————
259 260 261 |
# File 'lib/iriq/storage/sqlite.rb', line 259
#
# @return [Object] rows_to_count_hash result for the host_counts table,
#   keyed by its `host` column
def host_counts
  table = "host_counts"
  rows_to_count_hash(table, "host")
end
#increment_fingerprint(shape) ⇒ Object
187 188 189 |
# File 'lib/iriq/storage/sqlite.rb', line 187
#
# Delegates to upsert_shape for the fingerprint_counts table.
#
# @param shape [String] the fingerprint shape to count
# @return [Object] whatever upsert_shape returns
def increment_fingerprint(shape)
  table = "fingerprint_counts"
  upsert_shape(table, shape)
end
#increment_host(host) ⇒ Object
— Increments ——————————————————-
167 168 169 170 171 172 173 174 |
# File 'lib/iriq/storage/sqlite.rb', line 167
#
# Adds one to the counter for +host+, creating the row on first sight.
# A nil/false host is ignored.
#
# Bind parameters are passed as an Array, matching every multi-parameter
# call in this file.
#
# @param host [String, nil] host to count
# @return [Object, nil] the @db.execute result, or nil when host is falsy
def increment_host(host)
  return unless host

  @db.execute(<<~SQL, [host])
    INSERT INTO host_counts (host, count) VALUES (?, 1)
    ON CONFLICT(host) DO UPDATE SET count = count + 1
  SQL
end
#increment_path_length(length) ⇒ Object
176 177 178 179 180 181 |
# File 'lib/iriq/storage/sqlite.rb', line 176
#
# Adds one to the counter for paths of +length+ segments, creating the row
# on first sight.
#
# A nil length is ignored: `length` is the table's INTEGER PRIMARY KEY, and
# SQLite auto-assigns a fresh key when NULL is inserted into such a column,
# which would record a count under a made-up length.
#
# Bind parameters are passed as an Array, matching every multi-parameter
# call in this file.
#
# @param length [Integer, nil] path length to count
# @return [Object, nil] the @db.execute result, or nil when length is nil
def increment_path_length(length)
  return if length.nil?

  @db.execute(<<~SQL, [length])
    INSERT INTO path_length_counts (length, count) VALUES (?, 1)
    ON CONFLICT(length) DO UPDATE SET count = count + 1
  SQL
end
#increment_raw_shape(shape) ⇒ Object
183 184 185 |
# File 'lib/iriq/storage/sqlite.rb', line 183
#
# Delegates to upsert_shape for the raw_shape_counts table.
#
# @param shape [String] the raw shape to count
# @return [Object] whatever upsert_shape returns
def increment_raw_shape(shape)
  table = "raw_shape_counts"
  upsert_shape(table, shape)
end
#observe_position(host, prefix, value, type) ⇒ Object
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/iriq/storage/sqlite.rb', line 191
#
# Records one observation of +value+ (classified as +type+) at the
# position identified by (+host+, +prefix+).
#
# @param host [String, nil] nil is stored as the empty string
# @param prefix [String] position prefix
# @param value [String] the observed segment value
# @param type [#to_s] the segment type; stored via #to_s
# @return [Object, nil] the result of the final INSERT when a new value row
#   is written, otherwise nil
def observe_position(host, prefix, value, type)
  host ||= ""
  slot = [host, prefix]

  @db.execute(<<~SQL, slot)
    INSERT INTO position_stats (host, prefix, total) VALUES (?, ?, 1)
    ON CONFLICT(host, prefix) DO UPDATE SET total = total + 1
  SQL

  # Type counts are unbounded — always upsert.
  @db.execute(<<~SQL, slot + [type.to_s])
    INSERT INTO position_types (host, prefix, type, count) VALUES (?, ?, ?, 1)
    ON CONFLICT(host, prefix, type) DO UPDATE SET count = count + 1
  SQL

  # Value counts are capped at max_values_per_position. Try to bump an
  # existing row first; only when nothing was updated do we consider
  # inserting, and then only while cardinality is below the cap. This is
  # two steps rather than ON CONFLICT because the cap must be enforced on
  # insert.
  @db.execute(<<~SQL, slot + [value])
    UPDATE position_values SET count = count + 1
    WHERE host = ? AND prefix = ? AND value = ?
  SQL
  return unless @db.changes.zero?

  distinct_values = @db.get_first_value(
    "SELECT COUNT(*) FROM position_values WHERE host = ? AND prefix = ?",
    slot
  )
  return unless distinct_values < @max_values_per_position

  @db.execute(
    "INSERT INTO position_values (host, prefix, value, count) VALUES (?, ?, ?, 1)",
    slot + [value]
  )
end
#path_length_counts ⇒ Object
263 264 265 266 267 |
# File 'lib/iriq/storage/sqlite.rb', line 263
#
# @return [Hash] path length => count, read from path_length_counts;
#   missing keys default to 0
def path_length_counts
  counts = Hash.new(0)
  @db.execute("SELECT length, count FROM path_length_counts").each do |row|
    counts[row[0]] = row[1]
  end
  counts
end
#position_stats(host, prefix) ⇒ Object
277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 |
# File 'lib/iriq/storage/sqlite.rb', line 277
#
# Rebuilds a PositionStats object for (+host+, +prefix+) from the
# position_stats, position_values and position_types tables.
#
# @param host [String, nil] nil is looked up as the empty string
# @param prefix [String] position prefix
# @return [PositionStats, nil] nil when no position_stats row exists
def position_stats(host, prefix)
  host ||= ""
  slot = [host, prefix]

  total = @db.get_first_value(
    "SELECT total FROM position_stats WHERE host = ? AND prefix = ?",
    slot
  )
  return if total.nil?

  value_counts = Hash.new(0)
  @db.execute(
    "SELECT value, count FROM position_values WHERE host = ? AND prefix = ?",
    slot
  ) { |row| value_counts[row[0]] = row[1] }

  # Types are stored as text and turned back into symbols here.
  type_counts = Hash.new(0)
  @db.execute(
    "SELECT type, count FROM position_types WHERE host = ? AND prefix = ?",
    slot
  ) { |row| type_counts[row[0].to_sym] = row[1] }

  # The loaded state is written straight into the object's instance
  # variables rather than through its public API.
  PositionStats.new(max_values: @max_values_per_position).tap do |stats|
    stats.instance_variable_set(:@total, total)
    stats.instance_variable_set(:@value_counts, value_counts)
    stats.instance_variable_set(:@type_counts, type_counts)
  end
end
#raw_shape_counts ⇒ Object
269 270 271 |
# File 'lib/iriq/storage/sqlite.rb', line 269
#
# @return [Object] rows_to_count_hash result for the raw_shape_counts
#   table, keyed by its `shape` column
def raw_shape_counts
  table = "raw_shape_counts"
  rows_to_count_hash(table, "shape")
end
#save(_path = nil) ⇒ Object
154 155 156 |
# File 'lib/iriq/storage/sqlite.rb', line 154
#
# Intentional no-op. Already persisted. Provided for parity with the JSON
# backend.
#
# @param _path [Object, nil] accepted and ignored
# @return [nil]
def save(_path = nil)
  nil
end
#setup! ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/iriq/storage/sqlite.rb', line 104
#
# Creates the schema if needed and reconciles the value cap with the
# `meta` table.
#
# * Fresh file (no 'schema_version' row): stamps the schema version and
#   persists this instance's max_values_per_position.
# * Existing file: replaces this instance's max_values_per_position with
#   the stored one (falling back to the current value), as an Integer.
#
# The stamps use INSERT OR IGNORE: two processes opening the same new file
# can both observe "no schema_version row", and with a plain INSERT the
# slower one would fail on the meta primary key. After stamping, a stored
# cap that differs from ours means another process won that race, so its
# value is adopted.
#
# NOTE(review): a stored schema_version that differs from SCHEMA_VERSION
# is not checked here — confirm whether that is intended.
#
# @return [self]
def setup!
  @db.execute_batch(SCHEMA)

  cap_query = "SELECT value FROM meta WHERE key = 'max_values_per_position'"
  existing = @db.get_first_value("SELECT value FROM meta WHERE key = 'schema_version'")

  if existing.nil?
    @db.execute("INSERT OR IGNORE INTO meta (key, value) VALUES ('schema_version', ?)",
                [SCHEMA_VERSION.to_s])
    @db.execute("INSERT OR IGNORE INTO meta (key, value) VALUES ('max_values_per_position', ?)",
                [@max_values_per_position.to_s])

    stored = @db.get_first_value(cap_query)
    @max_values_per_position = stored.to_i if stored && stored != @max_values_per_position.to_s
  else
    @max_values_per_position = (@db.get_first_value(cap_query) || @max_values_per_position).to_i
  end

  self
end
#transaction ⇒ Object
119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/iriq/storage/sqlite.rb', line 119
#
# Runs the block inside a database transaction, yielding self.
#
# While inside an outer batch, observe()-time transactions become
# no-ops — the outer batch wraps everything in one txn for speed. In that
# nested case an error from the block propagates untouched: this method
# does not own the transaction, so it must not roll it back. (Doing so
# would discard the whole batch and leave the outer commit with no active
# transaction if the caller handles the error and carries on.)
#
# @yieldparam store [Sqlite] self
# @return [Object] the block's value when nested in a batch, otherwise the
#   result of @db.commit
# @raise re-raises any StandardError from BEGIN, the block or the commit;
#   when this call owns the transaction a best-effort rollback runs first
def transaction
  return yield(self) if @in_batch

  begin
    @db.transaction
    yield self
    @db.commit
  rescue StandardError
    # Best-effort: a failing rollback must not mask the original error.
    begin
      @db.rollback
    rescue StandardError
      nil
    end
    raise
  end
end