Class: Opencode::Client
- Inherits:
-
Object
- Object
- Opencode::Client
- Defined in:
- lib/opencode/client.rb
Overview
HTTP client for OpenCode REST API. Thread safety: Each instance creates its own Net::HTTP connection. Do NOT share instances across threads. Create per-job.
Constant Summary collapse
- MAX_SSE_BUFFER =
1 MB — safety valve against pathological server responses
1_048_576
- SSE_RECONNECT_DELAY =
0.1
- TRANSIENT_SSE_ERRORS =
[ EOFError, IOError, Net::OpenTimeout, Net::ReadTimeout, Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE ].freeze
Instance Attribute Summary collapse
-
#directory ⇒ Object
readonly
Returns the value of attribute directory.
Instance Method Summary collapse
- #abort_session(session_id) ⇒ Object
- #children(session_id) ⇒ Object
- #close ⇒ Object
- #create_session(title: nil, permissions: nil) ⇒ Object
- #delete_session(session_id) ⇒ Object
- #get_messages(session_id) ⇒ Object
- #health ⇒ Object
-
#initialize(base_url: ENV["OPENCODE_BASE_URL"] || "http://localhost:4096", password: ENV["OPENCODE_SERVER_PASSWORD"], timeout: (ENV["OPENCODE_TIMEOUT"] || 120).to_i, directory: nil, workspace: nil) ⇒ Client
constructor
A new instance of Client.
-
#list_questions ⇒ Object
Returns pending question requests as an Array of Hashes with SYMBOL keys, consistent with every other endpoint that flows through handle_response (e.g., health, list_sessions, get_messages).
- #list_sessions ⇒ Object
- #reject_question(request_id:) ⇒ Object
- #reply_permission(request_id:, reply:, message: nil) ⇒ Object
- #reply_question(request_id:, answers:) ⇒ Object
- #send_message(session_id, text, parts: nil, model: nil, agent: nil, system: nil, message_id: nil, no_reply: nil, tools: nil, format: nil, variant: nil) ⇒ Object
- #send_message_async(session_id, text, parts: nil, model: nil, agent: nil, system: nil, message_id: nil, no_reply: nil, tools: nil, format: nil, variant: nil) ⇒ Object
- #session_status ⇒ Object
-
#stream(session_id, text, model: nil, agent: nil, system: nil, message_id: nil, stream_timeout: 600, first_event_timeout: 120, idle_stream_timeout: nil, on_activity_tick: nil, &block) ⇒ Object
Block-form streaming — the headline API for callers who want the full async-prompt + SSE-loop + final-exchange-merge flow in one call.
-
#stream_events(session_id:, timeout: 600, first_event_timeout: 120, idle_stream_timeout: nil, reply: nil, on_activity_tick: nil, &block) ⇒ Object
Opens SSE connection to GET /event, yields parsed events filtered by session_id.
Constructor Details
#initialize(base_url: ENV["OPENCODE_BASE_URL"] || "http://localhost:4096", password: ENV["OPENCODE_SERVER_PASSWORD"], timeout: (ENV["OPENCODE_TIMEOUT"] || 120).to_i, directory: nil, workspace: nil) ⇒ Client
Returns a new instance of Client.
14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/opencode/client.rb', line 14
# Builds a client for one OpenCode server.
#
# @param base_url [String] server root; defaults to OPENCODE_BASE_URL or
#   "http://localhost:4096"
# @param password [String, nil] server password; defaults to
#   OPENCODE_SERVER_PASSWORD
# @param timeout [Numeric, nil] request timeout in seconds; defaults to
#   OPENCODE_TIMEOUT or 120
# @param directory [String, nil] stored and exposed through #directory
# @param workspace [String, nil] stored for later use by the client
def initialize(
  base_url: ENV["OPENCODE_BASE_URL"] || "http://localhost:4096",
  password: ENV["OPENCODE_SERVER_PASSWORD"],
  timeout: (ENV["OPENCODE_TIMEOUT"] || 120).to_i,
  directory: nil,
  workspace: nil
)
  @uri = URI.parse(base_url)
  @password = password
  # 0 is truthy in Ruby, so `timeout || 120` alone lets a blank or
  # non-numeric OPENCODE_TIMEOUT ("".to_i == 0) through as a zero-second
  # timeout. Treat any non-positive number like a missing value.
  non_positive = timeout.is_a?(Numeric) && !timeout.positive?
  @timeout = (!timeout || non_positive) ? 120 : timeout
  @directory = directory
  @workspace = workspace
