Class: S3arch::Indexer

Inherits:
Object
Defined in:
lib/s3arch/indexer.rb

Overview

Builds one SQLite FTS5 database per owner from pre-computed tokens stored in DynamoDB. The indexer never sees raw content, only tokens. Supports incremental updates driven by DynamoDB Stream events (INSERT/MODIFY/REMOVE).

Constant Summary

RESERVED_WORDS =
Set.new(%w[status name comment count size type]).freeze

Constructor Details

#initialize(config: S3arch.configuration) ⇒ Indexer

Returns a new instance of Indexer.



# File 'lib/s3arch/indexer.rb', line 12

def initialize(config: S3arch.configuration)
  config.validate!
  @config = config
  @dynamodb = Aws::DynamoDB::Client.new
  @s3 = Aws::S3::Client.new
end

Instance Method Details

#apply_changes(owner_id, changes) ⇒ Object

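Incremental update: applies a batch of inserts, updates, and deletes to an existing index. Downloads the current database from S3, applies the changes in a single transaction, and re-uploads the result. Falls back to a full rebuild when no index exists yet.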



# File 'lib/s3arch/indexer.rb', line 36

def apply_changes(owner_id, changes)
  db_path = "/tmp/s3arch_#{owner_id}.sqlite3"
  download_existing(owner_id, db_path)

  unless File.exist?(db_path)
    log(:info, 'No existing index, doing full rebuild', owner_id: owner_id)
    return rebuild(owner_id)
  end

  db = SQLite3::Database.new(db_path)
  db.results_as_hash = true

  db.transaction do
    changes.each { |change| apply_change(db, change) }
  end

  record_count = db.get_first_value('SELECT COUNT(*) FROM records_meta')
  db.close

  upload(owner_id, db_path)
  increment_version(owner_id, record_count)

  log(:info, 'Index updated incrementally', owner_id: owner_id, changes: changes.size, record_count: record_count)
ensure
  File.delete(db_path) if db_path && File.exist?(db_path)
end
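
The private apply_change helper is not shown on this page. As a rough illustration of what an incremental update does, the sketch below applies a batch of changes to a plain Hash standing in for the SQLite index. The change shape (`:action`, `:record_id`, `:tokens`) and both method names are assumptions made for this example, not the library's API.

```ruby
# Hypothetical stand-in for the private apply_change helper.
# A Hash (record_id => tokens) plays the role of the SQLite index.
def apply_change_sketch(index, change)
  case change[:action]
  when :insert, :update
    # Upsert: replace whatever tokens were stored for this record.
    index[change[:record_id]] = change[:tokens]
  when :delete
    index.delete(change[:record_id])
  else
    raise ArgumentError, "unknown action: #{change[:action].inspect}"
  end
  index
end

# Applies every change to a copy, so a failure part-way through leaves the
# caller's index untouched. This loosely mirrors the all-or-nothing
# db.transaction block in apply_changes.
def apply_changes_sketch(index, changes)
  working = index.dup
  changes.each { |change| apply_change_sketch(working, change) }
  working
end
```

Working on a copy is only an analogy for the transaction; the real method relies on SQLite to roll back a failed batch.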

#process_event(event) ⇒ Object

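Processes an SQS event containing DynamoDB stream records. Groups the changes by owner, then runs a full rebuild for any owner whose changes include a :rebuild action and an incremental update otherwise. The rebuilt value in the response body is the number of owners processed, whether they were rebuilt or updated incrementally.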



# File 'lib/s3arch/indexer.rb', line 65

def process_event(event)
  sqs_records = event['Records'] || []
  grouped = group_changes(sqs_records)

  log(:info, 'Processing stream events', owner_count: grouped.size, record_count: sqs_records.size)

  grouped.each do |owner_id, changes|
    if changes.any? { |c| c[:action] == :rebuild }
      rebuild(owner_id)
    else
      apply_changes(owner_id, changes)
    end
  end

  { statusCode: 200, body: JSON.generate(rebuilt: grouped.size) }
end
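
The grouping step relies on the private group_changes helper, which is not documented here. The sketch below shows one way such grouping could work, assuming each SQS record's `body` is a JSON-encoded DynamoDB stream record and that the table keys are named `owner_id` and `record_id`. Those key names, the action symbols, and the choice to map unknown event names to `:rebuild` are all assumptions for illustration.

```ruby
require 'json'

# Hypothetical mapping from DynamoDB stream event names to change actions.
STREAM_ACTIONS_SKETCH = {
  'INSERT' => :insert,
  'MODIFY' => :update,
  'REMOVE' => :delete
}.freeze

# Groups SQS-wrapped stream records by owner.
# Returns a Hash of owner_id => array of change hashes.
def group_changes_sketch(sqs_records)
  sqs_records.each_with_object(Hash.new { |h, k| h[k] = [] }) do |sqs_record, grouped|
    stream_record = JSON.parse(sqs_record['body'])
    keys = stream_record.dig('dynamodb', 'Keys') || {}
    owner_id = keys.dig('owner_id', 'S')
    next if owner_id.nil? # skip records that carry no owner key

    grouped[owner_id] << {
      # Anything unrecognised falls back to a full rebuild (an assumption).
      action: STREAM_ACTIONS_SKETCH.fetch(stream_record['eventName'], :rebuild),
      record_id: keys.dig('record_id', 'S')
    }
  end
end
```

With changes grouped this way, a single `:rebuild` entry for an owner is enough for process_event to take the full-rebuild branch for that owner.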

#rebuild(owner_id) ⇒ Object

Full rebuild: pulls all tokens from DynamoDB for an owner. Used for the initial backfill or when an incremental update isn't possible.



# File 'lib/s3arch/indexer.rb', line 21

def rebuild(owner_id)
  records = fetch_records(owner_id)
  db_path = "/tmp/s3arch_#{owner_id}.sqlite3"

  build_database(db_path, records)
  upload(owner_id, db_path)
  increment_version(owner_id, records.size)

  log(:info, 'Index rebuilt', owner_id: owner_id, record_count: records.size)
ensure
  File.delete(db_path) if db_path && File.exist?(db_path)
end
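
Pulling all tokens for an owner typically means paging through DynamoDB query results, since a single response is size-limited. The private fetch_records helper is not shown, so the following is only a sketch of that paging loop. It runs against a small fake client so it works without AWS credentials; the `owner_id` key name, the table name, and every identifier ending in `Sketch` are assumptions. The `query` parameters and the `items` / `last_evaluated_key` response fields follow the AWS SDK for Ruby.

```ruby
# Minimal response object exposing the two fields the loop needs.
PageSketch = Struct.new(:items, :last_evaluated_key)

# Stand-in for Aws::DynamoDB::Client that serves canned pages.
class FakeDynamoSketch
  def initialize(pages)
    @pages = pages
  end

  def query(table_name:, key_condition_expression:, expression_attribute_values:, exclusive_start_key: nil)
    # The fake encodes the next page index in the start key.
    index = exclusive_start_key ? exclusive_start_key.fetch('page') : 0
    @pages.fetch(index)
  end
end

# Collects every item for an owner by following last_evaluated_key
# until DynamoDB reports that there are no more pages.
def fetch_records_sketch(client, table_name, owner_id)
  records = []
  start_key = nil
  loop do
    params = {
      table_name: table_name,
      key_condition_expression: 'owner_id = :owner',
      expression_attribute_values: { ':owner' => owner_id }
    }
    params[:exclusive_start_key] = start_key if start_key
    page = client.query(**params)
    records.concat(page.items)
    start_key = page.last_evaluated_key
    break if start_key.nil?
  end
  records
end
```

Passing a real `Aws::DynamoDB::Client` in place of the fake should behave the same way, provided the table's partition key matches the assumed name.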