Class: Rubino::Run::Recorder
- Inherits: Object
  - Object
  - Rubino::Run::Recorder
- Defined in: lib/rubino/run/recorder.rb
Overview
Bridges Interaction::EventBus to per-run persisted events. Subscribes to the bus, translates internal symbols to API event names via EVENT_MAP, and writes one row per emission through EventStore.
EVENT_MAP is the single source of truth for the internal-to-API event-type translation. #attach! subscribes only to the event types listed in the map, so a bus event that is not in the map is never recorded. Events that bypass the bus, e.g. approval.required / clarify.required, must be written by calling #emit directly.
Lifecycle:
    recorder = Recorder.new(run_id:, session_id:)
    recorder.attach!
    # ... run loop ...
    recorder.detach!
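The lifecycle above leaves open what happens when the run loop raises. One possible way to guarantee that detach! still runs is to wrap the loop in an ensure clause. The sketch below is only an illustration: LifecycleRecorder and with_recorder are hypothetical stand-ins that are not part of Rubino, and the stand-in tracks nothing but its attached state.

```ruby
# Hypothetical stand-in for Rubino::Run::Recorder; it only tracks attach state.
class LifecycleRecorder
  attr_reader :attached

  def initialize(run_id:, session_id:)
    @run_id = run_id
    @session_id = session_id
    @attached = false
  end

  def attach!
    @attached = true
  end

  def detach!
    @attached = false
  end
end

# Hypothetical helper: attach for the duration of the block, always detach.
def with_recorder(recorder)
  recorder.attach!
  yield recorder
ensure
  recorder.detach!
end

recorder = LifecycleRecorder.new(run_id: "run-1", session_id: "sess-1")
attached_during = nil

begin
  with_recorder(recorder) do |rec|
    attached_during = rec.attached
    raise "run loop failed"
  end
rescue RuntimeError
  # The error still propagates to the caller; detach! has already run.
end
```

With this shape the caller cannot forget the detach! call, which matters if a leftover subscription would otherwise keep writing events for a finished run.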
Constant Summary
- EVENT_MAP =
    {
      Interaction::Events::MODEL_STREAM => "message.delta",
      Interaction::Events::MESSAGE_COMPLETED => "message.completed",
      Interaction::Events::TOOL_STARTED => "tool.started",
      Interaction::Events::TOOL_PROGRESS => "tool.progress",
      Interaction::Events::TOOL_FINISHED => "tool.completed",
      Interaction::Events::ARTIFACT_CREATED => "artifact.created",
      Interaction::Events::INPUT_INJECTED => "input.injected",
      Interaction::Events::SKILL_LOADED => "skill.loaded",
      Interaction::Events::SUBAGENT_SPAWNED => "subagent.spawned",
      Interaction::Events::SUBAGENT_COMPLETED => "subagent.completed",
      Interaction::Events::SUBAGENT_FAILED => "subagent.failed",
      Interaction::Events::INTERACTION_FINISHED => "run.completed",
      Interaction::Events::INTERACTION_FAILED => "run.failed"
    }.freeze
Instance Method Summary
- #attach! ⇒ Object
- #detach! ⇒ Object
- #emit(api_type, payload) ⇒ Object
  Direct emission bypassing EventBus (used for API-only events like approval.required).
- #initialize(run_id:, session_id:, event_bus: nil, store: nil) ⇒ Recorder (constructor)
  A new instance of Recorder.
Constructor Details
#initialize(run_id:, session_id:, event_bus: nil, store: nil) ⇒ Recorder
Returns a new instance of Recorder.
    # File 'lib/rubino/run/recorder.rb', line 36
    def initialize(run_id:, session_id:, event_bus: nil, store: nil)
      @run_id = run_id
      @session_id = session_id
      @event_bus = event_bus || Rubino.event_bus
      @store = store || EventStore.new
      @subscribers = []
    end
Instance Method Details
#attach! ⇒ Object
    # File 'lib/rubino/run/recorder.rb', line 44
    def attach!
      EVENT_MAP.each do |internal_type, api_type|
        handler = ->(payload) { record(api_type, payload) }
        @event_bus.on(internal_type, &handler)
        @subscribers << [internal_type, handler]
      end
    end
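To illustrate why events outside the map never reach storage, here is a minimal sketch of the same subscribe-per-entry pattern. Everything in it is an assumption made for the example: ToyBus is a hypothetical in-memory bus (only `on` mirrors a call seen in the listing above; `publish` is invented here), TOY_EVENT_MAP is a two-entry excerpt keyed by symbols, and a plain array stands in for the store.

```ruby
# Hypothetical in-memory bus; the real Interaction::EventBus API may differ.
class ToyBus
  def initialize
    @handlers = Hash.new { |hash, key| hash[key] = [] }
  end

  # Register a handler block for one internal event type.
  def on(type, &handler)
    @handlers[type] << handler
  end

  # Invented for this sketch: deliver a payload to every handler of a type.
  def publish(type, payload)
    @handlers[type].each { |handler| handler.call(payload) }
  end
end

# Two-entry excerpt standing in for EVENT_MAP, with symbols as internal types.
TOY_EVENT_MAP = {
  tool_started: "tool.started",
  tool_finished: "tool.completed"
}.freeze

bus = ToyBus.new
rows = [] # stands in for rows written through the store

# Same shape as attach!: one subscription per map entry.
TOY_EVENT_MAP.each do |internal_type, api_type|
  bus.on(internal_type) { |payload| rows << [api_type, payload] }
end

bus.publish(:tool_started, { name: "grep" })
bus.publish(:unmapped_event, { ignored: true }) # no subscriber, nothing recorded
bus.publish(:tool_finished, { name: "grep" })
```

After the three publishes, rows holds only the two mapped events, already carrying their API names.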
#detach! ⇒ Object
    # File 'lib/rubino/run/recorder.rb', line 52
    def detach!
      @subscribers.each { |type, _| @event_bus.off(type) }
      @subscribers.clear
    end
#emit(api_type, payload) ⇒ Object
Direct emission bypassing EventBus (used for API-only events like approval.required).
    # File 'lib/rubino/run/recorder.rb', line 58
    def emit(api_type, payload)
      record(api_type, payload)
    end
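A rough sketch of how direct emission might look from the caller's side follows. The private record method and the EventStore interface are not shown on this page, so MemoryStore, its append method, and the row layout are hypothetical, and DirectEmitRecorder is a simplified stand-in rather than the real class.

```ruby
# Hypothetical store; the real EventStore interface is not shown on this page.
class MemoryStore
  attr_reader :rows

  def initialize
    @rows = []
  end

  def append(run_id:, type:, payload:)
    @rows << { run_id: run_id, type: type, payload: payload }
  end
end

# Simplified stand-in for Recorder that keeps only the #emit path.
class DirectEmitRecorder
  def initialize(run_id:, store:)
    @run_id = run_id
    @store = store
  end

  # Mirrors the listing above: emit forwards straight to record.
  def emit(api_type, payload)
    record(api_type, payload)
  end

  private

  # Assumed behavior: write one row per call; the real method may do more.
  def record(api_type, payload)
    @store.append(run_id: @run_id, type: api_type, payload: payload)
  end
end

store = MemoryStore.new
recorder = DirectEmitRecorder.new(run_id: "run-1", store: store)

# approval.required has no entry in EVENT_MAP, so it is emitted directly.
recorder.emit("approval.required", { tool: "shell" })
```

Because emit takes the API event name as given, no EVENT_MAP lookup is involved and the caller is responsible for passing a valid name.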