end
Instance Attribute Details
#directory ⇒ Object (readonly)
Returns the value of attribute directory.
12 13 14 |
# File 'lib/opencode/client.rb', line 12
# Read-only accessor for the directory given to the constructor.
#
# @return [Object] the value of the directory attribute
def directory
  @directory
end
Instance Method Details
#abort_session(session_id) ⇒ Object
171 172 173 |
# File 'lib/opencode/client.rb', line 171
# POSTs an empty body to the abort endpoint of the given session.
#
# @param session_id [String] id interpolated into the request path
# @return [Object] whatever #post returns for the request
def abort_session(session_id)
  abort_path = "/session/#{session_id}/abort"
  post(abort_path, {})
end
#children(session_id) ⇒ Object
147 148 149 150 151 |
# File 'lib/opencode/client.rb', line 147
# Issues GET /session/<session_id>/children.
#
# @param session_id [String] id interpolated into the request path
# @return [Object] whatever #execute returns for the request
def children(session_id)
  children_uri = build_uri("/session/#{session_id}/children")
  execute(Net::HTTP::Get.new(children_uri))
end
#close ⇒ Object
367 368 369 370 371 |
# File 'lib/opencode/client.rb', line 367
# Finishes the cached HTTP connection if one exists and is started.
# An IOError raised while finishing is swallowed: the connection is
# already closed, which is the state we wanted.
def close
  connection = @http
  return unless connection&.started?

  connection.finish
rescue IOError
  # already closed
  nil
end
#create_session(title: nil, permissions: nil) ⇒ Object
28 29 30 31 |
# File 'lib/opencode/client.rb', line 28
# POSTs to /session to create a new session.
#
# @param title [String, nil] optional title; omitted from the body when nil
# @param permissions [Object, nil] sent under the singular `permission`
#   key; omitted from the body when nil
# @return [Object] whatever #post returns for the request
def create_session(title: nil, permissions: nil)
  # compact drops nil entries so unset options are not sent at all.
  body = { title: title, permission: permissions }.compact
  post("/session", body)
end
#delete_session(session_id) ⇒ Object
153 154 155 156 157 |
# File 'lib/opencode/client.rb', line 153
# Issues DELETE /session/<session_id>.
#
# @param session_id [String] id interpolated into the request path
# @return [Object] whatever #execute returns for the request
def delete_session(session_id)
  session_uri = build_uri("/session/#{session_id}")
  execute(Net::HTTP::Delete.new(session_uri))
end
#get_messages(session_id) ⇒ Object
165 166 167 168 169 |
# File 'lib/opencode/client.rb', line 165
# Issues GET /session/<session_id>/message.
#
# @param session_id [String] id interpolated into the request path
# @return [Object] whatever #execute returns for the request
def get_messages(session_id)
  uri = build_uri("/session/#{session_id}/message")
  request = Net::HTTP::Get.new(uri)
  execute(request)
end
#health ⇒ Object
217 218 219 220 221 |
# File 'lib/opencode/client.rb', line 217
# Issues GET /global/health. The URI is built with `scoped: false`.
#
# @return [Object] whatever #execute returns for the request
def health
  health_uri = build_uri("/global/health", scoped: false)
  execute(Net::HTTP::Get.new(health_uri))
