Module: Legion::Extensions::MicrosoftTeams::Helpers::HighWaterMark

Included in:
Actor::DirectChatPoller, Actor::ObservedChatPoller, Runners::ApiIngest, Runners::ProfileIngest
Defined in:
lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb

Constant Summary collapse

HWM_TTL =

24 hours, expressed in seconds (passed as the cache TTL when high-water marks are written)

86_400

Instance Method Summary collapse

Instance Method Details

#get_extended_hwm(chat_id:) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb', line 50

# Looks up the extended high-water mark stored under "teams:ehwm:<chat_id>".
#
# The value is read through cache_get when cache_available? is true, and from
# the in-process @ehwm_fallback hash otherwise. A Hash is handed back as-is;
# anything else is parsed as JSON with symbolized keys.
#
# @param chat_id [Object] identifier interpolated into the storage key
# @return [Hash, Object, nil] the stored/parsed value, or nil when nothing is
#   stored or when lookup/parsing raises a StandardError (logged at debug level)
def get_extended_hwm(chat_id:)
  key = "teams:ehwm:#{chat_id}"
  stored = cache_available? ? cache_get(key) : (@ehwm_fallback ||= {})[key]
  return nil unless stored
  return stored if stored.is_a?(Hash)

  ::JSON.parse(stored, symbolize_names: true)
rescue StandardError => e
  log.debug("HighWaterMark: get_extended_hwm failed to parse cached value: #{e.message}")
  nil
end

#get_hwm(chat_id:) ⇒ Object



16
17
18
19
20
21
22
23
24
# File 'lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb', line 16

# Reads the high-water mark stored for a chat.
#
# Uses cache_get when cache_available? is true; otherwise reads the
# in-process @hwm_fallback hash (created on first use).
#
# @param chat_id [Object] chat identifier, turned into a key by hwm_key
# @return [Object, nil] the stored value, or nil when nothing is stored
def get_hwm(chat_id:)
  key = hwm_key(chat_id: chat_id)
  return cache_get(key) if cache_available?

  (@hwm_fallback ||= {})[key]
end

#hwm_key(chat_id:) ⇒ Object



12
13
14
# File 'lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb', line 12

# Builds the storage key for a chat's high-water mark.
#
# @param chat_id [Object] chat identifier; rendered via its string form
# @return [String] "teams:hwm:" followed by the chat id
def hwm_key(chat_id:)
  format('teams:hwm:%s', chat_id)
end

#memory_runner ⇒ Object



114
115
116
117
118
119
120
# File 'lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb', line 114

# Memoized bare object extended with the memory-trace runner module, so the
# module's methods can be called on it (e.g. store_trace, retrieve_by_domain
# as used elsewhere in this helper).
#
# @return [Object] the same extended object on every call
def memory_runner
  return @memory_runner if @memory_runner

  traces_api = Legion::Extensions::Agentic::Memory::Trace::Runners::Traces
  @memory_runner = Object.new.extend(traces_api)
end

#new_messages(chat_id:, messages:) ⇒ Object



36
37
38
39
40
41
# File 'lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb', line 36

# Filters a batch down to the messages newer than the chat's high-water mark.
#
# @param chat_id [Object] chat whose mark is looked up via get_hwm
# @param messages [Array<Hash>] messages carrying a :createdDateTime value
# @return [Array<Hash>] the input itself when no mark is stored; otherwise the
#   messages whose :createdDateTime compares greater than the mark
def new_messages(chat_id:, messages:)
  watermark = get_hwm(chat_id: chat_id)

  if watermark.nil?
    messages
  else
    messages.select { |message| message[:createdDateTime] > watermark }
  end
end

#persist_hwm_as_trace(chat_id:) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb', line 86

# Stores the chat's extended high-water mark as a procedural memory trace.
#
# The payload is the JSON dump of { chat_id: ... } merged with the extended
# mark; the trace is tagged 'teams', 'hwm' and "chat:<chat_id>".
#
# @param chat_id [Object] chat whose extended mark is persisted
# @return [Object, nil] whatever store_trace returns, or nil when the chat has
#   no extended mark
def persist_hwm_as_trace(chat_id:)
  state = get_extended_hwm(chat_id: chat_id)
  return unless state

  runner = memory_runner
  payload = ::JSON.dump({ chat_id: chat_id }.merge(state))
  tags = ['teams', 'hwm', "chat:#{chat_id}"]

  runner.store_trace(type: :procedural, content_payload: payload, domain_tags: tags,
                     confidence: 1.0, origin: :direct_experience)
end

#restore_hwm_from_traces ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb', line 99

