Module: Legion::Extensions::MicrosoftTeams::Runners::ApiIngest
- Extended by:
- Definitions, ApiIngest
- Includes:
- Helpers::Lex, Helpers::Client, Helpers::GraphCache, Helpers::HighWaterMark, Helpers::PermissionGuard
- Included in:
- ApiIngest
- Defined in:
- lib/legion/extensions/microsoft_teams/runners/api_ingest.rb
Constant Summary
- MAX_CHAT_PAGES = 10
Constants included from Helpers::HighWaterMark
Helpers::HighWaterMark::HWM_TTL
Constants included from Helpers::PermissionGuard
Helpers::PermissionGuard::BACKOFF_SCHEDULE
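The page does not say what MAX_CHAT_PAGES bounds. Assuming, from its name, that it caps how many result pages are followed when listing the user's chats, a paging loop over Microsoft Graph's standard `@odata.nextLink` continuation could look like this minimal sketch. `fetch_all_chats` and the injected `fetch_page` callable are hypothetical stand-ins for an authenticated Graph GET. They are not part of this extension's documented API.

```ruby
# ASSUMPTION: MAX_CHAT_PAGES caps how many pages of chats are followed.
MAX_CHAT_PAGES = 10

# Hypothetical helper: follows Graph-style paging, where each page is a Hash
# with a 'value' array and, when more results exist, an '@odata.nextLink' URL.
# fetch_page stands in for an authenticated GET that returns the parsed JSON.
def fetch_all_chats(first_url, fetch_page)
  chats = []
  url = first_url
  MAX_CHAT_PAGES.times do
    break unless url # no continuation link: every page has been consumed

    page = fetch_page.call(url)
    chats.concat(page.fetch('value', []))
    url = page['@odata.nextLink']
  end
  chats # at most MAX_CHAT_PAGES pages, even if Graph offers more
end
```

With a fake fetcher that always returns a next link, the loop stops after ten pages instead of running indefinitely. The cap trades completeness for a bounded number of Graph calls per run.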
Instance Method Summary
- #ingest_api(token:, top_people: 15, message_depth: 50, skip_bots: true, imprint_active: false) ⇒ Object
Fetch top contacts via /me/people, then pull recent messages from their 1:1 chats.
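How a /me/people contact is paired with a 1:1 chat is delegated to build_chat_member_index and find_chat_for_person_indexed, whose internals this page does not document. The following is a hypothetical sketch only. It assumes chats are fetched with their members expanded (Graph conversation members expose `email` and `userId`) and that a person is matched first on their first scored email address and then on their `id`, the two fields ingest_api logs when no match is found.

```ruby
# HYPOTHETICAL: not the extension's build_chat_member_index. Maps
# "email:<lowercased address>" and "id:<userId>" to the first chat that
# contains that member. The signed-in user appears in every 1:1 chat, so
# their own entries simply point at the first chat seen.
def build_member_index(chats)
  index = {}
  chats.each do |chat|
    Array(chat['members']).each do |member|
      index["email:#{member['email'].downcase}"] ||= chat if member['email']
      index["id:#{member['userId']}"] ||= chat if member['userId']
    end
  end
  index
end

# HYPOTHETICAL: not the extension's find_chat_for_person_indexed. Tries the
# person's first scored email (case-insensitively), then falls back to their id.
def find_chat(person, index)
  email = person.dig('scoredEmailAddresses', 0, 'address')
  (email && index["email:#{email.downcase}"]) || index["id:#{person['id']}"]
end
```

Building the index once turns each per-person lookup into a Hash access rather than a rescan of every chat's member list.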
Methods included from Helpers::GraphCache
#cached_graph_get, #graph_cache_ttl, #graph_user_key, #invalidate_graph_cache
Methods included from Helpers::HighWaterMark
#get_extended_hwm, #get_hwm, #hwm_key, #new_messages, #persist_hwm_as_trace, #restore_hwm_from_traces, #set_extended_hwm, #set_hwm, #update_extended_hwm, #update_hwm_from_messages
Methods included from Helpers::PermissionGuard
#denial_info, #guarded_request, #permission_denied?, #record_denial, #reset_denials!
Methods included from Helpers::Client
#bot_connection, #graph_connection, #oauth_connection, #user_path
Instance Method Details
#ingest_api(token:, top_people: 15, message_depth: 50, skip_bots: true, imprint_active: false) ⇒ Object
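Fetch top contacts via /me/people, then pull recent messages from their 1:1 chats. Stores each message as an individual memory trace (same format as CacheIngest). Bot messages are skipped when skip_bots is true, and messages whose extracted text is shorter than 5 characters are ignored. Duplicates are detected by key: the Graph message ID when present, otherwise the first 16 hex characters of the SHA-256 digest of the message text.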
Requires a delegated token with Chat.Read and People.Read scopes.
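The filtering and dedup rules can be exercised in isolation. This minimal sketch mirrors the inner loop of ingest_api over plain Hashes shaped like Graph chatMessage objects. `bot_message?` and `body_text` are hypothetical stand-ins for helpers whose names and logic this page does not show: bot detection here assumes a bot message carries `from.application`, and body text is read verbatim without HTML stripping. The latest `createdDateTime` is taken with a String max, which is correct only while every timestamp is a UTC ISO-8601 string in one uniform format.

```ruby
require 'digest'
require 'set'

MIN_TEXT_LENGTH = 5 # ingest_api ignores texts shorter than 5 characters

# Dedup key as in ingest_api: message id, else a 16-char SHA-256 prefix of the text.
def dedup_key(msg, text)
  msg['id'] || Digest::SHA256.hexdigest(text)[0, 16]
end

# HYPOTHETICAL bot check: treat messages sent by an application as bot messages.
def bot_message?(msg)
  !msg.dig('from', 'application').nil?
end

# HYPOTHETICAL text extraction: the real extract_body_text may strip HTML.
def body_text(msg)
  msg.dig('body', 'content').to_s.strip
end

# Returns kept texts, the count of duplicate skips, and the latest timestamp.
def ingest_chat(messages, existing_keys, skip_bots: true)
  kept = []
  skipped = 0
  messages.each do |msg|
    next if skip_bots && bot_message?(msg)

    text = body_text(msg)
    next if text.length < MIN_TEXT_LENGTH

    key = dedup_key(msg, text)
    if existing_keys.include?(key)
      skipped += 1
      next
    end

    existing_keys << key # later duplicates in this run are skipped too
    kept << text
  end
  # Like ingest_api, the high-water mark considers every fetched message.
  last_message_at = messages.filter_map { |m| m['createdDateTime'] }.max
  { kept: kept, skipped: skipped, last_message_at: last_message_at }
end
```

As in ingest_api, only duplicate keys count toward `skipped`; too-short and bot messages are dropped without being counted.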
# File 'lib/legion/extensions/microsoft_teams/runners/api_ingest.rb', line 30

def ingest_api(token:, top_people: 15, message_depth: 50, skip_bots: true, imprint_active: false, **) # rubocop:disable Metrics/AbcSize,Metrics/MethodLength
  log.debug("ApiIngest#ingest_api top_people=#{top_people} message_depth=#{message_depth}")
  return error_result('lex-memory not loaded') unless memory_available?
  return error_result('no token provided') unless token && !token.empty?

  restore_hwm_from_traces

  people = fetch_top_people(token: token, top: top_people)
  log.debug("ApiIngest: fetched #{people.size} top people")
  return error_result('people endpoint denied or empty') if people.empty?

  chats = fetch_one_on_one_chats(token: token)
  log.debug("ApiIngest: fetched #{chats.size} oneOnOne chats")
  return error_result('no 1:1 chats found') if chats.empty?

  existing_hashes = load_existing_hashes
  conn = graph_connection(token: token)
  chat_index = build_chat_member_index(conn: conn, chats: chats)

  stored = 0
  skipped = 0
  people_ingested = 0
  thread_groups = Hash.new { |h, k| h[k] = [] }
  person_texts = Hash.new { |h, k| h[k] = [] }

  people.each do |person|
    chat = find_chat_for_person_indexed(person: person, chat_index: chat_index)
    unless chat
      log.debug("ApiIngest: no chat match for #{person['displayName']} " \
                "(email=#{person.dig('scoredEmailAddresses', 0, 'address')}, id=#{person['id']})")
      next
    end

    log.info("ApiIngest: matched #{person['displayName']} to chat #{chat['id']}")
     = (conn: conn, chat_id: chat['id'], depth: message_depth)
    next if .empty?

    msg_stored = 0
    .each do |msg|
      next if skip_bots && (msg)

      text = extract_body_text(msg)
      next if text.length < 5

      content_hash = msg['id'] || Digest::SHA256.hexdigest(text)[0, 16]
      if existing_hashes.include?(content_hash)
        skipped += 1
        next
      end

      trace_result = (msg, text, person, chat['id'],
                      content_hash: content_hash, imprint_active: imprint_active)
      if trace_result
        stored += 1
        msg_stored += 1
        existing_hashes << content_hash
        thread_groups[chat['id']] << trace_result[:trace_id]
        person_texts[person['displayName']] << text
      else
        skipped += 1
      end
    end

    next unless msg_stored.positive?

    people_ingested += 1
    update_extended_hwm(chat_id: chat['id'],
                        last_message_at: .filter_map { |m| m['createdDateTime'] }.max,
                        new_message_count: msg_stored,
                        ingested: true)
  end

  coactivate_thread_traces(thread_groups)
  flush_trace_store if stored.positive?
  apollo_results = publish_to_apollo(person_texts) if stored.positive?

  { result: { stored: stored, skipped: skipped, people_ingested: people_ingested,
              people_found: people.length, chats_found: chats.length, apollo: apollo_results } }
# rubocop:disable Legion/RescueLogging/NoCapture
# Re-raise unlogged: surface the typed throttle to the caller (the
# ApiIngest actor) so it can defer its next scheduled run by the
# advertised retry_after. The throttle is already logged at the
# middleware/circuit layer; folding it into an error result here
# would hide the one signal the actor needs to stop re-charging
# the shared Graph circuit on its fixed interval.
rescue Errors::Throttled
  raise
# rubocop:enable Legion/RescueLogging/NoCapture
rescue StandardError => e
  handle_exception(e, level: :error, operation: 'ApiIngest#ingest_api')
  { result: { stored: stored || 0, skipped: skipped || 0, error: e.message } }
end
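The two rescue clauses at the end of ingest_api depend on their order. Ruby tries rescue clauses top to bottom and uses the first one whose class matches; since the source comment implies Errors::Throttled would otherwise be folded into an error result, it is presumably a StandardError subclass, so it must be listed before the catch-all. This sketch reproduces that shape with hypothetical stand-ins: an `Errors::Throttled` carrying a `retry_after` value, a `run_ingest` wrapper, and a caller, `next_delay`, that defers by `retry_after` as the source comment describes for the ApiIngest actor.

```ruby
# HYPOTHETICAL stand-in for the extension's Errors::Throttled; only the
# retry_after value mentioned in the source comment is modelled.
module Errors
  class Throttled < StandardError
    attr_reader :retry_after

    def initialize(retry_after)
      @retry_after = retry_after
      super("throttled; retry after #{retry_after}s")
    end
  end
end

# Same rescue shape as ingest_api: the specific class first, then the catch-all.
def run_ingest(work)
  work.call
  { result: { stored: 1 } }
rescue Errors::Throttled
  raise # propagate the typed throttle unchanged
rescue StandardError => e
  { result: { stored: 0, error: e.message } } # other failures become a result
end

# HYPOTHETICAL caller: uses retry_after as the next delay when throttled.
def next_delay(work, default_interval:)
  run_ingest(work)
  default_interval
rescue Errors::Throttled => e
  e.retry_after
end
```

Swapping the two clauses in `run_ingest` would make `next_delay` always return `default_interval`, because the throttle would be converted into an ordinary error result before the caller could see it.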