Module: Legion::Extensions::MicrosoftTeams::Runners::ApiIngest

Extended by:
ApiIngest
Includes:
Helpers::Lex, Helpers::Client, Helpers::HighWaterMark, Helpers::PermissionGuard
Included in:
ApiIngest
Defined in:
lib/legion/extensions/microsoft_teams/runners/api_ingest.rb

Constant Summary collapse

MAX_CHAT_PAGES =
10

Constants included from Helpers::HighWaterMark

Helpers::HighWaterMark::HWM_TTL

Constants included from Helpers::PermissionGuard

Helpers::PermissionGuard::BACKOFF_SCHEDULE

Instance Method Summary collapse

Methods included from Helpers::HighWaterMark

#get_extended_hwm, #get_hwm, #hwm_key, #new_messages, #persist_hwm_as_trace, #restore_hwm_from_traces, #set_extended_hwm, #set_hwm, #update_extended_hwm, #update_hwm_from_messages

Methods included from Helpers::PermissionGuard

#denial_info, #guarded_request, #permission_denied?, #record_denial, #reset_denials!

Methods included from Helpers::Client

#bot_connection, #graph_connection, #oauth_connection, #user_path

Instance Method Details

#ingest_api(token:, top_people: 15, message_depth: 50, skip_bots: true, imprint_active: false) ⇒ Object

Fetch the top contacts via /me/people, then pull recent messages from their 1:1 chats. Each message is stored as an individual memory trace (same format as CacheIngest) and deduplicated by its message ID, falling back to a hash of the message text when no ID is present.

Requires a delegated token with Chat.Read and People.Read scopes.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/legion/extensions/microsoft_teams/runners/api_ingest.rb', line 24

# Ingest recent 1:1 chat messages for the caller's top contacts as memory traces.
#
# Fetches the top people, fetches the 1:1 chats, matches each person to a chat,
# then stores every eligible message from that chat as its own trace. Messages
# are deduplicated by their `id`, falling back to the first 16 hex characters of
# the SHA-256 of the extracted text when no `id` is present.
#
# Requires a delegated token with Chat.Read and People.Read scopes.
#
# @param token [String] bearer token; must be non-nil and non-empty
# @param top_people [Integer] forwarded as `top:` to `fetch_top_people`
# @param message_depth [Integer] forwarded as `depth:` to `fetch_chat_messages`
# @param skip_bots [Boolean] when truthy, messages for which `bot_message_graph?` is truthy are ignored
# @param imprint_active [Boolean] forwarded unchanged to `store_graph_message`
# @return [Hash] on completion `{ result: { stored:, skipped:, people_ingested:, people_found:,
#   chats_found:, apollo: } }`; on a rescued StandardError `{ result: { stored:, skipped:, error: } }`;
#   on a failed precondition whatever `error_result` returns
def ingest_api(token:, top_people: 15, message_depth: 50, skip_bots: true, imprint_active: false, **) # rubocop:disable Metrics/AbcSize,Metrics/CyclomaticComplexity,Metrics/MethodLength,Metrics/PerceivedComplexity
  return error_result('lex-memory not loaded') unless memory_available?
  return error_result('no token provided') if !token || token.empty?

  restore_hwm_from_traces

  people = fetch_top_people(token: token, top: top_people)
  log.debug("ApiIngest: fetched #{people.size} top people")
  return error_result('people endpoint denied or empty') if people.empty?

  chats = fetch_one_on_one_chats(token: token)
  log.debug("ApiIngest: fetched #{chats.size} oneOnOne chats")
  return error_result('no 1:1 chats found') if chats.empty?

  existing_hashes = load_existing_hashes
  conn = graph_connection(token: token)
  stored = 0
  skipped = 0
  people_ingested = 0
  # chat id => trace ids stored in this run; handed to coactivate_thread_traces afterwards.
  thread_groups = Hash.new { |h, k| h[k] = [] }
  # display name => message texts stored in this run; handed to publish_to_apollo afterwards.
  person_texts = Hash.new { |h, k| h[k] = [] }

  people.each do |person|
    chat = match_chat_to_person(chats: chats, person: person, conn: conn)
    unless chat
      log.debug("ApiIngest: no chat match for #{person['displayName']} " \
                "(email=#{person.dig('scoredEmailAddresses', 0, 'address')}, id=#{person['id']})")
      next
    end
    log.info("ApiIngest: matched #{person['displayName']} to chat #{chat['id']}")

    messages = fetch_chat_messages(conn: conn, chat_id: chat['id'], depth: message_depth)
    next if messages.empty?

    msg_stored = 0
    messages.each do |msg|
      next if skip_bots && bot_message_graph?(msg)

      # Bot messages and very short texts are dropped without counting as skipped.
      text = extract_body_text(msg)
      next if text.length < 5

      content_hash = msg['id'] || Digest::SHA256.hexdigest(text)[0, 16]
      if existing_hashes.include?(content_hash)
        skipped += 1
        next
      end

      trace_result = store_graph_message(msg, text, person, chat['id'],
                                         content_hash:   content_hash,
                                         imprint_active: imprint_active)
      if trace_result
        stored += 1
        msg_stored += 1
        # Remember the key so a repeat of the same message later in this run is skipped.
        existing_hashes << content_hash
        thread_groups[chat['id']] << trace_result[:trace_id]
        person_texts[person['displayName']] << text
      else
        skipped += 1
      end
    end

    # Only chats that produced at least one new trace count as ingested and advance the high-water mark.
    next unless msg_stored.positive?

    people_ingested += 1
    update_extended_hwm(chat_id: chat['id'],
                        last_message_at: messages.filter_map { |m| m['createdDateTime'] }.max,
                        new_message_count: msg_stored, ingested: true)
  end

  coactivate_thread_traces(thread_groups)
  flush_trace_store if stored.positive?
  # Stays nil when nothing was stored, so the :apollo key is nil in that case.
  apollo_results = publish_to_apollo(person_texts) if stored.positive?

  { result: { stored: stored, skipped: skipped, people_ingested: people_ingested,
              people_found: people.length, chats_found: chats.length,
              apollo: apollo_results } }
rescue StandardError => e
  # Separate the exception class from its message so the log line is readable.
  log.error("ApiIngest failed: #{e.class} - #{e.message}")
  # The counters are still nil if the failure happened before they were initialised.
  { result: { stored: stored || 0, skipped: skipped || 0, error: e.message } }
end