Module: Legion::Extensions::MicrosoftTeams::Helpers::HighWaterMark
- Included in:
- Actor::DirectChatPoller, Actor::ObservedChatPoller, Runners::ApiIngest, Runners::ProfileIngest
- Defined in:
- lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb
Constant Summary collapse
- HWM_TTL =
24 hours
86_400
Instance Method Summary collapse
- #get_extended_hwm(chat_id:) ⇒ Object
- #get_hwm(chat_id:) ⇒ Object
- #hwm_key(chat_id:) ⇒ Object
- #memory_runner ⇒ Object
- #new_messages(chat_id:, messages:) ⇒ Object
- #persist_hwm_as_trace(chat_id:) ⇒ Object
- #restore_hwm_from_traces ⇒ Object
- #set_extended_hwm(chat_id:, last_message_at:, last_ingested_at:, message_count: 0) ⇒ Object
- #set_hwm(chat_id:, timestamp:) ⇒ Object
- #update_extended_hwm(chat_id:, last_message_at:, new_message_count: 0, ingested: false) ⇒ Object
- #update_hwm_from_messages(chat_id:, messages:) ⇒ Object
Instance Method Details
#get_extended_hwm(chat_id:) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb', line 50 def get_extended_hwm(chat_id:) key = "teams:ehwm:#{chat_id}" raw = if cache_available? cache_get(key) else @ehwm_fallback ||= {} @ehwm_fallback[key] end return nil unless raw raw.is_a?(Hash) ? raw : ::JSON.parse(raw, symbolize_names: true) rescue StandardError => e log.debug("HighWaterMark: get_extended_hwm failed to parse cached value: #{e.}") nil end |
#get_hwm(chat_id:) ⇒ Object
16 17 18 19 20 21 22 23 24 |
# File 'lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb', line 16 def get_hwm(chat_id:) key = hwm_key(chat_id: chat_id) if cache_available? cache_get(key) else @hwm_fallback ||= {} @hwm_fallback[key] end end |
#hwm_key(chat_id:) ⇒ Object
12 13 14 |
# File 'lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb', line 12 def hwm_key(chat_id:) "teams:hwm:#{chat_id}" end |
#memory_runner ⇒ Object
114 115 116 117 118 119 120 |
# File 'lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb', line 114 def memory_runner @memory_runner ||= begin runner = Object.new runner.extend(Legion::Extensions::Agentic::Memory::Trace::Runners::Traces) runner end end |
#new_messages(chat_id:, messages:) ⇒ Object
36 37 38 39 40 41 |
# File 'lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb', line 36 def (chat_id:, messages:) hwm = get_hwm(chat_id: chat_id) return if hwm.nil? .select { |m| m[:createdDateTime] > hwm } end |
#persist_hwm_as_trace(chat_id:) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb', line 86 def persist_hwm_as_trace(chat_id:) hwm = get_extended_hwm(chat_id: chat_id) return unless hwm memory_runner.store_trace( type: :procedural, content_payload: ::JSON.dump({ chat_id: chat_id }.merge(hwm)), domain_tags: ['teams', 'hwm', "chat:#{chat_id}"], confidence: 1.0, origin: :direct_experience ) end |
#restore_hwm_from_traces ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb', line 99 def restore_hwm_from_traces traces = memory_runner.retrieve_by_domain(domain_tag: 'teams', min_strength: 0.0, limit: 500) return unless traces.is_a?(Array) traces.select { |t| t[:trace_type] == :procedural && t[:domain_tags]&.include?('hwm') }.each do |trace| data = ::JSON.parse(trace[:content_payload], symbolize_names: true) next unless data[:chat_id] set_extended_hwm(chat_id: data[:chat_id], last_message_at: data[:last_message_at], last_ingested_at: data[:last_ingested_at], message_count: data[:message_count] || 0) end rescue StandardError => e log.warn("Failed to restore HWM from traces: #{e.}") end |
#set_extended_hwm(chat_id:, last_message_at:, last_ingested_at:, message_count: 0) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb', line 66 def set_extended_hwm(chat_id:, last_message_at:, last_ingested_at:, message_count: 0) key = "teams:ehwm:#{chat_id}" value = { last_message_at: , last_ingested_at: last_ingested_at, message_count: } if cache_available? cache_set(key, ::JSON.dump(value), ttl: HWM_TTL) else @ehwm_fallback ||= {} @ehwm_fallback[key] = value end end |
#set_hwm(chat_id:, timestamp:) ⇒ Object
26 27 28 29 30 31 32 33 34 |
# File 'lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb', line 26 def set_hwm(chat_id:, timestamp:) key = hwm_key(chat_id: chat_id) if cache_available? cache_set(key, , ttl: HWM_TTL) else @hwm_fallback ||= {} @hwm_fallback[key] = end end |
#update_extended_hwm(chat_id:, last_message_at:, new_message_count: 0, ingested: false) ⇒ Object
78 79 80 81 82 83 84 |
# File 'lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb', line 78 def update_extended_hwm(chat_id:, last_message_at:, new_message_count: 0, ingested: false) existing = get_extended_hwm(chat_id: chat_id) || { last_message_at: nil, last_ingested_at: nil, message_count: 0 } existing[:last_message_at] = existing[:message_count] = (existing[:message_count] || 0) + existing[:last_ingested_at] = Time.now.utc.iso8601 if ingested set_extended_hwm(chat_id: chat_id, **existing) end |
#update_hwm_from_messages(chat_id:, messages:) ⇒ Object
43 44 45 46 47 48 |
# File 'lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb', line 43 def (chat_id:, messages:) return if .empty? latest = .map { |m| m[:createdDateTime] }.max set_hwm(chat_id: chat_id, timestamp: latest) end |