Module: Legion::Extensions::MicrosoftTeams::Runners::CacheIngest

Includes:
Helpers::Lex
Included in:
Client
Defined in:
lib/legion/extensions/microsoft_teams/runners/cache_ingest.rb

Instance Method Summary collapse

Instance Method Details

#ingest_cache(since: nil, skip_bots: true, db_path: nil, imprint_active: false) ⇒ Object

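Ingests Teams messages from the local cache into lex-memory traces. Returns a hash whose :result entry holds the number of new traces stored, the number of messages skipped, and the latest compose_time seen; when lex-memory is not loaded or the Teams cache cannot be found, the counts are zero and :result also carries an :error message.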



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/legion/extensions/microsoft_teams/runners/cache_ingest.rb', line 12

# Ingest Teams messages from the local cache into lex-memory traces.
#
# Every extracted message is run through +strip_html+; messages whose
# stripped text is shorter than 5 characters are ignored (they are neither
# stored nor counted as skipped). The remaining messages are handed to
# +store_message_trace+. Trace ids of stored messages that carry a
# +thread_id+ are grouped per thread and passed to
# +coactivate_thread_traces+, and +flush_trace_store+ is called when at
# least one trace was stored.
#
# @param since [Time, String, nil] lower bound forwarded to the extractor;
#   a String is converted with +Time.parse+, anything else is passed through
# @param skip_bots [Boolean] forwarded to the extractor unchanged
# @param db_path [String, nil] forwarded to the extractor only when truthy
# @param imprint_active [Boolean] forwarded to +store_message_trace+
# @return [Hash] <tt>{ result: { stored:, skipped:, latest_time: } }</tt>;
#   when lex-memory is not loaded or the extractor reports the cache as
#   unavailable the counts are 0, +latest_time+ is nil and +:error+ is set
# @raise [ArgumentError] if +since+ is a String that +Time.parse+ rejects
def ingest_cache(since: nil, skip_bots: true, db_path: nil, imprint_active: false, **)
  return { result: { stored: 0, skipped: 0, latest_time: nil, error: 'lex-memory not loaded' } } unless memory_available?

  opts = {}
  opts[:db_path] = db_path if db_path
  extractor = MicrosoftTeams::LocalCache::Extractor.new(**opts)

  return { result: { stored: 0, skipped: 0, latest_time: nil, error: 'Teams cache not found' } } unless extractor.available?

  since_time = since.is_a?(String) ? Time.parse(since) : since
  messages = extractor.extract(since: since_time, skip_bots: skip_bots)

  stored = 0
  skipped = 0
  latest_time = nil
  thread_groups = Hash.new { |h, k| h[k] = [] }

  messages.each do |msg|
    # Track the newest compose_time across *every* message seen, before any
    # filtering. Otherwise a batch ending in short/empty messages would
    # report a stale latest_time and the same tail would be re-extracted on
    # each subsequent run that uses it as +since+.
    composed_at = msg.compose_time
    latest_time = composed_at if composed_at && (latest_time.nil? || composed_at > latest_time)

    text = strip_html(msg.content)
    next if text.length < 5 # also covers the empty string

    trace_result = store_message_trace(msg, text, imprint_active: imprint_active)
    if trace_result
      stored += 1
      thread_groups[msg.thread_id] << trace_result[:trace_id] if msg.thread_id
    else
      # A falsy result from store_message_trace is counted as skipped.
      skipped += 1
    end
  end

  coactivate_thread_traces(thread_groups)
  flush_trace_store if stored.positive?

  { result: { stored: stored, skipped: skipped, latest_time: latest_time } }
end