Module: Legion::Extensions::MicrosoftTeams::Runners::ApiIngest
- Extended by: ApiIngest
- Includes: Helpers::Lex, Helpers::Client, Helpers::HighWaterMark, Helpers::PermissionGuard
- Included in: ApiIngest
- Defined in: lib/legion/extensions/microsoft_teams/runners/api_ingest.rb
Constant Summary
- MAX_CHAT_PAGES = 10
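The summary does not say what MAX_CHAT_PAGES limits. Going by the name, it plausibly caps how many result pages are followed when listing chats (Microsoft Graph paginates collections with `@odata.nextLink`). The sketch below is written under that assumption only: `collect_pages` and the injected `fetch_page` callable are hypothetical stand-ins for the real HTTP helper, which is not shown on this page.

```ruby
# Hypothetical sketch: follow Graph-style @odata.nextLink pages, but stop
# after MAX_CHAT_PAGES pages even if the server reports more.
MAX_CHAT_PAGES = 10

# fetch_page: any callable taking a URL (nil for the first page) and
# returning a Hash with 'value' (Array) and an optional '@odata.nextLink'.
def collect_pages(fetch_page, max_pages: MAX_CHAT_PAGES)
  items = []
  url = nil
  max_pages.times do
    page = fetch_page.call(url)
    items.concat(page.fetch('value', []))
    url = page['@odata.nextLink']
    break unless url # no more pages
  end
  items
end

# In-memory fake that always advertises another page.
fake = lambda do |url|
  n = url ? url.split('=').last.to_i : 0
  { 'value' => ["chat-#{n}"], '@odata.nextLink' => "https://example.test/chats?page=#{n + 1}" }
end
collect_pages(fake).size # => 10
```

A hard page cap bounds both latency and request count for users with very long chat lists, at the cost of silently ignoring anything past the last page fetched.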
Constants included from Helpers::HighWaterMark
Helpers::HighWaterMark::HWM_TTL
Constants included from Helpers::PermissionGuard
Helpers::PermissionGuard::BACKOFF_SCHEDULE
Instance Method Summary
- #ingest_api(token:, top_people: 15, message_depth: 50, skip_bots: true, imprint_active: false) ⇒ Object
  Fetch top contacts via /me/people, then pull recent messages from their 1:1 chats.
Methods included from Helpers::HighWaterMark
#get_extended_hwm, #get_hwm, #hwm_key, #new_messages, #persist_hwm_as_trace, #restore_hwm_from_traces, #set_extended_hwm, #set_hwm, #update_extended_hwm, #update_hwm_from_messages
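The high-water-mark helpers are only listed by name here. As a rough illustration of the idea (remember the newest `createdDateTime` seen per chat and only treat later messages as new), here is an in-memory sketch. It borrows some method names from the list above, but the signatures, the storage, and the absence of HWM_TTL handling and trace persistence are all assumptions, not the module's actual behavior.

```ruby
require 'time'

# Hypothetical in-memory high-water-mark store keyed by chat id.
class HighWaterMarkStore
  def initialize
    @marks = {} # chat_id => Time of newest message seen
  end

  def get_hwm(chat_id)
    @marks[chat_id]
  end

  def set_hwm(chat_id, time)
    @marks[chat_id] = time
  end

  # Keep only messages created strictly after the stored mark.
  def new_messages(chat_id, messages)
    mark = get_hwm(chat_id)
    return messages unless mark

    messages.select { |m| Time.iso8601(m['createdDateTime']) > mark }
  end

  # Advance the mark to the newest createdDateTime in the batch (never backwards).
  def update_hwm_from_messages(chat_id, messages)
    newest = messages.filter_map { |m| m['createdDateTime'] }.map { |t| Time.iso8601(t) }.max
    current = get_hwm(chat_id)
    set_hwm(chat_id, newest) if newest && (current.nil? || newest > current)
  end
end
```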
Methods included from Helpers::PermissionGuard
#denial_info, #guarded_request, #permission_denied?, #record_denial, #reset_denials!
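Helpers::PermissionGuard pairs these methods with a BACKOFF_SCHEDULE constant whose values are not shown. A plausible reading is "after a permission denial, skip that endpoint for an escalating wait". The sketch below implements that reading; the class name, the schedule values, the `:forbidden` sentinel standing in for an HTTP 403, and the injected clock are all hypothetical.

```ruby
# Hypothetical permission guard: after a denial, skip the endpoint for a
# growing backoff window. Schedule values are illustrative only.
class PermissionGuardSketch
  BACKOFF_SCHEDULE = [60, 300, 3600].freeze # seconds; assumed values

  def initialize(clock: -> { Time.now })
    @clock = clock
    @denials = {} # endpoint => { count:, until: }
  end

  def record_denial(endpoint)
    count = (@denials.dig(endpoint, :count) || 0) + 1
    wait = BACKOFF_SCHEDULE[[count - 1, BACKOFF_SCHEDULE.size - 1].min]
    @denials[endpoint] = { count: count, until: @clock.call + wait }
  end

  def permission_denied?(endpoint)
    info = @denials[endpoint]
    !info.nil? && @clock.call < info[:until]
  end

  def reset_denials!
    @denials.clear
  end

  # Runs the block unless the endpoint is backing off; a block result of
  # :forbidden records a denial and yields nil to the caller.
  def guarded_request(endpoint)
    return nil if permission_denied?(endpoint)

    result = yield
    return result unless result == :forbidden

    record_denial(endpoint)
    nil
  end
end
```

Under this reading, the empty-result check in `ingest_api` ("people endpoint denied or empty") would also cover the case where the guard short-circuits a denied endpoint.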
Methods included from Helpers::Client
#bot_connection, #graph_connection, #oauth_connection, #user_path
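How `graph_connection` builds its client is not documented here. To show what the first delegated call in `ingest_api` amounts to, the sketch below builds (but does not send) a `GET /me/people` request with Ruby's standard `Net::HTTP`. The v1.0 base URL and the `$top` query option come from the public Microsoft Graph People API; the helper name `build_people_request` is hypothetical, and the extension's real transport may differ.

```ruby
require 'net/http'

# Builds, without sending, a delegated Graph request for the caller's top
# contacts. The token must carry the People.Read delegated scope.
def build_people_request(token, top: 15)
  uri = URI("https://graph.microsoft.com/v1.0/me/people?$top=#{top}")
  req = Net::HTTP::Get.new(uri)
  req['Authorization'] = "Bearer #{token}"
  req['Accept'] = 'application/json'
  req
end

# Sending it would look like:
#   uri = URI('https://graph.microsoft.com')
#   Net::HTTP.start(uri.host, uri.port, use_ssl: true) { |http| http.request(req) }
```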
Instance Method Details
#ingest_api(token:, top_people: 15, message_depth: 50, skip_bots: true, imprint_active: false) ⇒ Object
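Fetch top contacts via /me/people, then pull recent messages from their 1:1 chats. Each message is stored as an individual memory trace (same format as CacheIngest). Duplicates are skipped by key: the Graph message ID, or, when a message has no ID, the first 16 hex characters of a SHA-256 hash of its text.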
Requires a delegated token with Chat.Read and People.Read scopes.
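The dedup key is the one expression in the source below that can be lifted out verbatim. This standalone sketch reuses it with a `Set` to show which messages count as duplicates; `content_key` is a hypothetical name, and the real method seeds its set from `load_existing_hashes` rather than starting empty.

```ruby
require 'digest'
require 'set'

# Same key expression as ingest_api: prefer the Graph message id, else the
# first 16 hex chars of the SHA-256 digest of the extracted text.
def content_key(msg, text)
  msg['id'] || Digest::SHA256.hexdigest(text)[0, 16]
end

seen = Set.new
batch = [
  [{ 'id' => 'm1' }, 'hello there'],
  [{ 'id' => 'm1' }, 'hello there'], # same id: duplicate
  [{}, 'no id here'],
  [{}, 'no id here']                 # same text, no id: duplicate
]
# Set#add? returns nil when the key is already present.
stored = batch.count { |msg, text| seen.add?(content_key(msg, text)) }
# stored == 2
```

Because the ID wins when present, two messages with identical text but different IDs are both kept; hashing only applies to ID-less messages.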
# File 'lib/legion/extensions/microsoft_teams/runners/api_ingest.rb', line 24

def ingest_api(token:, top_people: 15, message_depth: 50, skip_bots: true, imprint_active: false, **) # rubocop:disable Metrics/AbcSize,Metrics/CyclomaticComplexity,Metrics/MethodLength,Metrics/PerceivedComplexity
  return error_result('lex-memory not loaded') unless memory_available?
  return error_result('no token provided') unless token && !token.empty?

  restore_hwm_from_traces

  people = fetch_top_people(token: token, top: top_people)
  log.debug("ApiIngest: fetched #{people.size} top people")
  return error_result('people endpoint denied or empty') if people.empty?

  chats = fetch_one_on_one_chats(token: token)
  log.debug("ApiIngest: fetched #{chats.size} oneOnOne chats")
  return error_result('no 1:1 chats found') if chats.empty?

  existing_hashes = load_existing_hashes
  conn = graph_connection(token: token)
  stored = 0
  skipped = 0
  people_ingested = 0
  thread_groups = Hash.new { |h, k| h[k] = [] }
  person_texts = Hash.new { |h, k| h[k] = [] }

  people.each do |person|
    chat = match_chat_to_person(chats: chats, person: person, conn: conn)
    unless chat
      log.debug("ApiIngest: no chat match for #{person['displayName']} " \
                "(email=#{person.dig('scoredEmailAddresses', 0, 'address')}, id=#{person['id']})")
      next
    end

    log.info("ApiIngest: matched #{person['displayName']} to chat #{chat['id']}")
    = (conn: conn, chat_id: chat['id'], depth: )
    next if .empty?

    msg_stored = 0
    .each do |msg|
      next if skip_bots && (msg)

      text = extract_body_text(msg)
      next if text.length < 5

      content_hash = msg['id'] || Digest::SHA256.hexdigest(text)[0, 16]
      if existing_hashes.include?(content_hash)
        skipped += 1
        next
      end

      trace_result = (msg, text, person, chat['id'], content_hash: content_hash, imprint_active: imprint_active)
      if trace_result
        stored += 1
        msg_stored += 1
        existing_hashes << content_hash
        thread_groups[chat['id']] << trace_result[:trace_id]
        person_texts[person['displayName']] << text
      else
        skipped += 1
      end
    end

    next unless msg_stored.positive?

    people_ingested += 1
    update_extended_hwm(chat_id: chat['id'], last_message_at: .filter_map { |m| m['createdDateTime'] }.max,
                        new_message_count: msg_stored, ingested: true)
  end

  coactivate_thread_traces(thread_groups)
  flush_trace_store if stored.positive?
  apollo_results = publish_to_apollo(person_texts) if stored.positive?

  { result: { stored: stored, skipped: skipped, people_ingested: people_ingested,
              people_found: people.length, chats_found: chats.length, apollo: apollo_results } }
rescue StandardError => e
  log_msg = "ApiIngest failed: #{e.class} — #{e.}"
  log.error(log_msg)
  { result: { stored: stored || 0, skipped: skipped || 0, error: e. } }
end
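Several identifiers in the per-chat loop above did not survive extraction, so the following is a dependency-free sketch of the same control flow rather than a reconstruction: bot filtering, the five-character minimum, ID-or-hash dedup, per-person text grouping with a defaulting `Hash`, and the newest `createdDateTime` for the high-water-mark update. `ingest_chat` and `bot_sender?` are hypothetical, and treating a non-nil `from.application` as a bot sender is an assumption about the Graph `chatMessage` shape. No memory traces are written.

```ruby
require 'digest'
require 'set'

# Hypothetical bot check: Graph chatMessage.from.application is set for app senders.
def bot_sender?(msg)
  !msg.dig('from', 'application').nil?
end

# Mirrors the loop's bookkeeping for one person's chat.
def ingest_chat(person_name, messages, existing, person_texts, skip_bots: true)
  stored = 0
  skipped = 0
  messages.each do |msg|
    next if skip_bots && bot_sender?(msg) # not counted as skipped

    text = msg.dig('body', 'content').to_s.strip
    next if text.length < 5 # too short; also not counted

    key = msg['id'] || Digest::SHA256.hexdigest(text)[0, 16]
    if existing.include?(key)
      skipped += 1
      next
    end
    existing << key
    stored += 1
    person_texts[person_name] << text
  end
  { stored: stored, skipped: skipped,
    last_message_at: messages.filter_map { |m| m['createdDateTime'] }.max }
end

person_texts = Hash.new { |h, k| h[k] = [] }
existing = Set.new
msgs = [
  { 'id' => '1', 'body' => { 'content' => 'See you at standup' }, 'createdDateTime' => '2024-05-01T09:00:00Z' },
  { 'id' => '2', 'body' => { 'content' => 'ok' }, 'createdDateTime' => '2024-05-01T09:01:00Z' },
  { 'id' => '3', 'from' => { 'application' => { 'displayName' => 'Bot' } }, 'body' => { 'content' => 'Automated reminder' } },
  { 'id' => '1', 'body' => { 'content' => 'See you at standup' } }
]
result = ingest_chat('Ada', msgs, existing, person_texts)
# result == { stored: 1, skipped: 1, last_message_at: '2024-05-01T09:01:00Z' }
```

As in the source, bot and too-short messages fall through with `next` and are counted as neither stored nor skipped, so `stored + skipped` can be smaller than the number of messages fetched.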