Class: Opencode::Reply

Inherits:
Object
Defined in:
lib/opencode/reply.rb

Overview

An assistant’s reply as it is being composed, live, from OpenCode SSE events. A Reply accumulates parts (text, reasoning, tool invocations) in the order the agent emits them and notifies observers of domain transitions — parts appearing, parts growing, tools advancing, sessions erroring.

Responsibilities


  • Translate raw OpenCode SSE events into domain callbacks.

  • Own the canonical state of an in-flight reply (parts list, indices, first-token seen, message info).

  • Apply the tail-drop safety net: when part.updated carries authoritative :text that differs from what deltas accumulated (z.ai GLM-5.1 drops trailing deltas), rewrite the part’s content.

  • Preserve the original tool name when OpenCode later renames a tool to “invalid” mid-stream.
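
The tail-drop safety net listed above can be illustrated with a minimal sketch. The helper name reconcile_text is hypothetical and not part of Opencode::Reply; it only shows the rule that authoritative text from part.updated wins when it differs from what the deltas accumulated.

```ruby
# Hypothetical helper, not the library's implementation: prefer the
# authoritative text carried by message.part.updated when it differs
# from the delta-accumulated content.
def reconcile_text(accumulated, authoritative)
  return accumulated if authoritative.nil? || authoritative == accumulated

  authoritative
end

accumulated = +""
# Simulate a stream whose trailing "ld" delta was dropped.
["Hello", ", wor"].each { |delta| accumulated << delta }

final_text = reconcile_text(accumulated, "Hello, world")
```

When no authoritative text is available, the sketch keeps whatever the deltas produced, so a missing :text never erases content.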

Not responsibilities


  • Rendering HTML or broadcasting Turbo Streams (observer concern).

  • Persisting parts to a database (observer concern).

  • Fetching the event stream (Opencode::Client).

  • Retry / session recovery (job concern).

Event contract


Events match OpenCode’s bus schema (packages/opencode/src/session/message-v2.ts, status.ts, todo.ts):

message.part.delta    { properties: { partID, field, delta, ... } }
message.part.updated  { properties: { part: { id, type, ... } } }
message.updated       { properties: { info: { tokens, cost, ... } } }
session.status        { properties: { status: { type, ... } } }
session.error         { properties: { error: { name, data, ... } } }
todo.updated          { properties: { todos: [...] } }

Observer callbacks


See Opencode::ReplyObserver for the full callback surface. Observers are duck-typed — only the callbacks they define are invoked.
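
As a rough sketch of what duck-typed dispatch could look like, the following stand-alone example invokes a callback only on observers that define it. MiniReply, LoggingObserver, and the session_errored callback name are illustrative assumptions; only part_added and its part:/index: keywords appear in this page.

```ruby
# Illustrative stand-ins, not library classes.
class MiniReply
  def initialize
    @observers = []
  end

  def add_observer(observer)
    @observers << observer
    self
  end

  # Invoke the callback only on observers that respond to it.
  def notify(callback, **payload)
    @observers.each do |observer|
      observer.public_send(callback, **payload) if observer.respond_to?(callback)
    end
  end
end

class LoggingObserver
  attr_reader :seen

  def initialize
    @seen = []
  end

  def part_added(part:, index:)
    @seen << [index, part["type"]]
  end
end

observer = LoggingObserver.new
reply = MiniReply.new.add_observer(observer)
reply.notify(:part_added, part: { "type" => "text" }, index: 0)
# LoggingObserver does not define this (hypothetical) callback, so it is skipped.
reply.notify(:session_errored, error: { "name" => "Boom" })
```

An observer that cares about a single transition therefore needs to define only that one method.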

Example


reply = Opencode::Reply.new
reply.add_observer(MyApp::ReplyStream.new(message:))   # your observer
client.stream_events(session_id: id) { |event| reply.apply(event) }
reply.result
# => Opencode::Reply::Result with parts_json, full_text, reasoning_text, tool_parts

Defined Under Namespace

Classes: Result

Constant Summary

STREAMABLE_TYPES =
%w[text reasoning tool].freeze
TERMINAL_TOOL_STATUSES =
%w[completed error].freeze
TODO_TOOLS =
%w[todowrite todoread].freeze

Constructor Details

#initialize ⇒ Reply

Returns a new instance of Reply.



# File 'lib/opencode/reply.rb', line 73

def initialize
  @parts = []
  @part_index_by_id = {}
  @part_type_by_id = {}
  @observers = []
  @first_text_seen = false
  @info = nil
  @total_cost = 0.0
  @total_input_tokens = 0
  @total_output_tokens = 0
  @todo_part_index = nil
  @prompts = Opencode::Prompts.new
  # Keyed by [message_id, call_id]: question.asked payloads that
  # arrived before their matching tool part. Drained when the tool
  # part shows up in apply_tool_state.
  @pending_question_payloads = {}
end

Instance Attribute Details

#info ⇒ Object (readonly)

Returns the value of attribute info.



# File 'lib/opencode/reply.rb', line 71

def info
  @info
end

#parts ⇒ Object (readonly)

Returns the value of attribute parts.



# File 'lib/opencode/reply.rb', line 71

def parts
  @parts
end

#prompts ⇒ Object (readonly)

Returns the value of attribute prompts.



# File 'lib/opencode/reply.rb', line 71

def prompts
  @prompts
end

#total_cost ⇒ Object (readonly)

Returns the value of attribute total_cost.



# File 'lib/opencode/reply.rb', line 71

def total_cost
  @total_cost
end

#total_input_tokens ⇒ Object (readonly)

Returns the value of attribute total_input_tokens.



# File 'lib/opencode/reply.rb', line 71

def total_input_tokens
  @total_input_tokens
end

#total_output_tokens ⇒ Object (readonly)

Returns the value of attribute total_output_tokens.



# File 'lib/opencode/reply.rb', line 71

def total_output_tokens
  @total_output_tokens
end

Class Method Details

.distill(parts) ⇒ Object

Pure function: given a parts array, return the denormalized result as an Opencode::Reply::Result value object. Exposed so a recovery path (fetch messages from the session API and map them through ResponseParser.extract_interleaved_parts) produces the same shape as live streaming.



# File 'lib/opencode/reply.rb', line 202

def self.distill(parts)
  Result.new(
    parts_json: parts,
    full_text: join_content(parts, "text"),
    reasoning_text: join_content(parts, "reasoning"),
    tool_parts: parts.select { |p| p["type"] == "tool" && TERMINAL_TOOL_STATUSES.include?(p["status"]) }
  )
end
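
To make the returned shape concrete, here is a stand-alone sketch of the same distillation. The Result fields and the tool filter mirror the source above; join_content is not shown on this page, so its behaviour here (concatenating the "content" of parts with a matching "type") is an assumption, as is the "tool" key used in the sample data.

```ruby
# Stand-alone sketch; Result's field names come from the source, the
# join_content behaviour is assumed.
Result = Struct.new(:parts_json, :full_text, :reasoning_text, :tool_parts, keyword_init: true)
TERMINAL_TOOL_STATUSES = %w[completed error].freeze

# Assumed behaviour: concatenate the content of every part of the given type.
def join_content(parts, type)
  parts.select { |p| p["type"] == type }.map { |p| p["content"].to_s }.join
end

def distill(parts)
  Result.new(
    parts_json: parts,
    full_text: join_content(parts, "text"),
    reasoning_text: join_content(parts, "reasoning"),
    # Only tools that reached a terminal status are reported.
    tool_parts: parts.select { |p| p["type"] == "tool" && TERMINAL_TOOL_STATUSES.include?(p["status"]) }
  )
end

parts = [
  { "type" => "reasoning", "content" => "Check the file. " },
  { "type" => "tool", "tool" => "read", "status" => "running" },
  { "type" => "tool", "tool" => "grep", "status" => "completed" },
  { "type" => "text", "content" => "Done." }
]
result = distill(parts)
```

In this sample the still-running tool is excluded from tool_parts, while parts_json keeps every part in emission order.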

Instance Method Details

#add_observer(observer) ⇒ Object



# File 'lib/opencode/reply.rb', line 99

def add_observer(observer)
  @observers << observer
  self
end

#apply(event) ⇒ Object

Drive the state machine forward with one SSE event. Unknown event types are ignored — OpenCode may add new events, and we shouldn’t crash on them.



# File 'lib/opencode/reply.rb', line 107

def apply(event)
  case event[:type]
  when "message.part.delta"   then apply_part_delta(event)
  when "message.part.updated" then apply_part_updated(event)
  when "message.updated"      then apply_message_updated(event)
  when "session.status"       then apply_session_status(event)
  when "session.error"        then apply_session_error(event)
  when "todo.updated"         then apply_todo_updated(event)
  when "question.asked"       then apply_question_asked(event)
  when "question.replied"     then apply_question_replied(event)
  when "question.rejected"    then apply_question_rejected(event)
  when "permission.asked"     then apply_permission_asked(event)
  when "permission.replied"   then apply_permission_replied(event)
  end
end
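
The "unknown events are ignored" behaviour follows from the case expression having no else branch. A minimal stand-alone sketch, with an illustrative MiniDispatcher class and a made-up future event type:

```ruby
# Illustrative dispatcher, not the library class: a case with no else
# branch evaluates to nil for event types it does not recognise.
class MiniDispatcher
  attr_reader :handled

  def initialize
    @handled = []
  end

  def apply(event)
    case event[:type]
    when "message.part.delta" then @handled << :delta
    when "session.error"      then @handled << :error
    end
  end
end

dispatcher = MiniDispatcher.new
dispatcher.apply({ type: "message.part.delta", properties: { partID: "p1", field: "text", delta: "Hi" } })
# "some.future.event" is a hypothetical type that no branch matches.
ignored = dispatcher.apply({ type: "some.future.event", properties: {} })
```

Note that the event type is read through the symbol key :type, so events parsed with string keys would fall through the same way.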

#first_text_seen? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/opencode/reply.rb', line 181

def first_text_seen?
  @first_text_seen
end

#inject_part(part_hash) ⇒ Object

Record a part that originated OUTSIDE the OpenCode event stream. Used when an observer synthesizes a part (e.g., a session error notice) that isn’t a real message.part.* event but should still appear in the persisted parts_json. Returns the new index.

Does NOT fire part_added: the injecting observer has already done whatever rendering it needed. Other observers can poll `parts` if they care about injected content.



# File 'lib/opencode/reply.rb', line 176

def inject_part(part_hash)
  @parts << part_hash
  @parts.size - 1
end

#prompt_blocked? ⇒ Boolean

True while any interactive prompt (question or permission) is awaiting a user reply. Opencode::Client uses this to suspend the SSE inactivity deadline — a wait on the human is healthy, not a hang.

Returns:

  • (Boolean)


# File 'lib/opencode/reply.rb', line 95

def prompt_blocked?
  @prompts.prompt_blocked?
end
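
How a caller might use this flag to suspend an inactivity deadline can be sketched as follows. The inactivity_timeout? helper, its parameters, and FakeReply are hypothetical; this is not taken from Opencode::Client.

```ruby
# Hypothetical inactivity check: a reply blocked on a prompt never counts
# as timed out, because waiting on a human is healthy.
def inactivity_timeout?(reply, last_event_at:, now:, timeout:)
  return false if reply.prompt_blocked?

  now - last_event_at > timeout
end

# Minimal stand-in exposing only prompt_blocked?.
FakeReply = Struct.new(:blocked) do
  def prompt_blocked?
    blocked
  end
end

waiting_on_user = inactivity_timeout?(FakeReply.new(true), last_event_at: 0, now: 120, timeout: 60)
stalled = inactivity_timeout?(FakeReply.new(false), last_event_at: 0, now: 120, timeout: 60)
```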

#replace_parts(recovered_parts) ⇒ Object

Treat `recovered_parts` as a clean-slate baseline: replace parts, clear the id→index map (recovered parts have no OpenCode part IDs), and reset the running cost/token totals plus the first-text flag.

Why reset totals: step-finish events that produced the pre-crash totals are not in the recovery payload; keeping them would double-count when post-recovery step-finish events accumulate against the same counters.

Used only by the recovery path — during normal streaming, parts accrete via apply_* helpers and totals flow through step-finish.



# File 'lib/opencode/reply.rb', line 134

def replace_parts(recovered_parts)
  @parts = recovered_parts
  @part_index_by_id.clear
  @part_type_by_id.clear
  @total_cost = 0.0
  @total_input_tokens = 0
  @total_output_tokens = 0
  @first_text_seen = false
end

#result ⇒ Object

The denormalized result once streaming completes, matching the shape jobs persist to the message table: full_text for :content, reasoning_text for :reasoning, tool_parts for :tool_calls_json, and parts_json for :parts_json.



# File 'lib/opencode/reply.rb', line 193

def result
  self.class.distill(@parts)
end

#sync_recovered_parts(recovered_parts) ⇒ Object

Bring the live reply up to a recovered/polled exchange snapshot and notify observers for new or changed parts. This is the streaming counterpart to replace_parts: when the SSE connection ends before OpenCode’s multi-message tool loop has produced final text, Turn polls the message exchange. Those recovered parts still need to hit Turbo as incremental append/update events, not only the final row replacement.



# File 'lib/opencode/reply.rb', line 150

def sync_recovered_parts(recovered_parts)
  Array(recovered_parts).each_with_index do |part, index|
    next if @parts[index] == part

    part = deep_dup_part(part)
    if index < @parts.length
      @parts[index] = part
      notify_recovered_part_updated(part, index)
    else
      @parts << part
      notify(:part_added, part: part, index: index)
      notify_recovered_part_updated(part, index)
    end

    @first_text_seen ||= part["type"] == "text" && part["content"].present?
  end
end
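
The index-by-index comparison can be tried in isolation with the sketch below. MiniSync is an illustrative stand-in: it records events in an array instead of notifying observers, and it uses a Marshal round-trip where the source calls deep_dup_part (whose implementation is not shown on this page).

```ruby
# Stand-alone sketch of the sync loop; the event log replaces the real
# observer notifications.
class MiniSync
  attr_reader :parts, :events

  def initialize(parts = [])
    @parts = parts
    @events = []
  end

  def sync(recovered_parts)
    Array(recovered_parts).each_with_index do |part, index|
      next if @parts[index] == part # unchanged part: nothing to announce

      copy = Marshal.load(Marshal.dump(part)) # simple deep copy for plain hashes
      if index < @parts.length
        @parts[index] = copy
        @events << [:updated, index]
      else
        @parts << copy
        @events << [:added, index]
        @events << [:updated, index]
      end
    end
  end
end

live = MiniSync.new([
  { "type" => "text", "content" => "Hel" },
  { "type" => "tool", "status" => "running" }
])
live.sync([
  { "type" => "text", "content" => "Hello" },
  { "type" => "tool", "status" => "running" },
  { "type" => "text", "content" => "All done." }
])
```

In this run the first part is updated in place, the unchanged tool part is skipped, and the third part is announced as added and then updated, matching the order of calls in the source method.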

#tool_count ⇒ Object



# File 'lib/opencode/reply.rb', line 185

def tool_count
  @parts.count { |p| p["type"] == "tool" }
end