end
#list_questions ⇒ Object
Returns pending question requests as an Array of Hashes with SYMBOL keys, consistent with every other endpoint that flows through handle_response (e.g., health, list_sessions, get_messages). Callers that compare against persisted JSON column data should symbolize their side, not desymbolize this side.
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
# File 'lib/opencode/client.rb', line 194
# Issues GET /question and parses the JSON body with symbolized keys.
#
# @return [Array<Hash>] parsed response; [] when the body is blank
# @raise [ServerError] on a non-2xx status or an unparseable JSON body
# @raise [TimeoutError] on Net::OpenTimeout / ReadTimeout / WriteTimeout
# @raise [ConnectionError] on Errno::ECONNREFUSED or SocketError
def list_questions
  uri = build_uri("/question")
  request = Net::HTTP::Get.new(uri)
  add_auth_header(request)
  response = Opencode::Instrumentation.instrument("opencode.request", method: request.method, path: request.path) do
    http_client.request(request)
  end

  unless response.code.to_i.between?(200, 299)
    # Only the first 200 characters of the body go into the error message.
    raise ServerError, "list_questions failed: HTTP #{response.code} — #{response.body.to_s[0, 200]}"
  end
  return [] if response.body.blank?

  JSON.parse(response.body, symbolize_names: true)
rescue JSON::ParserError => e
  raise ServerError, "list_questions returned invalid JSON: #{e.message}"
rescue Net::OpenTimeout, Net::ReadTimeout, Net::WriteTimeout => e
  raise TimeoutError, "OpenCode timeout after #{@timeout}s: #{e.message}"
rescue Errno::ECONNREFUSED, SocketError => e
  raise ConnectionError, "OpenCode unreachable: #{e.message}"
end
#list_sessions ⇒ Object
141 142 143 144 145 |
# File 'lib/opencode/client.rb', line 141
# Issues GET /session.
#
# @return [Object] whatever #execute returns for the request
def list_sessions
  sessions_uri = build_uri("/session")
  execute(Net::HTTP::Get.new(sessions_uri))
end
#reject_question(request_id:) ⇒ Object
179 180 181 |
# File 'lib/opencode/client.rb', line 179
# POSTs an empty body to /question/<request_id>/reject.
#
# @param request_id [String] id interpolated into the request path
# @return [Object] whatever #post returns for the request
def reject_question(request_id:)
  reject_path = "/question/#{request_id}/reject"
  post(reject_path, {})
end
#reply_permission(request_id:, reply:, message: nil) ⇒ Object
183 184 185 186 187 |
# File 'lib/opencode/client.rb', line 183
# POSTs a reply to /permission/<request_id>/reply.
#
# @param request_id [String] id interpolated into the request path
# @param reply [Object] sent under the `reply` key
# @param message [String, nil] optional text; sent under the `message`
#   key only when present
# @return [Object] whatever #post returns for the request
def reply_permission(request_id:, reply:, message: nil)
  body = { reply: reply }
  body[:message] = message if message.present?
  post("/permission/#{request_id}/reply", body)
end
#reply_question(request_id:, answers:) ⇒ Object
175 176 177 |
# File 'lib/opencode/client.rb', line 175
# POSTs the given answers to /question/<request_id>/reply.
#
# @param request_id [String] id interpolated into the request path
# @param answers [Object] sent under the `answers` key
# @return [Object] whatever #post returns for the request
def reply_question(request_id:, answers:)
  payload = { answers: answers }
  post("/question/#{request_id}/reply", payload)
