Class: Engram::UseCases::Observe

Inherits:
Object
  • Object
show all
Defined in:
lib/engram/use_cases/observe.rb

Overview

Orchestrates a single observed turn: extract candidate facts, consolidate them against existing memory, and apply the resulting decisions to the store. Pure and synchronous — async execution is a Rails concern (see ObserveJob).

When a ProcessedTurns store and an idempotency_key are provided, a turn that was already processed is skipped (no extraction, no duplicate memories).

Instance Method Summary collapse

Constructor Details

#initialize(store:, extractor:, consolidator:, processed_turns: nil) ⇒ Observe

Returns a new instance of Observe.



12
13
14
15
16
17
# File 'lib/engram/use_cases/observe.rb', line 12

def initialize(store:, extractor:, consolidator:, processed_turns: nil)
  @store = store
  @extractor = extractor
  @consolidator = consolidator
  @processed_turns = processed_turns
end

Instance Method Details

#call(messages:, scope:, idempotency_key: nil) ⇒ Object

Returns the Array<Decision> that were applied (empty if skipped or nothing found).



20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/engram/use_cases/observe.rb', line 20

def call(messages:, scope:, idempotency_key: nil)
  return [] if already_processed?(idempotency_key)

  candidates = @extractor.extract(messages: messages, scope: scope)
  if candidates.empty?
    mark_processed(idempotency_key)
    return []
  end

  decisions = @consolidator.reconcile_all(candidates: candidates, scope: scope)
  decisions.each { |decision| apply(decision) }
  mark_processed(idempotency_key)
  decisions
end