Class: Opencode::Reply
- Inherits:
-
Object
- Object
- Opencode::Reply
- Defined in:
- lib/opencode/reply.rb
Overview
An assistant’s reply as it is being composed, live, from OpenCode SSE events. A Reply accumulates parts (text, reasoning, tool invocations) in the order the agent emits them and notifies observers of domain transitions — parts appearing, parts growing, tools advancing, sessions erroring.
Responsibilities
-
Translate raw OpenCode SSE events into domain callbacks.
-
Own the canonical state of an in-flight reply (parts list, indices, first-token seen, message info).
-
Apply the tail-drop safety net: when part.updated carries authoritative :text that differs from what deltas accumulated (z.ai GLM-5.1 drops trailing deltas), rewrite the part’s content.
-
Preserve the original tool name when OpenCode later renames a tool to “invalid” mid-stream.
Not responsibilities
-
Rendering HTML or broadcasting Turbo Streams (observer concern).
-
Persisting parts to a database (observer concern).
-
Fetching the event stream (Opencode::Client).
-
Retry / session recovery (job concern).
Event contract
Events match OpenCode’s bus schema (packages/opencode/src/session/ message-v2.ts, status.ts, todo.ts):
message.part.delta { properties: { partID, field, delta, ... } }
message.part.updated { properties: { part: { id, type, ... } } }
message.updated { properties: { info: { tokens, cost, ... } } }
session.status { properties: { status: { type, ... } } }
session.error { properties: { error: { name, data, ... } } }
todo.updated { properties: { todos: [...] } }
Observer callbacks
See Opencode::ReplyObserver for the full callback surface. Observers are duck-typed — only the callbacks they define are invoked.
Example
reply = Opencode::Reply.new
reply.add_observer(MyApp::ReplyStream.new(message:)) # your observer
client.stream_events(session_id: id) { |event| reply.apply(event) }
reply.result
# => Opencode::Reply::Result with parts_json, full_text, reasoning_text, tool_parts
Defined Under Namespace
Classes: Result
Constant Summary collapse
- STREAMABLE_TYPES =
%w[text reasoning tool].freeze
- TERMINAL_TOOL_STATUSES =
%w[completed error].freeze
- TODO_TOOLS =
%w[todowrite todoread].freeze
Instance Attribute Summary collapse
-
#info ⇒ Object
readonly
Returns the value of attribute info.
-
#parts ⇒ Object
readonly
Returns the value of attribute parts.
-
#prompts ⇒ Object
readonly
Returns the value of attribute prompts.
-
#total_cost ⇒ Object
readonly
Returns the value of attribute total_cost.
-
#total_input_tokens ⇒ Object
readonly
Returns the value of attribute total_input_tokens.
-
#total_output_tokens ⇒ Object
readonly
Returns the value of attribute total_output_tokens.
Class Method Summary collapse
-
.distill(parts) ⇒ Object
Pure function: given a parts array, return the denormalized result as an Opencode::Reply::Result value object.
Instance Method Summary collapse
- #add_observer(observer) ⇒ Object
-
#apply(event) ⇒ Object
Drive the state machine forward with one SSE event.
- #first_text_seen? ⇒ Boolean
-
#initialize ⇒ Reply
constructor
A new instance of Reply.
-
#inject_part(part_hash) ⇒ Object
Record a part that originated OUTSIDE the OpenCode event stream — used when an observer synthesizes a part (e.g., a session error notice) that isn’t a real message.part.* event but should still appear in the persisted parts_json.
-
#prompt_blocked? ⇒ Boolean
True while any interactive prompt (question or permission) is awaiting a user reply.
-
#replace_parts(recovered_parts) ⇒ Object
Treat ‘recovered_parts` as a clean-slate baseline: replace parts, clear the id→index map (recovered parts have no OpenCode part IDs), and reset the running cost/token totals plus the first-text flag.
-
#result ⇒ Object
The denormalized result once streaming completes, matching the shape jobs persist to the message table: full_text for :content, reasoning_text for :reasoning, tool_parts for :tool_calls_json, and parts_json for :parts_json.
-
#sync_recovered_parts(recovered_parts) ⇒ Object
Bring the live reply up to a recovered/polled exchange snapshot and notify observers for new or changed parts.
- #tool_count ⇒ Object
Constructor Details
#initialize ⇒ Reply
Returns a new instance of Reply.
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/opencode/reply.rb', line 73 def initialize @parts = [] @part_index_by_id = {} @part_type_by_id = {} @observers = [] @first_text_seen = false @info = nil @total_cost = 0.0 @total_input_tokens = 0 @total_output_tokens = 0 @todo_part_index = nil @prompts = Opencode::Prompts.new # Keyed by [message_id, call_id]: question.asked payloads that # arrived before their matching tool part. Drained when the tool # part shows up in apply_tool_state. @pending_question_payloads = {} end |
Instance Attribute Details
#info ⇒ Object (readonly)
Returns the value of attribute info.
71 72 73 |
# File 'lib/opencode/reply.rb', line 71 def info @info end |
#parts ⇒ Object (readonly)
Returns the value of attribute parts.
71 72 73 |
# File 'lib/opencode/reply.rb', line 71 def parts @parts end |
#prompts ⇒ Object (readonly)
Returns the value of attribute prompts.
71 72 73 |
# File 'lib/opencode/reply.rb', line 71 def prompts @prompts end |
#total_cost ⇒ Object (readonly)
Returns the value of attribute total_cost.
71 72 73 |
# File 'lib/opencode/reply.rb', line 71 def total_cost @total_cost end |
#total_input_tokens ⇒ Object (readonly)
Returns the value of attribute total_input_tokens.
71 72 73 |
# File 'lib/opencode/reply.rb', line 71 def total_input_tokens @total_input_tokens end |
#total_output_tokens ⇒ Object (readonly)
Returns the value of attribute total_output_tokens.
71 72 73 |
# File 'lib/opencode/reply.rb', line 71 def total_output_tokens @total_output_tokens end |
Class Method Details
.distill(parts) ⇒ Object
Pure function: given a parts array, return the denormalized result as an Opencode::Reply::Result value object. Exposed so a recovery path (fetch messages from the session API and map them through ResponseParser.extract_interleaved_parts) produces the same shape as live streaming.
202 203 204 205 206 207 208 209 |
# File 'lib/opencode/reply.rb', line 202 def self.distill(parts) Result.new( parts_json: parts, full_text: join_content(parts, "text"), reasoning_text: join_content(parts, "reasoning"), tool_parts: parts.select { |p| p["type"] == "tool" && TERMINAL_TOOL_STATUSES.include?(p["status"]) } ) end |
Instance Method Details
#add_observer(observer) ⇒ Object
99 100 101 102 |
# File 'lib/opencode/reply.rb', line 99 def add_observer(observer) @observers << observer self end |
#apply(event) ⇒ Object
Drive the state machine forward with one SSE event. Unknown event types are ignored — OpenCode may add new events, and we shouldn’t crash on them.
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/opencode/reply.rb', line 107 def apply(event) case event[:type] when "message.part.delta" then apply_part_delta(event) when "message.part.updated" then apply_part_updated(event) when "message.updated" then (event) when "session.status" then apply_session_status(event) when "session.error" then apply_session_error(event) when "todo.updated" then apply_todo_updated(event) when "question.asked" then apply_question_asked(event) when "question.replied" then apply_question_replied(event) when "question.rejected" then apply_question_rejected(event) when "permission.asked" then (event) when "permission.replied" then (event) end end |
#first_text_seen? ⇒ Boolean
181 182 183 |
# File 'lib/opencode/reply.rb', line 181 def first_text_seen? @first_text_seen end |
#inject_part(part_hash) ⇒ Object
Record a part that originated OUTSIDE the OpenCode event stream —used when an observer synthesizes a part (e.g., a session error notice) that isn’t a real message.part.* event but should still appear in the persisted parts_json. Returns the new index.
Does NOT fire part_added — the injecting observer has already done whatever rendering it needed. Other observers can poll ‘parts` if they care about injected content.
176 177 178 179 |
# File 'lib/opencode/reply.rb', line 176 def inject_part(part_hash) @parts << part_hash @parts.size - 1 end |
#prompt_blocked? ⇒ Boolean
True while any interactive prompt (question or permission) is awaiting a user reply. Opencode::Client uses this to suspend the SSE inactivity deadline — a wait on the human is healthy, not a hang.
95 96 97 |
# File 'lib/opencode/reply.rb', line 95 def prompt_blocked? @prompts.prompt_blocked? end |
#replace_parts(recovered_parts) ⇒ Object
Treat ‘recovered_parts` as a clean-slate baseline: replace parts, clear the id→index map (recovered parts have no OpenCode part IDs), and reset the running cost/token totals plus the first-text flag.
Why reset totals: step-finish events that produced the pre-crash totals are not in the recovery payload; keeping them would double-count when post-recovery step-finish events accumulate against the same counters.
Used only by the recovery path — during normal streaming, parts accrete via apply_* helpers and totals flow through step-finish.
134 135 136 137 138 139 140 141 142 |
# File 'lib/opencode/reply.rb', line 134 def replace_parts(recovered_parts) @parts = recovered_parts @part_index_by_id.clear @part_type_by_id.clear @total_cost = 0.0 @total_input_tokens = 0 @total_output_tokens = 0 @first_text_seen = false end |
#result ⇒ Object
The denormalized result once streaming completes, matching the shape jobs persist to the message table: full_text for :content, reasoning_text for :reasoning, tool_parts for :tool_calls_json, and parts_json for :parts_json.
193 194 195 |
# File 'lib/opencode/reply.rb', line 193 def result self.class.distill(@parts) end |
#sync_recovered_parts(recovered_parts) ⇒ Object
Bring the live reply up to a recovered/polled exchange snapshot and notify observers for new or changed parts. This is the streaming counterpart to replace_parts: when the SSE connection ends before OpenCode’s multi-message tool loop has produced final text, Turn polls the message exchange. Those recovered parts still need to hit Turbo as incremental append/update events, not only the final row replacement.
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/opencode/reply.rb', line 150 def sync_recovered_parts(recovered_parts) Array(recovered_parts).each_with_index do |part, index| next if @parts[index] == part part = deep_dup_part(part) if index < @parts.length @parts[index] = part notify_recovered_part_updated(part, index) else @parts << part notify(:part_added, part: part, index: index) notify_recovered_part_updated(part, index) end @first_text_seen ||= part["type"] == "text" && part["content"].present? end end |
#tool_count ⇒ Object
185 186 187 |
# File 'lib/opencode/reply.rb', line 185 def tool_count @parts.count { |p| p["type"] == "tool" } end |