Class: S3arch::Indexer
- Inherits: Object
- Includes: StreamParser
- Defined in: lib/s3arch/indexer.rb, lib/s3arch/indexer/stream_parser.rb
Overview
Builds one SQLite FTS5 database per owner from pre-computed tokens stored in DynamoDB. The indexer never sees raw content, only tokens. Incremental updates are driven by DynamoDB Stream events (INSERT, MODIFY, REMOVE).
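The correspondence between DynamoDB Stream event names and index operations can be sketched as follows. This is a hypothetical illustration, not the StreamParser implementation: `STREAM_ACTIONS` and `stream_change` are invented names, the change-hash shape (`:action`, `:keys`, `:image`) is an assumption (only the `:action` key is confirmed by `#process_event`), and the sketch assumes the stream uses the `NEW_AND_OLD_IMAGES` view type so REMOVE records carry an `OldImage`. The real parser can also emit a `:rebuild` action, which this sketch does not model.

```ruby
# Hypothetical sketch: translate one DynamoDB Stream record into an index change.
# Not the S3arch::Indexer::StreamParser implementation.

# DynamoDB Stream eventName => operation applied to the SQLite index.
STREAM_ACTIONS = {
  'INSERT' => :insert,
  'MODIFY' => :update,
  'REMOVE' => :delete
}.freeze

# Returns a change hash, or nil for event names this sketch does not handle.
# Assumes NEW_AND_OLD_IMAGES: REMOVE reads OldImage, INSERT/MODIFY read NewImage.
def stream_change(record)
  action = STREAM_ACTIONS[record['eventName']]
  return nil unless action

  data  = record.fetch('dynamodb', {})
  image = action == :delete ? data['OldImage'] : data['NewImage']
  { action: action, keys: data['Keys'], image: image }
end

# Usage ('tokens' is a hypothetical attribute name).
change = stream_change(
  'eventName' => 'MODIFY',
  'dynamodb'  => { 'NewImage' => { 'tokens' => { 'SS' => %w[alpha beta] } } }
)
# change[:action] == :update
```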
Defined Under Namespace
Modules: StreamParser
Instance Method Summary
- #apply_changes(owner_id, changes) ⇒ Object
  Incremental update: applies INSERT/DELETE/UPDATE to an existing index.
- #initialize(config: S3arch.configuration, store: nil) ⇒ Indexer (constructor)
  Returns a new instance of Indexer.
- #process_event(event) ⇒ Object
  Processes an SQS event containing DynamoDB stream records.
- #rebuild(owner_id) ⇒ Object
  Full rebuild: pulls all tokens from DynamoDB for an owner.
Constructor Details
#initialize(config: S3arch.configuration, store: nil) ⇒ Indexer
Returns a new instance of Indexer.
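```ruby
# File 'lib/s3arch/indexer.rb', line 14
def initialize(config: S3arch.configuration, store: nil)
  # Validate the configuration up front, even when a store is injected.
  config.validate!
  @config = config
  # Default to a Store built from the same configuration.
  @store = store || Store.new(config: config)
end
```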
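Because `store:` is injectable, tests can pass an in-memory double instead of a real `Store`. The sketch below is hypothetical: `FakeStore` is not part of S3arch, and its interface is inferred only from the four calls the Indexer makes (`fetch_records`, `download_index`, `upload_index`, `increment_version`). The real `Store` may accept different arguments or return different values.

```ruby
# Hypothetical in-memory stand-in for S3arch::Store, modelled on the calls the
# Indexer makes. Not part of S3arch.
class FakeStore
  attr_reader :versions, :record_counts

  def initialize(records_by_owner = {})
    @records_by_owner = records_by_owner # owner_id => array of token records
    @indexes = {}                        # owner_id => uploaded database bytes
    @versions = Hash.new(0)              # owner_id => index version counter
    @record_counts = {}                  # owner_id => last reported record count
  end

  # Used by #rebuild: every token record for an owner.
  def fetch_records(owner_id)
    @records_by_owner.fetch(owner_id, [])
  end

  # Used by #apply_changes: copy the stored index to path, or return false
  # when none exists yet so the Indexer falls back to a full rebuild.
  def download_index(owner_id, path)
    bytes = @indexes[owner_id]
    return false unless bytes

    File.binwrite(path, bytes)
    true
  end

  # Keep a copy of the file, since the Indexer deletes its scratch file afterwards.
  def upload_index(owner_id, path)
    @indexes[owner_id] = File.binread(path)
  end

  def increment_version(owner_id, record_count)
    @versions[owner_id] += 1
    @record_counts[owner_id] = record_count
  end
end
```

Note that the constructor still calls `config.validate!`, so a test that injects a store also needs a configuration that passes validation.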
Instance Method Details
#apply_changes(owner_id, changes) ⇒ Object
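Incremental update: downloads the owner's existing index, applies the INSERT/DELETE/UPDATE changes in a single SQLite transaction, uploads the result, and increments the index version. Falls back to #rebuild when no index exists yet.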
```ruby
# File 'lib/s3arch/indexer.rb', line 35
def apply_changes(owner_id, changes)
  db_path = "/tmp/s3arch_#{owner_id}.sqlite3"
  db = nil

  # No index uploaded yet for this owner: build one from scratch instead.
  unless @store.download_index(owner_id, db_path)
    log(:info, 'No existing index, doing full rebuild', owner_id: owner_id)
    return rebuild(owner_id)
  end

  db = SQLite3::Database.new(db_path)
  db.results_as_hash = true
  # Apply the whole batch atomically; any failure rolls every change back.
  db.transaction do
    changes.each { |change| apply_change(db, change) }
  end

  record_count = db.get_first_value('SELECT COUNT(*) FROM records_meta')
  db.close # release the file before uploading it

  @store.upload_index(owner_id, db_path)
  @store.increment_version(owner_id, record_count)
  log(:info, 'Index updated incrementally', owner_id: owner_id, changes: changes.size, record_count: record_count)
ensure
  # Close the handle if an exception skipped the explicit close above.
  db.close if db && !db.closed?
  File.delete(db_path) if db_path && File.exist?(db_path)
end
```
#process_event(event) ⇒ Object
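Processes an SQS event containing DynamoDB stream records. Changes are grouped by owner; an owner whose changes include a :rebuild action is rebuilt in full, and every other owner is updated through #apply_changes. Returns a hash with statusCode 200 and a JSON body reporting the number of owners processed.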
```ruby
# File 'lib/s3arch/indexer.rb', line 62
def process_event(event)
  sqs_records = event['Records'] || []
  # Group the stream changes by owner_id.
  grouped = group_changes(sqs_records)
  log(:info, 'Processing stream events', owner_count: grouped.size, record_count: sqs_records.size)

  grouped.each do |owner_id, changes|
    # A single :rebuild change forces a full rebuild for this owner.
    if changes.any? { |c| c[:action] == :rebuild }
      rebuild(owner_id)
    else
      apply_changes(owner_id, changes)
    end
  end

  # "rebuilt" counts every owner processed, including incremental updates.
  { statusCode: 200, body: JSON.generate(rebuilt: grouped.size) }
end
```
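The dispatch rule in #process_event can be illustrated without AWS or SQLite. In this hypothetical sketch, `route_changes` stands in for the grouping and dispatch loop, and the flat change hashes carrying an `:owner_id` key are an assumption; the real `group_changes` builds the per-owner grouping from SQS records, and its change format is not shown on this page.

```ruby
require 'json'

# Hypothetical illustration of the dispatch in Indexer#process_event.
# Each change is assumed to be a hash with :owner_id and :action keys.
def route_changes(changes)
  grouped = changes.group_by { |c| c[:owner_id] }

  grouped.transform_values do |owner_changes|
    # One :rebuild change is enough to skip incremental processing.
    owner_changes.any? { |c| c[:action] == :rebuild } ? :rebuild : :apply_changes
  end
end

changes = [
  { owner_id: 'alice', action: :insert },
  { owner_id: 'alice', action: :delete },
  { owner_id: 'bob',   action: :update },
  { owner_id: 'bob',   action: :rebuild }
]

plan = route_changes(changes)
# plan == { 'alice' => :apply_changes, 'bob' => :rebuild }

# Mirrors the response: the count covers both paths, not only rebuilds.
response = { statusCode: 200, body: JSON.generate(rebuilt: plan.size) }
# response[:body] == '{"rebuilt":2}'
```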
#rebuild(owner_id) ⇒ Object
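Full rebuild: fetches all token records for an owner from DynamoDB, builds a new SQLite database from them, uploads it, and increments the index version with the new record count.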
```ruby
# File 'lib/s3arch/indexer.rb', line 21
def rebuild(owner_id)
  records = @store.fetch_records(owner_id)
  db_path = "/tmp/s3arch_#{owner_id}.sqlite3"

  # Build a fresh database from the owner's token records, then publish it.
  build_database(db_path, records)
  @store.upload_index(owner_id, db_path)
  @store.increment_version(owner_id, records.size)
  log(:info, 'Index rebuilt', owner_id: owner_id, record_count: records.size)
ensure
  # Always remove the scratch file; db_path is nil if fetch_records raised.
  File.delete(db_path) if db_path && File.exist?(db_path)
end
```
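Both #rebuild and #apply_changes write to a fixed scratch path derived from `owner_id` and delete it in an `ensure` clause. One possible variant, not what the source does, is to place the file inside a fresh directory from Ruby's `Dir.mktmpdir`, which removes the directory and its contents when the block exits, even on an exception, and keeps `owner_id` out of the filesystem path. The `with_scratch_db` helper below is hypothetical.

```ruby
require 'tmpdir'

# Hypothetical helper (not part of S3arch): yields a scratch SQLite path inside
# a private temporary directory. Dir.mktmpdir deletes the directory and
# everything in it when the block returns or raises, and returns the block's value.
def with_scratch_db
  Dir.mktmpdir('s3arch') do |dir|
    yield File.join(dir, 'index.sqlite3')
  end
end

# Example: the scratch file is gone afterwards even though the block raised.
seen_path = nil
begin
  with_scratch_db do |path|
    seen_path = path
    File.write(path, 'partial build')
    raise 'upload failed'
  end
rescue RuntimeError
  # The error still reaches the caller; cleanup has already happened.
end
File.exist?(seen_path) # => false
```

Inside #rebuild, the body would then run inside `with_scratch_db { |db_path| ... }` instead of relying on its own `ensure` clause.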