end
#send_message(session_id, text, parts: nil, model: nil, agent: nil, system: nil, message_id: nil, no_reply: nil, tools: nil, format: nil, variant: nil) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/opencode/client.rb', line 33
# Builds a prompt body with prompt_payload and POSTs it to
# /session/<session_id>/message.
#
# Every keyword argument is passed through to prompt_payload unchanged.
#
# @param session_id [String] id interpolated into the request path
# @param text [String] prompt text handed to prompt_payload
# @return [Object] whatever #post returns for the request
def send_message(
  session_id,
  text,
  parts: nil,
  model: nil,
  agent: nil,
  system: nil,
  message_id: nil,
  no_reply: nil,
  tools: nil,
  format: nil,
  variant: nil
)
  body = prompt_payload(
    text,
    parts: parts,
    model: model,
    agent: agent,
    system: system,
    message_id: message_id,
    no_reply: no_reply,
    tools: tools,
    format: format,
    variant: variant
  )
  post("/session/#{session_id}/message", body)
end
#send_message_async(session_id, text, parts: nil, model: nil, agent: nil, system: nil, message_id: nil, no_reply: nil, tools: nil, format: nil, variant: nil) ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/opencode/client.rb', line 60
# Builds a prompt body with prompt_payload and POSTs it to
# /session/<session_id>/prompt_async.
#
# Every keyword argument is passed through to prompt_payload unchanged.
#
# @param session_id [String] id interpolated into the request path
# @param text [String] prompt text handed to prompt_payload
# @return [Object] whatever #post returns for the request
def send_message_async(
  session_id,
  text,
  parts: nil,
  model: nil,
  agent: nil,
  system: nil,
  message_id: nil,
  no_reply: nil,
  tools: nil,
  format: nil,
  variant: nil
)
  body = prompt_payload(
    text,
    parts: parts,
    model: model,
    agent: agent,
    system: system,
    message_id: message_id,
    no_reply: no_reply,
    tools: tools,
    format: format,
    variant: variant
  )
  post("/session/#{session_id}/prompt_async", body)
end
#session_status ⇒ Object
159 160 161 162 163 |
# File 'lib/opencode/client.rb', line 159
# Issues GET /session/status.
#
# @return [Object] whatever #execute returns for the request
def session_status
  status_uri = build_uri("/session/status")
  execute(Net::HTTP::Get.new(status_uri))
end
#stream(session_id, text, model: nil, agent: nil, system: nil, message_id: nil, stream_timeout: 600, first_event_timeout: 120, idle_stream_timeout: nil, on_activity_tick: nil, &block) ⇒ Object
Block-form streaming — the headline API for callers who want the full async-prompt + SSE-loop + final-exchange-merge flow in one call. Returns the final Opencode::Reply::Result value object once the agent finishes.
reply = client.stream(session_id, "Explain monads") do |part|
print part["content"] if part["type"] == "text"
end
reply.full_text # => the final accumulated text
reply.tool_parts # => array of terminal tool parts
The block is invoked every time a part is added, grows, finalizes, or (for tool parts) advances state — i.e., whenever a user-visible change happens. The block receives the current `part` hash (string keys: "type", "content", "tool", "status", "input", …).
If you need raw events (every server.* tick, todo.updated, prompt asked/replied, etc.), use #stream_events instead.
Optional kwargs are forwarded to send_message_async — model, agent, system prompt override, and the SSE pacing knobs supported by stream_events.
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/opencode/client.rb', line 109
# Sends the prompt asynchronously, applies every streamed event to an
# Opencode::Reply, merges the final exchange and returns the reply's result.
#
# @param session_id [String] session to prompt and to filter events by
# @param text [String] prompt text
# @param model, agent, system, message_id forwarded to send_message_async
# @param stream_timeout [Numeric] passed to stream_events as `timeout`
# @param first_event_timeout [Numeric] passed through to stream_events
# @param idle_stream_timeout [Numeric, nil] passed through to stream_events
# @param on_activity_tick [#call, nil] passed through to stream_events
# @yield optional block, wrapped in a StreamBlockObserver on the reply
# @return [Object] the value of `reply.result`
def stream(
  session_id,
  text,
  model: nil,
  agent: nil,
  system: nil,
  message_id: nil,
  stream_timeout: 600,
  first_event_timeout: 120,
  idle_stream_timeout: nil,
  on_activity_tick: nil,
  &block
)
  send_message_async(
    session_id,
    text,
    model: model,
    agent: agent,
    system: system,
    message_id: message_id
  )

  reply = Opencode::Reply.new
  # The observer is only attached when the caller actually gave a block.
  reply.add_observer(StreamBlockObserver.new(&block)) if block_given?

  stream_events(
    session_id: session_id,
    timeout: stream_timeout,
    first_event_timeout: first_event_timeout,
    idle_stream_timeout: idle_stream_timeout,
    reply: reply,
    on_activity_tick: on_activity_tick
  ) do |event|
    reply.apply(event)
  end

  merge_final_exchange(session_id, reply)
  reply.result
