Class: Engram::UseCases::Observe

Inherits:
Object
Defined in:
lib/engram/use_cases/observe.rb

Overview

Orchestrates a single observed turn: extract candidate facts, consolidate them against existing memory, and apply the resulting decisions to the store. Pure and synchronous; asynchronous execution is a Rails concern (see ObserveJob).

When a ProcessedTurns store and an idempotency_key are provided, a turn that was already processed is skipped (no extraction, no duplicate memories).
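
The skip behavior described above can be sketched as follows. This is a minimal illustration, not Engram's implementation: `InMemoryProcessedTurns`, its `processed?` and `mark` methods, and `SketchObserve` are hypothetical stand-ins that model only the idempotency check, assuming that both a store and a key must be present for a turn to be tracked.

```ruby
require "set"

# Hypothetical in-memory stand-in for a ProcessedTurns store.
# The method names processed? and mark are assumptions for illustration.
class InMemoryProcessedTurns
  def initialize
    @keys = Set.new
  end

  def processed?(key)
    @keys.include?(key)
  end

  def mark(key)
    @keys.add(key)
  end
end

# Simplified stand-in for the use case: it models only the skip logic,
# not extraction, consolidation, or instrumentation.
class SketchObserve
  def initialize(processed_turns: nil, &work)
    @processed_turns = processed_turns
    @work = work
  end

  def call(messages:, idempotency_key: nil)
    # Tracking requires both a store and a key.
    tracked = @processed_turns && idempotency_key
    return [] if tracked && @processed_turns.processed?(idempotency_key)

    decisions = @work.call(messages)
    @processed_turns.mark(idempotency_key) if tracked
    decisions
  end
end

calls = 0
observe = SketchObserve.new(processed_turns: InMemoryProcessedTurns.new) do |messages|
  calls += 1
  messages.map { |message| "remember: #{message}" }
end

first = observe.call(messages: ["I live in Oslo"], idempotency_key: "turn-1")
second = observe.call(messages: ["I live in Oslo"], idempotency_key: "turn-1")
```

In this sketch the second call with the same key returns an empty array without running the work block, and a call without a key (or without a store) always runs.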

Instance Method Summary

Constructor Details

#initialize(store:, extractor:, consolidator:, processed_turns: nil, embedder: Engram.config.embedder) ⇒ Observe

Returns a new instance of Observe.



# File 'lib/engram/use_cases/observe.rb', line 12

def initialize(store:, extractor:, consolidator:, processed_turns: nil, embedder: Engram.config.embedder)
  @store = store
  @extractor = extractor
  @consolidator = consolidator
  @processed_turns = processed_turns
  @embedder = embedder
end

Instance Method Details

#call(messages:, scope:, idempotency_key: nil) ⇒ Object

Returns the Array<Decision> of decisions that were applied (empty if the turn was skipped or nothing was found).



# File 'lib/engram/use_cases/observe.rb', line 21

def call(messages:, scope:, idempotency_key: nil)
  payload = Engram::Instrumentation.payload(
    scope: scope,
    store: @store,
    message_count: messages.size,
    idempotency_key_present: !idempotency_key.nil?
  )
  Engram::Instrumentation.instrument("observe", payload) do
    if already_processed?(idempotency_key)
      payload[:skipped] = true
      payload[:candidate_count] = 0
      payload[:decision_count] = 0
      next []
    end

    candidates = extract(messages: messages, scope: scope)
    payload[:candidate_count] = candidates.size
    if candidates.empty?
      mark_processed(idempotency_key)
      payload[:decision_count] = 0
      next []
    end

    decisions = consolidate(candidates: candidates, scope: scope)
    applied_decisions = decisions.filter_map { |decision| apply(decision) }
    payload[:decision_count] = applied_decisions.size
    payload[:decision_actions] = applied_decisions.map { |decision| decision.action.to_s }
    mark_processed(idempotency_key)
    applied_decisions
  end
end
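
The early exits above use `next []` rather than `return []`: `next` ends only the block and makes `[]` the block's value, which becomes the result of `call` as long as the instrumenting method returns what the block returns. The sketch below illustrates that Ruby behavior with a hypothetical `SketchInstrumentation.instrument` helper; it is not Engram::Instrumentation, whose implementation is not shown here. It also shows that keys written to the payload inside the block are visible to the instrumenter afterwards, and that `filter_map` drops nil results.

```ruby
# Hypothetical stand-in for an instrumentation helper: it yields, returns the
# block's value, and records the (possibly mutated) payload afterwards.
module SketchInstrumentation
  EVENTS = []

  def self.instrument(name, payload)
    yield
  ensure
    EVENTS << [name, payload.dup]
  end
end

def observe_sketch(candidates)
  payload = { candidate_count: candidates.size }
  SketchInstrumentation.instrument("observe", payload) do
    if candidates.empty?
      payload[:decision_count] = 0
      next [] # ends the block only; [] becomes the block's value
    end

    # filter_map keeps truthy block results and drops nil/false ones,
    # so candidates that produce no decision are left out of the result.
    applied = candidates.filter_map { |c| c.upcase unless c.start_with?("skip") }
    payload[:decision_count] = applied.size
    applied
  end
end

observe_sketch([])                    # returns []
observe_sketch(["a", "skip-b", "c"])  # returns ["A", "C"]
```

A `return []` inside the block would instead return from the enclosing method directly, so any code the instrumenter runs after the block, other than `ensure` clauses, would be bypassed.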