Class: MeleteEnrichmentJob
- Inherits: ApplicationJob
  - Object
  - ActiveJob::Base
  - ApplicationJob
  - MeleteEnrichmentJob
- Defined in: app/jobs/melete_enrichment_job.rb, app/jobs/melete_enrichment_job/goal_change_listener.rb
Overview
First stage of the drain pipeline: runs Melete to activate skills, read workflows, and refine goals. Hands off to either Mneme (when goals changed during this run) or directly to the drain loop.
Triggered by Events::Subscribers::MeleteKickoff in response to Events::StartMelete. Runs the existing synchronous Melete::Runner — the event is only the entry/exit plumbing.
A GoalChangeListener observes Events::GoalCreated and Events::GoalUpdated for the duration of the runner call. When a goal mutation is heard the job emits Events::StartMneme so Mneme recalls against the fresh goal set; otherwise it emits Events::StartProcessing and Mneme is skipped — there is no new search seed to recall against.
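The observe-for-the-duration-of-a-block pattern described above can be sketched roughly as follows. This is a minimal illustration, not the project's implementation: `SketchBus`, `SketchGoalChangeListener`, and the two event structs are hypothetical in-memory stand-ins, and filtering on `session_id` is an assumption suggested only by the `observe(session_id:)` call in `#perform`.

```ruby
# Hypothetical stand-ins; NOT the project's Events::Bus or GoalChangeListener.
GoalCreated = Struct.new(:session_id, keyword_init: true)
GoalUpdated = Struct.new(:session_id, keyword_init: true)

class SketchBus
  @subscribers = []

  class << self
    def subscribe(subscriber)
      @subscribers << subscriber
    end

    def unsubscribe(subscriber)
      @subscribers.delete(subscriber)
    end

    def emit(event)
      # Iterate over a copy so a subscriber may detach while handling an event.
      @subscribers.dup.each { |subscriber| subscriber.call(event) }
    end
  end
end

class SketchGoalChangeListener
  # Returns true when a goal event for session_id was emitted while the block ran.
  # Assumption: events are matched on session_id.
  def self.observe(session_id:)
    heard = false
    subscriber = lambda do |event|
      goal_event = event.is_a?(GoalCreated) || event.is_a?(GoalUpdated)
      heard = true if goal_event && event.session_id == session_id
    end

    SketchBus.subscribe(subscriber)
    begin
      yield
    ensure
      # Always detach, even when the block raises.
      SketchBus.unsubscribe(subscriber)
    end
    heard
  end
end

changed = SketchGoalChangeListener.observe(session_id: 1) do
  SketchBus.emit(GoalUpdated.new(session_id: 1))
end
unchanged = SketchGoalChangeListener.observe(session_id: 1) { nil }
other_session = SketchGoalChangeListener.observe(session_id: 1) do
  SketchBus.emit(GoalCreated.new(session_id: 2))
end
```

In this sketch the listener's return value plays the role of `goal_changed` in `#perform`: a truthy result would route to the Mneme stage, a falsy one straight to processing.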
Sub-agents skip Melete entirely (sub-agent nickname assignment is a one-time step, not part of the recurring pipeline). With no runner call, the listener never fires and the job falls through to Events::StartProcessing.
Exceptions from Melete::Runner#call propagate; there is no defensive rescue. A crashed Melete leaves the session idle with the pending message (PM) still in the mailbox, and the next PM creation (or idle-wake re-route) retries the full chain. Swallowing the exception would surface a degraded response to the user without the failure being visible anywhere (an anti-pattern per the project’s “soft error paths” principle).
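The three outcomes described above (goal change, no goal change or sub-agent, and a crashed runner) can be illustrated with a small stand-alone sketch. `run_stage` and the symbols it records are hypothetical, and the block's boolean return value is a simplification of the listener-based detection; the point is only that a raised exception leaves nothing emitted.

```ruby
# Hypothetical stand-in for the job's control flow; names are illustrative only.
def run_stage(sub_agent:, emitted:, &runner)
  # Simplification: the runner block returns true when it changed a goal.
  # Sub-agents never invoke the runner at all.
  goal_changed = sub_agent ? false : runner.call
  emitted << (goal_changed ? :start_mneme : :start_processing)
end

emitted = []
run_stage(sub_agent: false, emitted: emitted) { true }   # goals changed
run_stage(sub_agent: false, emitted: emitted) { false }  # no goal change
run_stage(sub_agent: true, emitted: emitted) { raise "never called" }

# A crashing runner: the exception propagates and no follow-up event is recorded.
crash_log = []
error = begin
  run_stage(sub_agent: false, emitted: crash_log) { raise "Melete crashed" }
  nil
rescue RuntimeError => e
  e
end
```

Because nothing is recorded in the crash case, whatever retries the chain later starts from the same state, which matches the retry behaviour the overview describes.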
Defined Under Namespace
Classes: GoalChangeListener
Instance Method Summary
Instance Method Details
#perform(session_id, pending_message_id: nil) ⇒ Object
    # File 'app/jobs/melete_enrichment_job.rb', line 35

    def perform(session_id, pending_message_id: nil)
      session = Session.find(session_id)

      goal_changed = GoalChangeListener.observe(session_id: session_id) do
        Melete::Runner.new(session).call unless session.sub_agent?
      end

      next_event_class = goal_changed ? Events::StartMneme : Events::StartProcessing
      Events::Bus.emit(next_event_class.new(
        session_id: session_id,
        pending_message_id:
      ))
    end