Class: S3arch::Indexer

Inherits:
Object
Defined in:
lib/s3arch/indexer.rb

Overview

Builds one SQLite FTS5 database per owner from pre-computed tokens stored in DynamoDB and uploads it to S3. The indexer never sees raw content, only tokens. Supports incremental updates from DynamoDB Stream events (INSERT/MODIFY/REMOVE), which arrive wrapped in SQS messages.

Constructor Details

#initialize(config: S3arch.configuration) ⇒ Indexer

Returns a new instance of Indexer.



# File 'lib/s3arch/indexer.rb', line 13

def initialize(config: S3arch.configuration)
  config.validate!
  @config = config
  @dynamodb = Aws::DynamoDB::Client.new
  @s3 = Aws::S3::Client.new
end

Instance Method Details

#apply_changes(owner_id, changes) ⇒ Object

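Incremental update: applies INSERT/MODIFY/REMOVE changes to an existing index. Downloads the current database from S3, applies all changes in a single transaction, and re-uploads the result. Falls back to a full rebuild when no index exists yet for the owner.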



# File 'lib/s3arch/indexer.rb', line 37

def apply_changes(owner_id, changes)
  db_path = "/tmp/s3arch_#{owner_id}.sqlite3"
  download_existing(owner_id, db_path)

  unless File.exist?(db_path)
    log(:info, 'No existing index, doing full rebuild', owner_id: owner_id)
    return rebuild(owner_id)
  end

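  db = SQLite3::Database.new(db_path)
  db.results_as_hash = true

  begin
    # Apply every change atomically; an exception rolls back the whole batch.
    db.transaction do
      changes.each { |change| apply_change(db, change) }
    end

    record_count = db.get_first_value('SELECT COUNT(*) FROM records_meta')
  ensure
    # Close the handle even if a change raises, before the temp file is deleted.
    db.close
  end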

  upload(owner_id, db_path)
  increment_version(owner_id, record_count)

  log(:info, 'Index updated incrementally', owner_id: owner_id, changes: changes.size, record_count: record_count)
ensure
  File.delete(db_path) if db_path && File.exist?(db_path)
end
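
`apply_change` is private and not documented on this page. As a rough sketch only, a single change could be translated into SQL along these lines. It assumes change hashes of the form `{ action:, record_id:, tokens: }` and a hypothetical FTS5 table `records_fts(record_id UNINDEXED, tokens)` next to `records_meta`. Only the `records_meta` table name appears in the source; its `record_id` column, the helper names, and the action symbols are assumptions. `db` is any object that responds to `execute(sql, bind_vars)`, as `SQLite3::Database` does.

```ruby
# Hypothetical helper: removes any existing rows for the record from both tables.
def delete_record(db, record_id)
  db.execute('DELETE FROM records_fts WHERE record_id = ?', [record_id])
  db.execute('DELETE FROM records_meta WHERE record_id = ?', [record_id])
end

# Hypothetical helper: stores the record's pre-computed tokens.
def insert_record(db, record_id, tokens)
  db.execute('INSERT INTO records_meta (record_id) VALUES (?)', [record_id])
  db.execute('INSERT INTO records_fts (record_id, tokens) VALUES (?, ?)', [record_id, tokens])
end

# Sketch of a per-change dispatcher (assumed shape, not the gem's actual code).
def apply_change(db, change)
  case change[:action]
  when :insert, :update
    # Delete-then-insert gives the same result if the same event is applied twice.
    delete_record(db, change[:record_id])
    insert_record(db, change[:record_id], change[:tokens])
  when :delete
    delete_record(db, change[:record_id])
  else
    raise ArgumentError, "unknown action: #{change[:action].inspect}"
  end
end
```

Treating inserts and updates identically is a deliberate choice in this sketch: standard SQS queues deliver at least once, so a change may be applied more than once and should leave the index in the same state each time.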

#process_event(event) ⇒ Object

Processes an SQS event containing DynamoDB stream records. Groups the changes by owner, then either applies them incrementally or, if any change for that owner is marked :rebuild, runs a full rebuild.



# File 'lib/s3arch/indexer.rb', line 66

def process_event(event)
  sqs_records = event['Records'] || []
  grouped = group_changes(sqs_records)

  log(:info, 'Processing stream events', owner_count: grouped.size, record_count: sqs_records.size)

  grouped.each do |owner_id, changes|
    if changes.any? { |c| c[:action] == :rebuild }
      rebuild(owner_id)
    else
      apply_changes(owner_id, changes)
    end
  end

  # NOTE: `rebuilt` counts every owner processed, including those updated incrementally.
  { statusCode: 200, body: JSON.generate(rebuilt: grouped.size) }
end
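
The grouping step (`group_changes`) is not shown on this page. A minimal sketch of what such a helper might look like, assuming each SQS message body is a JSON-encoded DynamoDB stream record and that the table uses string key attributes named `owner_id` and `record_id`. Those attribute names, the `tokens` attribute, the action symbols, and the shape of the change hashes are all assumptions for illustration.

```ruby
require 'json'

# Hypothetical mapping from DynamoDB stream event names to change actions.
STREAM_ACTIONS = { 'INSERT' => :insert, 'MODIFY' => :update, 'REMOVE' => :delete }.freeze

# Groups SQS-wrapped DynamoDB stream records by owner.
# Returns { owner_id => [{ action:, record_id:, tokens: }, ...] }.
def group_changes(sqs_records)
  sqs_records.each_with_object(Hash.new { |h, k| h[k] = [] }) do |sqs_record, grouped|
    stream_record = JSON.parse(sqs_record['body'])
    ddb = stream_record.fetch('dynamodb', {})
    keys = ddb.fetch('Keys', {})

    owner_id = keys.dig('owner_id', 'S')
    next if owner_id.nil? # skip records that carry no owner key

    # Unrecognised event names fall back to a full rebuild for that owner.
    action = STREAM_ACTIONS.fetch(stream_record['eventName'], :rebuild)

    grouped[owner_id] << {
      action: action,
      record_id: keys.dig('record_id', 'S'),
      tokens: ddb.dig('NewImage', 'tokens', 'S') # nil for REMOVE events
    }
  end
end
```

With this shape, `process_event` can decide per owner whether any change requires a rebuild, which matches the `c[:action] == :rebuild` check above.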

#rebuild(owner_id) ⇒ Object

Full rebuild: pulls all tokens for an owner from DynamoDB, builds a fresh database, and uploads it. Used for the initial backfill or when an incremental update isn't possible.



# File 'lib/s3arch/indexer.rb', line 22

def rebuild(owner_id)
  records = fetch_records(owner_id)
  db_path = "/tmp/s3arch_#{owner_id}.sqlite3"

  build_database(db_path, records)
  upload(owner_id, db_path)
  increment_version(owner_id, records.size)

  log(:info, 'Index rebuilt', owner_id: owner_id, record_count: records.size)
ensure
  File.delete(db_path) if db_path && File.exist?(db_path)
end
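
`fetch_records` is not shown on this page. A DynamoDB `Query` call returns at most 1 MB of data, so pulling all tokens for an owner generally means following `last_evaluated_key` until no further pages remain. A minimal sketch of that loop, assuming an `owner_id` partition key and a caller-supplied table name (both hypothetical); `client` is expected to behave like `Aws::DynamoDB::Client`.

```ruby
# Hypothetical helper: collects every item for one owner across all result pages.
def fetch_all_records(client, table_name, owner_id)
  items = []
  start_key = nil

  loop do
    params = {
      table_name: table_name,
      key_condition_expression: 'owner_id = :owner',
      expression_attribute_values: { ':owner' => owner_id }
    }
    # Only pass exclusive_start_key when continuing from a previous page.
    params[:exclusive_start_key] = start_key if start_key

    page = client.query(params)
    items.concat(page.items)

    # A missing or empty key means the last page has been read.
    start_key = page.last_evaluated_key
    break if start_key.nil? || start_key.empty?
  end

  items
end
```

Because the whole result set is held in memory before the database is built, a very large owner may need a streaming variant that inserts each page as it arrives.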