Class: S3arch::Indexer
- Inherits: Object
  - Object
  - S3arch::Indexer
- Defined in: lib/s3arch/indexer.rb
Overview
Builds SQLite FTS5 databases per owner from pre-computed tokens stored in DynamoDB. The indexer never sees raw content — only tokens. Supports incremental updates via DynamoDB Stream events (INSERT/MODIFY/REMOVE).
Instance Method Summary
- #apply_changes(owner_id, changes) ⇒ Object
  Incremental update: applies INSERT/DELETE/UPDATE to an existing index.
- #initialize(config: S3arch.configuration) ⇒ Indexer (constructor)
  A new instance of Indexer.
- #process_event(event) ⇒ Object
  Process SQS event containing DynamoDB stream records.
- #rebuild(owner_id) ⇒ Object
  Full rebuild: pulls all tokens from DynamoDB for an owner.
Constructor Details
#initialize(config: S3arch.configuration) ⇒ Indexer
Returns a new instance of Indexer.
# File 'lib/s3arch/indexer.rb', line 13

def initialize(config: S3arch.configuration)
  config.validate!
  @config = config
  @dynamodb = Aws::DynamoDB::Client.new
  @s3 = Aws::S3::Client.new
end
Instance Method Details
#apply_changes(owner_id, changes) ⇒ Object
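Incremental update: applies INSERT/DELETE/UPDATE changes to an existing index. Downloads the current database from S3, applies the changes in a single transaction, and re-uploads it. If no index exists yet for the owner, falls back to a full rebuild.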
# File 'lib/s3arch/indexer.rb', line 37

def apply_changes(owner_id, changes)
  db_path = "/tmp/s3arch_#{owner_id}.sqlite3"
  download_existing(owner_id, db_path)

  unless File.exist?(db_path)
    log(:info, 'No existing index, doing full rebuild', owner_id: owner_id)
    return rebuild(owner_id)
  end

  db = SQLite3::Database.new(db_path)
  db.results_as_hash = true

  db.transaction do
    changes.each { |change| apply_change(db, change) }
  end

  record_count = db.get_first_value('SELECT COUNT(*) FROM records_meta')
  db.close

  upload(owner_id, db_path)
  increment_version(owner_id, record_count)

  log(:info, 'Index updated incrementally', owner_id: owner_id,
      changes: changes.size, record_count: record_count)
ensure
  File.delete(db_path) if db_path && File.exist?(db_path)
end
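The lifecycle described above (download, modify, upload, always clean up) can be sketched with the standard library alone. This is not the indexer's code: a plain Hash stands in for S3, a flat file stands in for the SQLite database, and `update_index_sketch` is a hypothetical name. It mainly illustrates that a method-level `ensure` still runs after an early `return`, so the temporary file is removed on every path.

```ruby
require 'tmpdir'

# Illustrative only: `store` is a Hash standing in for S3 (an assumption),
# and the "index" is a flat file rather than a SQLite database.
def update_index_sketch(owner_id, store)
  path = File.join(Dir.tmpdir, "sketch_#{owner_id}.idx")

  # "Download": write the stored bytes to a local file if an index exists.
  File.binwrite(path, store[owner_id]) if store.key?(owner_id)

  # Early return when nothing was downloaded; the ensure clause still runs.
  return :rebuild_needed unless File.exist?(path)

  # "Apply changes": let the caller append to the local copy.
  File.open(path, 'ab') { |file| yield file }

  # "Upload": store the modified bytes back.
  store[owner_id] = File.binread(path)
  :updated
ensure
  # Runs on normal completion, early return, and exceptions alike.
  File.delete(path) if path && File.exist?(path)
end
```

Calling `update_index_sketch('owner-1', {}) { |f| f.write('x') }` returns `:rebuild_needed` without invoking the block, which mirrors the fallback to `rebuild` in the real method.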
#process_event(event) ⇒ Object
Processes an SQS event containing DynamoDB stream records. Groups the records by owner and applies incremental changes, or runs a full rebuild for any owner whose changes include a :rebuild action.
# File 'lib/s3arch/indexer.rb', line 66

def process_event(event)
  sqs_records = event['Records'] || []
  grouped = group_changes(sqs_records)

  log(:info, 'Processing stream events', owner_count: grouped.size, record_count: sqs_records.size)

  grouped.each do |owner_id, changes|
    if changes.any? { |c| c[:action] == :rebuild }
      rebuild(owner_id)
    else
      apply_changes(owner_id, changes)
    end
  end

  { statusCode: 200, body: JSON.generate(rebuilt: grouped.size) }
end
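The private `group_changes` helper is not shown on this page. As a rough sketch of what grouping by owner might look like, the code below assumes that each SQS record's `body` is a JSON-encoded DynamoDB stream record and that the table key includes a string attribute named `owner_id`. The action names, the change hash shape, and the fallback to `:rebuild` for unrecognized events are all assumptions made for illustration, not the library's confirmed behavior.

```ruby
require 'json'

# Hypothetical mapping from DynamoDB stream event names to change actions.
ACTIONS = { 'INSERT' => :insert, 'MODIFY' => :update, 'REMOVE' => :delete }.freeze

# Groups stream records by owner. The body format and the owner_id key
# attribute are assumptions, not taken from the S3arch source.
def group_changes_sketch(sqs_records)
  sqs_records.each_with_object(Hash.new { |h, k| h[k] = [] }) do |sqs_record, grouped|
    stream_record = JSON.parse(sqs_record['body'])
    data = stream_record.fetch('dynamodb', {})
    owner_id = data.dig('Keys', 'owner_id', 'S')
    next if owner_id.nil? # skip records that carry no owner key

    # Unrecognized event names trigger a full rebuild for that owner.
    action = ACTIONS.fetch(stream_record['eventName'], :rebuild)
    grouped[owner_id] << { action: action, keys: data['Keys'], image: data['NewImage'] }
  end
end
```

The result is a Hash of owner id to an array of change hashes, which matches the shape `process_event` iterates over with `grouped.each do |owner_id, changes|`.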
#rebuild(owner_id) ⇒ Object
Full rebuild — pulls all tokens from DynamoDB for an owner. Used for initial backfill or when incremental isn’t possible.
# File 'lib/s3arch/indexer.rb', line 22

def rebuild(owner_id)
  records = fetch_records(owner_id)
  db_path = "/tmp/s3arch_#{owner_id}.sqlite3"

  build_database(db_path, records)
  upload(owner_id, db_path)
  increment_version(owner_id, records.size)

  log(:info, 'Index rebuilt', owner_id: owner_id, record_count: records.size)
ensure
  File.delete(db_path) if db_path && File.exist?(db_path)
end
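Pulling all tokens for an owner generally means following DynamoDB Query pagination, since a single Query response is capped in size. The private `fetch_records` helper is not shown here, so the following is only a sketch of that loop. It takes any object that responds to `query` the way `Aws::DynamoDB::Client` does. The method name, the table name parameter, and the `owner_id` partition key are assumptions.

```ruby
# Collects every item for one owner by following Query pagination.
# `client` is duck-typed: anything with a #query(params) method whose
# result responds to #items and #last_evaluated_key.
# The owner_id key name is an assumption made for illustration.
def fetch_all_records_sketch(client, table_name:, owner_id:)
  items = []
  start_key = nil

  loop do
    params = {
      table_name: table_name,
      key_condition_expression: 'owner_id = :owner',
      expression_attribute_values: { ':owner' => owner_id }
    }
    # Resume from where the previous page stopped, if there was one.
    params[:exclusive_start_key] = start_key if start_key

    page = client.query(params)
    items.concat(page.items)

    # A missing or empty last_evaluated_key means this was the final page.
    start_key = page.last_evaluated_key
    break if start_key.nil? || start_key.empty?
  end

  items
end
```

Passing the client in as an argument keeps the loop testable with a stub and independent of how the indexer constructs its own `@dynamodb` client.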