end
#stream_events(session_id:, timeout: 600, first_event_timeout: 120, idle_stream_timeout: nil, reply: nil, on_activity_tick: nil, &block) ⇒ Object
Opens SSE connection to GET /event, yields parsed events filtered by session_id. Blocks until session goes idle or timeout, reconnecting across dropped event-stream connections.
first_event_timeout: seconds to wait for a session-specific event before declaring the session stale. Server heartbeats don’t count — they’re global keep-alives that flow regardless of session state.
Default 120s rather than the more aggressive 30s used originally: slow-thinking reasoning models (Kimi K2, GPT-5 with extended thinking, etc.) routinely spend 30-90s of pure reasoning before emitting their first `message.part.*` event, especially on cold sessions with long system prompts. 30s false-positive trips on legitimate first turns and converts them to `StaleSessionError`; 120s catches genuine zombies without nuking real reasoning. Callers that know their agent is short-prompt + fast can pass a lower value.
idle_stream_timeout: seconds to wait BETWEEN meaningful events once the session has started producing them. Default nil = no check (preserves the overall `timeout` ceiling behavior). Opt-in heartbeat watchdog for callers whose user-facing surface needs to fail fast rather than sit forever when an upstream LLM stream wedges mid-turn. Distinct from first_event_timeout (which only protects cold-start) and from the overall `timeout` ceiling of 600s (which is forgiving — a hung stream holding a thread for 10 minutes is already a bad UX). When the window is exceeded the call raises Opencode::IdleStreamError, which the caller is expected to catch and translate into a user-visible error / retry affordance.
263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 |
# File 'lib/opencode/client.rb', line 263
# Streams server-sent events from GET /event and hands each parsed event to
# the block, reconnecting after transient transport errors or a clean EOF,
# until a "session.idle" event arrives or one of the timeouts fires.
#
# @param session_id [String] passed to parse_sse_event and used in error text
# @param timeout [Numeric] overall ceiling in seconds (monotonic clock)
# @param first_event_timeout [Numeric] seconds allowed before the first
#   event whose type does not start with "server."
# @param idle_stream_timeout [Numeric, nil] max seconds between such events
#   once one has been seen; nil disables the check
# @param reply [Object, nil] forwarded to check_deadline_or_suspend
# @param on_activity_tick [#call, nil] called with every parsed event
# @yield [event] every parsed event, including "server.*" ones
# @raise [StaleSessionError] no non-server event before first_event_timeout
# @raise [IdleStreamError] idle_stream_timeout exceeded between events
# @raise [ServerError] non-success HTTP status, or buffer over MAX_SSE_BUFFER
def stream_events(session_id:, timeout: 600, first_event_timeout: 120, idle_stream_timeout: nil, reply: nil, on_activity_tick: nil, &block)
  uri = build_uri("/event")
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  first_event_deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + first_event_timeout
  received_session_event = false
  last_meaningful_event_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # One iteration per connection attempt.
  loop do
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    # NOTE(review): presumably raises once the deadline has passed, or
    # returns an extended deadline while the reply is suspended — confirm
    # against check_deadline_or_suspend.
    deadline = check_deadline_or_suspend(now, deadline, timeout, reply)
    # NOTE: first_event_deadline is *not* suspension-eligible. If the agent
    # never gets started we want to fail fast — a session that's blocked on
    # a prompt has, by definition, already produced events.
    if !received_session_event && now > first_event_deadline
      raise StaleSessionError, "No events for session #{session_id} within #{first_event_timeout}s"
    end
    if idle_stream_timeout && received_session_event && (now - last_meaningful_event_at) > idle_stream_timeout
      raise IdleStreamError, "No meaningful events for session #{session_id} within #{idle_stream_timeout}s " \
                             "(SSE heartbeats still arriving — upstream likely wedged mid-turn)"
    end

    request = Net::HTTP::Get.new(uri)
    request["Accept"] = "text/event-stream"
    request["Cache-Control"] = "no-cache"
    add_auth_header(request)

    # A fresh connection per attempt, separate from the cached @http.
    http = Net::HTTP.new(@uri.host, @uri.port)
    http.use_ssl = @uri.scheme == "https"
    http.open_timeout = 10
    http.read_timeout = 30

    begin
      # The buffer is per connection; a partial event left over when the
      # connection drops is discarded.
      buffer = String.new
      http.request(request) do |response|
        unless response.is_a?(Net::HTTPSuccess)
          raise ServerError, "SSE connection failed: HTTP #{response.code}"
        end

        response.read_body do |chunk|
          # Re-run the same deadline checks on every chunk, since a single
          # connection can stay open for a long time.
          now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
          deadline = check_deadline_or_suspend(now, deadline, timeout, reply)
          if !received_session_event && now > first_event_deadline
            raise StaleSessionError, "No events for session #{session_id} within #{first_event_timeout}s"
          end
          if idle_stream_timeout && received_session_event && (now - last_meaningful_event_at) > idle_stream_timeout
            raise IdleStreamError, "No meaningful events for session #{session_id} within #{idle_stream_timeout}s " \
                                   "(SSE heartbeats still arriving — upstream likely wedged mid-turn)"
          end

          buffer << chunk
          if buffer.bytesize > MAX_SSE_BUFFER
            raise ServerError, "SSE buffer exceeded #{MAX_SSE_BUFFER} bytes"
          end

          # A blank line ("\n\n") terminates one event; consume every
          # complete event currently in the buffer.
          while (idx = buffer.index("\n\n"))
            raw_event = buffer.slice!(0, idx + 2)
            event = parse_sse_event(raw_event, session_id)
            # parse_sse_event returned nothing for this frame — skip it.
            next unless event

            # Only events whose type does not start with "server." count
            # towards the first-event and idle watchdogs.
            unless event[:type]&.start_with?("server.")
              received_session_event = true
              last_meaningful_event_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
            end

            # Tick activity on EVERY event, including server.heartbeat —
            # that's the whole point: a healthy long wait (user thinking
            # for 30 minutes) keeps the container warm via heartbeats so
            # the reaper doesn't kill it mid-wait.
            on_activity_tick&.call(event)
            block.call(event)
            # Returns from stream_events itself; the ensure below still runs.
            return if event[:type] == "session.idle"
          end
        end
      end
    rescue *TRANSIENT_SSE_ERRORS
      # Treat transport-level SSE disconnects like clean EOF: reconnect
      # until session.idle, the overall timeout, or first-event timeout.
    ensure
      begin
        http&.finish if http&.started?
      rescue IOError
        # Connection already closed — network partition or server shutdown
      end
    end

    # Pause before reconnecting, but never sleep past whichever deadline
    # currently applies; skip the sleep when that deadline has passed.
    cutoff = received_session_event ? deadline : first_event_deadline
    sleep_for = [ SSE_RECONNECT_DELAY, cutoff - Process.clock_gettime(Process::CLOCK_MONOTONIC) ].min
    if sleep_for.positive?
      sleep sleep_for
    end
  end
end