Class: Engram::UseCases::Observe
- Inherits:
- Object
- Hierarchy: Object > Engram::UseCases::Observe
- Defined in:
- lib/engram/use_cases/observe.rb
Overview
Orchestrates a single observed turn: extract candidate facts, consolidate them against existing memory, and apply the resulting decisions to the store. Pure and synchronous; async execution is a Rails concern (see ObserveJob).
When a ProcessedTurns store and an idempotency_key are provided, a turn that was already processed is skipped (no extraction, no duplicate memories).
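The flow described above can be sketched in plain Ruby. This is a minimal illustration, not Engram's implementation: SketchObserve, InMemoryProcessedTurns, the lambda collaborators, and the processed?/mark method names are all hypothetical stand-ins, and the scope argument is left out for brevity.

```ruby
require "set"

# Hypothetical stand-ins for illustration only; none of these are Engram classes.
Decision = Struct.new(:action, :fact)

class InMemoryProcessedTurns
  def initialize
    @keys = Set.new
  end

  def processed?(key)
    @keys.include?(key)
  end

  def mark(key)
    @keys << key
  end
end

class SketchObserve
  def initialize(store:, extractor:, consolidator:, processed_turns: nil)
    @store = store
    @extractor = extractor
    @consolidator = consolidator
    @processed_turns = processed_turns
  end

  # Extract, consolidate, apply; skip entirely if the turn was already seen.
  def call(messages:, idempotency_key: nil)
    return [] if already_processed?(idempotency_key)

    candidates = @extractor.call(messages)
    decisions = candidates.empty? ? [] : @consolidator.call(candidates)
    decisions.each { |decision| @store << decision.fact }
    mark_processed(idempotency_key)
    decisions
  end

  private

  # Idempotency only applies when both a store and a key are present.
  def already_processed?(key)
    !@processed_turns.nil? && !key.nil? && @processed_turns.processed?(key)
  end

  def mark_processed(key)
    @processed_turns.mark(key) unless @processed_turns.nil? || key.nil?
  end
end

store = []
observe = SketchObserve.new(
  store: store,
  extractor: ->(messages) { messages.map { |m| m[:content] } },
  consolidator: ->(candidates) { candidates.map { |c| Decision.new(:add, c) } },
  processed_turns: InMemoryProcessedTurns.new
)

turn = [{ role: "user", content: "I live in Lisbon" }]
first = observe.call(messages: turn, idempotency_key: "turn-1")
second = observe.call(messages: turn, idempotency_key: "turn-1")
```

In this sketch the second call with the same key returns an empty array and the store still holds a single entry, which is the behavior the overview describes for an already-processed turn.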
Instance Method Summary
- #call(messages:, scope:, idempotency_key: nil) ⇒ Object
  Returns the Array<Decision> that were applied (empty if skipped or nothing found).
- #initialize(store:, extractor:, consolidator:, processed_turns: nil, embedder: Engram.config.embedder) ⇒ Observe (constructor)
  A new instance of Observe.
Constructor Details
#initialize(store:, extractor:, consolidator:, processed_turns: nil, embedder: Engram.config.embedder) ⇒ Observe
Returns a new instance of Observe.
# File 'lib/engram/use_cases/observe.rb', line 12

def initialize(store:, extractor:, consolidator:, processed_turns: nil, embedder: Engram.config.embedder)
  @store = store
  @extractor = extractor
  @consolidator = consolidator
  @processed_turns = processed_turns
  @embedder = embedder
end
Instance Method Details
#call(messages:, scope:, idempotency_key: nil) ⇒ Object
Returns the Array<Decision> that were applied (empty if skipped or nothing found).
# File 'lib/engram/use_cases/observe.rb', line 21

def call(messages:, scope:, idempotency_key: nil)
  payload = Engram::Instrumentation.payload(
    scope: scope,
    store: @store,
    message_count: messages.size,
    idempotency_key_present: !idempotency_key.nil?
  )
  Engram::Instrumentation.instrument("observe", payload) do
    if already_processed?(idempotency_key)
      payload[:skipped] = true
      payload[:candidate_count] = 0
      payload[:decision_count] = 0
      next []
    end

    candidates = extract(messages: messages, scope: scope)
    payload[:candidate_count] = candidates.size
    if candidates.empty?
      mark_processed(idempotency_key)
      payload[:decision_count] = 0
      next []
    end

    decisions = consolidate(candidates: candidates, scope: scope)
    applied_decisions = decisions.filter_map { |decision| apply(decision) }
    payload[:decision_count] = applied_decisions.size
    payload[:decision_actions] = applied_decisions.map { |decision| decision.action.to_s }
    mark_processed(idempotency_key)
    applied_decisions
  end
end
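Two Ruby details in the listing above are easy to miss: next [] ends only the block (so the instrumentation wrapper still finishes normally and the method returns the empty array), and the payload hash is mutated inside the block so that whatever reports the event can see the final counts. The sketch below illustrates both with a hypothetical SketchInstrumentation module; it is an assumption for illustration and says nothing about how Engram::Instrumentation is actually implemented.

```ruby
# Hypothetical stand-in for an instrumentation helper; not Engram's API.
module SketchInstrumentation
  def self.events
    @events ||= []
  end

  # Yields to the block, then records the payload as it looks afterwards.
  # The ensure clause does not change the method's return value.
  def self.instrument(name, payload)
    yield
  ensure
    events << [name, payload.dup]
  end
end

def sketch_call(skip:)
  payload = { skipped: false }
  SketchInstrumentation.instrument("observe", payload) do
    if skip
      payload[:skipped] = true
      payload[:decision_count] = 0
      next [] # leaves the block only; the event is still recorded
    end

    applied = [:add, :update]
    payload[:decision_count] = applied.size
    applied
  end
end

skipped_result = sketch_call(skip: true)
applied_result = sketch_call(skip: false)
```

Using return [] instead of next [] would exit the enclosing method straight from inside the block, which depends on the wrapper cleaning up via ensure; next keeps the early exit local to the block.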