# Rebuilds extended high-water marks from previously stored memory traces.
#
# Retrieves up to 500 traces tagged 'teams', keeps the :procedural ones whose
# domain tags include 'hwm', parses each JSON payload and passes its fields to
# set_extended_hwm. Restoration is best-effort:
#
# * a trace whose payload cannot be parsed (invalid JSON, nil payload) or does
#   not decode to a Hash with a :chat_id is skipped, so one bad trace no longer
#   prevents the remaining traces from being restored;
# * any other StandardError (e.g. from the memory runner or from
#   set_extended_hwm) stops the restore and is logged as a warning.
#
# @return [Array, Object, nil] the processed traces on success; nil when the
#   runner does not return an Array; the logger's return value on failure
def restore_hwm_from_traces
  traces = memory_runner.retrieve_by_domain(domain_tag: 'teams', min_strength: 0.0, limit: 500)
  return unless traces.is_a?(Array)

  hwm_traces = traces.select { |t| t[:trace_type] == :procedural && t[:domain_tags]&.include?('hwm') }
  hwm_traces.each do |trace|
    data = begin
      ::JSON.parse(trace[:content_payload], symbolize_names: true)
    rescue ::JSON::ParserError, TypeError => e
      # Only parse failures are handled per trace; storage errors still abort below.
      log.warn("Skipping HWM trace with unreadable payload: #{e.message}")
      next
    end
    # Valid JSON that is not an object (array, scalar) carries no chat to restore.
    next unless data.is_a?(Hash) && data[:chat_id]

    set_extended_hwm(chat_id: data[:chat_id], last_message_at: data[:last_message_at],
                     last_ingested_at: data[:last_ingested_at], message_count: data[:message_count] || 0)
  end
rescue StandardError => e
  log.warn("Failed to restore HWM from traces: #{e.message}")
end

#set_extended_hwm(chat_id:, last_message_at:, last_ingested_at:, message_count: 0) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
# File 'lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb', line 66

# Stores the extended high-water mark for a chat under "teams:ehwm:<chat_id>".
#
# When cache_available? is true the record is written through cache_set as a
# JSON string with a TTL of HWM_TTL; otherwise the Hash itself is kept in the
# in-process @ehwm_fallback hash.
#
# @param chat_id [Object] identifier interpolated into the storage key
# @param last_message_at [Object] stored under :last_message_at
# @param last_ingested_at [Object] stored under :last_ingested_at
# @param message_count [Object] stored under :message_count (default 0)
# @return [Object] cache_set's return value, or the stored Hash in fallback mode
def set_extended_hwm(chat_id:, last_message_at:, last_ingested_at:, message_count: 0)
  key = "teams:ehwm:#{chat_id}"
  record = {
    last_message_at:  last_message_at,
    last_ingested_at: last_ingested_at,
    message_count:    message_count
  }
  return cache_set(key, ::JSON.dump(record), ttl: HWM_TTL) if cache_available?

  (@ehwm_fallback ||= {})[key] = record
end

#set_hwm(chat_id:, timestamp:) ⇒ Object



26
27
28
29
30
31
32
33
34
# File 'lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb', line 26

# Stores the high-water mark for a chat.
#
# Writes through cache_set with a TTL of HWM_TTL when cache_available? is
# true; otherwise keeps the value in the in-process @hwm_fallback hash.
#
# @param chat_id [Object] chat identifier, turned into a key by hwm_key
# @param timestamp [Object] value to remember as the mark
# @return [Object] cache_set's return value, or the timestamp in fallback mode
def set_hwm(chat_id:, timestamp:)
  key = hwm_key(chat_id: chat_id)
  return cache_set(key, timestamp, ttl: HWM_TTL) if cache_available?

  (@hwm_fallback ||= {})[key] = timestamp
end

#update_extended_hwm(chat_id:, last_message_at:, new_message_count: 0, ingested: false) ⇒ Object



78
79
80
81
82
83
84
# File 'lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb', line 78

# Updates a chat's extended high-water mark and writes it back.
#
# Starts from the existing record (or an empty one), replaces
# :last_message_at, adds new_message_count to :message_count, and stamps
# :last_ingested_at with the current UTC time in ISO 8601 form when
# ingested is true. The record returned by get_extended_hwm is modified in
# place before being passed to set_extended_hwm.
#
# @param chat_id [Object] chat whose record is updated
# @param last_message_at [Object] new value for :last_message_at
# @param new_message_count [Integer] amount added to :message_count (default 0)
# @param ingested [Boolean] whether to refresh :last_ingested_at (default false)
# @return [Object] whatever set_extended_hwm returns
def update_extended_hwm(chat_id:, last_message_at:, new_message_count: 0, ingested: false)
  state = get_extended_hwm(chat_id: chat_id)
  state ||= { last_message_at: nil, last_ingested_at: nil, message_count: 0 }

  state.store(:last_message_at, last_message_at)
  previous_count = state[:message_count] || 0
  state.store(:message_count, previous_count + new_message_count)
  state.store(:last_ingested_at, Time.now.utc.iso8601) if ingested

  set_extended_hwm(chat_id: chat_id, **state)
end

#update_hwm_from_messages(chat_id:, messages:) ⇒ Object



43
44
45
46
47
48
# File 'lib/legion/extensions/microsoft_teams/helpers/high_water_mark.rb', line 43

# Advances the chat's high-water mark to the latest :createdDateTime found in
# a batch of messages.
#
# Messages without a :createdDateTime are ignored when picking the latest
# value. If the batch is empty, or none of its messages carries a timestamp,
# the stored mark is left untouched (previously such a batch either raised
# from the comparison inside max or overwrote the mark with nil).
#
# @param chat_id [Object] chat whose mark is updated via set_hwm
# @param messages [Array<Hash>] messages carrying a :createdDateTime value
# @return [Object, nil] whatever set_hwm returns, or nil when nothing was set
def update_hwm_from_messages(chat_id:, messages:)
  latest = messages.map { |m| m[:createdDateTime] }.compact.max
  return if latest.nil?

  set_hwm(chat_id: chat_id, timestamp: latest)
end