Class: S3arch::Indexer

Inherits:
Object
  • Object
show all
Includes:
StreamParser
Defined in:
lib/s3arch/indexer.rb,
lib/s3arch/indexer/stream_parser.rb

Overview

Builds SQLite FTS5 databases per owner from pre-computed tokens stored in DynamoDB. The indexer never sees raw content — only tokens. Supports incremental updates via DynamoDB Stream events (INSERT/MODIFY/REMOVE).

Defined Under Namespace

Modules: StreamParser

Instance Method Summary

Constructor Details

#initialize(config: S3arch.configuration, store: nil) ⇒ Indexer

Returns a new instance of Indexer.



14
15
16
17
18
# File 'lib/s3arch/indexer.rb', line 14

# Creates an indexer bound to a validated configuration.
#
# The configuration is validated eagerly, so a misconfigured indexer fails at
# construction time rather than on its first event.
#
# @param config [#validate!] configuration object; defaults to the global S3arch configuration
# @param store [Object, nil] storage backend; when nil or false, a Store is built from +config+
def initialize(config: S3arch.configuration, store: nil)
  @config = config.tap(&:validate!)
  @store = store || Store.new(config: @config)
end

Instance Method Details

#apply_changes(owner_id, changes) ⇒ Object

Incremental update: applies a batch of change records to an owner's existing index inside a single transaction, falling back to a full rebuild when no index exists yet.



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/s3arch/indexer.rb', line 35

# Incrementally applies +changes+ to the owner's existing index.
#
# Downloads the current index to a temp file, applies every change inside a
# single SQLite transaction, then uploads the result and bumps the stored
# version with the row count of +records_meta+. When the store has no index
# for the owner yet, this falls back to a full #rebuild. The temp file is
# always removed, and the SQLite handle is always closed, even when a change
# fails.
#
# @param owner_id [#to_s] owner whose index is updated; interpolated into the temp file path
# @param changes [Array] change entries, each handed to #apply_change
# @return [Object] result of the final log call, or of #rebuild on the fallback path
def apply_changes(owner_id, changes)
  db_path = "/tmp/s3arch_#{owner_id}.sqlite3"

  unless @store.download_index(owner_id, db_path)
    log(:info, 'No existing index, doing full rebuild', owner_id: owner_id)
    return rebuild(owner_id)
  end

  db = SQLite3::Database.new(db_path)
  record_count =
    begin
      db.results_as_hash = true
      # A single transaction: the batch is committed only if every change succeeds.
      db.transaction do
        changes.each { |change| apply_change(db, change) }
      end
      db.get_first_value('SELECT COUNT(*) FROM records_meta')
    ensure
      # Close on every path so a failing change does not leak the connection.
      db.close
    end

  @store.upload_index(owner_id, db_path)
  @store.increment_version(owner_id, record_count)

  log(:info, 'Index updated incrementally', owner_id: owner_id, changes: changes.size, record_count: record_count)
ensure
  File.delete(db_path) if db_path && File.exist?(db_path)
end

#process_event(event) ⇒ Object

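Processes an SQS event containing DynamoDB stream records: changes are grouped per owner, and each owner's index is either fully rebuilt (if any change requests a rebuild) or updated incrementally.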



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/s3arch/indexer.rb', line 62

# Entry point for an SQS event carrying DynamoDB stream records.
#
# Records are grouped per owner via #group_changes. An owner with at least one
# change whose action is :rebuild gets a full #rebuild; every other owner is
# updated with #apply_changes.
#
# @param event [Hash] SQS event payload; a missing 'Records' key is treated as empty
# @return [Hash] response with statusCode 200; the JSON body's "rebuilt" value
#   is the number of owners processed (incrementally updated owners included)
def process_event(event)
  messages = event['Records'] || []
  changes_by_owner = group_changes(messages)

  log(:info, 'Processing stream events', owner_count: changes_by_owner.size, record_count: messages.size)

  changes_by_owner.each do |owner_id, owner_changes|
    if owner_changes.none? { |change| change[:action] == :rebuild }
      apply_changes(owner_id, owner_changes)
    else
      rebuild(owner_id)
    end
  end

  { statusCode: 200, body: JSON.generate(rebuilt: changes_by_owner.size) }
end

#rebuild(owner_id) ⇒ Object

Full rebuild — pulls all tokens from DynamoDB for an owner.



21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/s3arch/indexer.rb', line 21

# Rebuilds an owner's index from scratch.
#
# Fetches every record for +owner_id+ from the store, writes a fresh SQLite
# database to a temp file via #build_database, uploads it, and bumps the
# stored version with the number of records fetched. The temp file is
# removed on every exit path.
#
# @param owner_id [#to_s] owner whose index is rebuilt; interpolated into the temp file path
# @return [Object] result of the final log call
def rebuild(owner_id)
  rows = @store.fetch_records(owner_id)
  tmp_file = format('/tmp/s3arch_%s.sqlite3', owner_id)

  build_database(tmp_file, rows)
  @store.upload_index(owner_id, tmp_file)
  @store.increment_version(owner_id, rows.size)

  log(:info, 'Index rebuilt', owner_id: owner_id, record_count: rows.size)
ensure
  File.delete(tmp_file) if tmp_file && File.exist?(tmp_file)
end