Class: Opencode::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/opencode/client.rb

Overview

HTTP client for OpenCode REST API. Thread safety: Each instance creates its own Net::HTTP connection. Do NOT share instances across threads. Create per-job.

Constant Summary collapse

MAX_SSE_BUFFER =

1 MB — safety valve against pathological server responses

1_048_576
SSE_RECONNECT_DELAY =
0.1
TRANSIENT_SSE_ERRORS =
[
  EOFError,
  IOError,
  Net::OpenTimeout,
  Net::ReadTimeout,
  Errno::ECONNREFUSED,
  Errno::ECONNRESET,
  Errno::EPIPE
].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(base_url: ENV["OPENCODE_BASE_URL"] || "http://localhost:4096", password: ENV["OPENCODE_SERVER_PASSWORD"], timeout: (ENV["OPENCODE_TIMEOUT"] || 120).to_i, directory: nil, workspace: nil) ⇒ Client

Returns a new instance of Client.



14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/opencode/client.rb', line 14

def initialize(
  base_url: ENV["OPENCODE_BASE_URL"] || "http://localhost:4096",
  password: ENV["OPENCODE_SERVER_PASSWORD"],
  timeout: (ENV["OPENCODE_TIMEOUT"] || 120).to_i,
  directory: nil,
  workspace: nil
)
  @uri = URI.parse(base_url)
  @password = password
  @timeout = timeout || 120
  @directory = directory
  @workspace = workspace
end

Instance Attribute Details

#directoryObject (readonly)

Returns the value of attribute directory.



12
13
14
# File 'lib/opencode/client.rb', line 12

def directory
  @directory
end

Instance Method Details

#abort_session(session_id) ⇒ Object



171
172
173
# File 'lib/opencode/client.rb', line 171

def abort_session(session_id)
  post("/session/#{session_id}/abort", {})
end

#children(session_id) ⇒ Object



147
148
149
150
151
# File 'lib/opencode/client.rb', line 147

def children(session_id)
  uri = build_uri("/session/#{session_id}/children")
  request = Net::HTTP::Get.new(uri)
  execute(request)
end

#closeObject



367
368
369
370
371
# File 'lib/opencode/client.rb', line 367

def close
  @http&.finish if @http&.started?
rescue IOError
  # already closed
end

#create_session(title: nil, permissions: nil) ⇒ Object



28
29
30
31
# File 'lib/opencode/client.rb', line 28

def create_session(title: nil, permissions: nil)
  body = { title: title, permission: permissions }.compact
  post("/session", body)
end

#delete_session(session_id) ⇒ Object



153
154
155
156
157
# File 'lib/opencode/client.rb', line 153

def delete_session(session_id)
  uri = build_uri("/session/#{session_id}")
  request = Net::HTTP::Delete.new(uri)
  execute(request)
end

#get_messages(session_id) ⇒ Object



165
166
167
168
169
# File 'lib/opencode/client.rb', line 165

def get_messages(session_id)
  uri = build_uri("/session/#{session_id}/message")
  request = Net::HTTP::Get.new(uri)
  execute(request)
end

#healthObject



217
218
219
220
221
# File 'lib/opencode/client.rb', line 217

def health
  uri = build_uri("/global/health", scoped: false)
  request = Net::HTTP::Get.new(uri)
  execute(request)
end

#list_questionsObject

Returns pending question requests as an Array of Hashes with SYMBOL keys, consistent with every other endpoint that flows through handle_response (e.g., health, list_sessions, get_messages). Callers that compare against persisted JSON column data should symbolize their side, not desymbolize this side.



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/opencode/client.rb', line 194

def list_questions
  uri = build_uri("/question")
  request = Net::HTTP::Get.new(uri)
  add_auth_header(request)

  response = Opencode::Instrumentation.instrument("opencode.request", method: request.method, path: request.path) do
    http_client.request(request)
  end

  unless response.code.to_i.between?(200, 299)
    raise ServerError, "list_questions failed: HTTP #{response.code}#{response.body.to_s[0, 200]}"
  end

  return [] if response.body.blank?
  JSON.parse(response.body, symbolize_names: true)
rescue JSON::ParserError => e
  raise ServerError, "list_questions returned invalid JSON: #{e.message}"
rescue Net::OpenTimeout, Net::ReadTimeout, Net::WriteTimeout => e
  raise TimeoutError, "OpenCode timeout after #{@timeout}s: #{e.message}"
rescue Errno::ECONNREFUSED, SocketError => e
  raise ConnectionError, "OpenCode unreachable: #{e.message}"
end

#list_sessionsObject



141
142
143
144
145
# File 'lib/opencode/client.rb', line 141

def list_sessions
  uri = build_uri("/session")
  request = Net::HTTP::Get.new(uri)
  execute(request)
end

#reject_question(request_id:) ⇒ Object



179
180
181
# File 'lib/opencode/client.rb', line 179

def reject_question(request_id:)
  post("/question/#{request_id}/reject", {})
end

#reply_permission(request_id:, reply:, message: nil) ⇒ Object



183
184
185
186
187
# File 'lib/opencode/client.rb', line 183

def reply_permission(request_id:, reply:, message: nil)
  body = { reply: reply }
  body[:message] = message if message.present?
  post("/permission/#{request_id}/reply", body)
end

#reply_question(request_id:, answers:) ⇒ Object



175
176
177
# File 'lib/opencode/client.rb', line 175

def reply_question(request_id:, answers:)
  post("/question/#{request_id}/reply", { answers: answers })
end

#send_message(session_id, text, parts: nil, model: nil, agent: nil, system: nil, message_id: nil, no_reply: nil, tools: nil, format: nil, variant: nil) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/opencode/client.rb', line 33

def send_message(
  session_id, text,
  parts: nil,
  model: nil,
  agent: nil,
  system: nil,
  message_id: nil,
  no_reply: nil,
  tools: nil,
  format: nil,
  variant: nil
)
  body = prompt_payload(
    text,
    parts: parts,
    model: model,
    agent: agent,
    system: system,
    message_id: message_id,
    no_reply: no_reply,
    tools: tools,
    format: format,
    variant: variant
  )
  post("/session/#{session_id}/message", body)
end

#send_message_async(session_id, text, parts: nil, model: nil, agent: nil, system: nil, message_id: nil, no_reply: nil, tools: nil, format: nil, variant: nil) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/opencode/client.rb', line 60

def send_message_async(
  session_id, text,
  parts: nil,
  model: nil,
  agent: nil,
  system: nil,
  message_id: nil,
  no_reply: nil,
  tools: nil,
  format: nil,
  variant: nil
)
  body = prompt_payload(
    text,
    parts: parts,
    model: model,
    agent: agent,
    system: system,
    message_id: message_id,
    no_reply: no_reply,
    tools: tools,
    format: format,
    variant: variant
  )
  post("/session/#{session_id}/prompt_async", body)
end

#session_statusObject



159
160
161
162
163
# File 'lib/opencode/client.rb', line 159

def session_status
  uri = build_uri("/session/status")
  request = Net::HTTP::Get.new(uri)
  execute(request)
end

#stream(session_id, text, model: nil, agent: nil, system: nil, message_id: nil, stream_timeout: 600, first_event_timeout: 120, idle_stream_timeout: nil, on_activity_tick: nil, &block) ⇒ Object

Block-form streaming — the headline API for callers who want the full async-prompt + SSE-loop + final-exchange-merge flow in one call. Returns the final Opencode::Reply::Result value object once the agent finishes.

reply = client.stream(session_id, "Explain monads") do |part|
  print part["content"] if part["type"] == "text"
end
reply.full_text   # => the final accumulated text
reply.tool_parts  # => array of terminal tool parts

The block is invoked every time a part is added, grows, finalizes, or (for tool parts) advances state — i.e., whenever a user-visible change happens. The block receives the current ‘part` hash (string keys: “type”, “content”, “tool”, “status”, “input”, …).

If you need raw events (every server.* tick, todo.updated, prompt asked/replied, etc.), use #stream_events instead.

Optional kwargs are forwarded to send_message_async — model, agent, system prompt override, and the SSE pacing knobs supported by stream_events.



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/opencode/client.rb', line 109

def stream(
  session_id, text,
  model: nil, agent: nil, system: nil, message_id: nil,
  stream_timeout: 600,
  first_event_timeout: 120,
  idle_stream_timeout: nil,
  on_activity_tick: nil,
  &block
)
  send_message_async(
    session_id, text,
    model: model, agent: agent, system: system, message_id: message_id
  )

  reply = Opencode::Reply.new
  reply.add_observer(StreamBlockObserver.new(&block)) if block_given?

  stream_events(
    session_id: session_id,
    timeout: stream_timeout,
    first_event_timeout: first_event_timeout,
    idle_stream_timeout: idle_stream_timeout,
    reply: reply,
    on_activity_tick: on_activity_tick
  ) do |event|
    reply.apply(event)
  end

  merge_final_exchange(session_id, reply)
  reply.result
end

#stream_events(session_id:, timeout: 600, first_event_timeout: 120, idle_stream_timeout: nil, reply: nil, on_activity_tick: nil, &block) ⇒ Object

Opens SSE connection to GET /event, yields parsed events filtered by session_id. Blocks until session goes idle or timeout, reconnecting across dropped event-stream connections.

first_event_timeout: seconds to wait for a session-specific event before declaring the session stale. Server heartbeats don’t count — they’re global keep-alives that flow regardless of session state.

Default 120s rather than the more aggressive 30s used originally: slow-thinking reasoning models (Kimi K2, GPT-5 with extended thinking, etc.) routinely spend 30-90s of pure reasoning before emitting their first ‘message.part.*` event, especially on cold sessions with long system prompts. 30s false-positive trips on legitimate first turns and converts them to `StaleSessionError`; 120s catches genuine zombies without nuking real reasoning. Callers that know their agent is short-prompt + fast can pass a lower value.

idle_stream_timeout: seconds to wait BETWEEN meaningful events once the session has started producing them. Default nil = no check (preserves the overall ‘timeout` ceiling behavior). Opt-in heartbeat watchdog for callers whose user-facing surface needs to fail fast rather than sit forever when an upstream LLM stream wedges mid-turn. Distinct from first_event_timeout (which only protects cold-start) and from the overall `timeout` ceiling of 600s (which is forgiving — a hung stream holding a thread for 10 minutes is already a bad UX). When the window is exceeded the call raises Opencode::IdleStreamError, which the caller is expected to catch and translate into a user-visible error / retry affordance.



263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
# File 'lib/opencode/client.rb', line 263

def stream_events(session_id:, timeout: 600, first_event_timeout: 120,
                   idle_stream_timeout: nil,
                   reply: nil, on_activity_tick: nil, &block)
  uri = build_uri("/event")
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  first_event_deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + first_event_timeout
  received_session_event = false
  last_meaningful_event_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  loop do
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    deadline = check_deadline_or_suspend(now, deadline, timeout, reply)

    # NOTE: first_event_deadline is *not* suspension-eligible. If the agent
    # never gets started we want to fail fast — a session that's blocked on
    # a prompt has, by definition, already produced events.
    if !received_session_event && now > first_event_deadline
      raise StaleSessionError, "No events for session #{session_id} within #{first_event_timeout}s"
    end

    if idle_stream_timeout && received_session_event &&
       (now - last_meaningful_event_at) > idle_stream_timeout
      raise IdleStreamError,
            "No meaningful events for session #{session_id} within #{idle_stream_timeout}s " \
            "(SSE heartbeats still arriving — upstream likely wedged mid-turn)"
    end

    request = Net::HTTP::Get.new(uri)
    request["Accept"] = "text/event-stream"
    request["Cache-Control"] = "no-cache"
    add_auth_header(request)

    http = Net::HTTP.new(@uri.host, @uri.port)
    http.use_ssl = @uri.scheme == "https"
    http.open_timeout = 10
    http.read_timeout = 30

    begin
      buffer = String.new

      http.request(request) do |response|
        unless response.is_a?(Net::HTTPSuccess)
          raise ServerError, "SSE connection failed: HTTP #{response.code}"
        end

        response.read_body do |chunk|
          now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
          deadline = check_deadline_or_suspend(now, deadline, timeout, reply)

          if !received_session_event && now > first_event_deadline
            raise StaleSessionError, "No events for session #{session_id} within #{first_event_timeout}s"
          end

          if idle_stream_timeout && received_session_event &&
             (now - last_meaningful_event_at) > idle_stream_timeout
            raise IdleStreamError,
                  "No meaningful events for session #{session_id} within #{idle_stream_timeout}s " \
                  "(SSE heartbeats still arriving — upstream likely wedged mid-turn)"
          end

          buffer << chunk
          if buffer.bytesize > MAX_SSE_BUFFER
            raise ServerError, "SSE buffer exceeded #{MAX_SSE_BUFFER} bytes"
          end

          while (idx = buffer.index("\n\n"))
            raw_event = buffer.slice!(0, idx + 2)
            event = parse_sse_event(raw_event, session_id)
            next unless event

            unless event[:type]&.start_with?("server.")
              received_session_event = true
              last_meaningful_event_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
            end

            # Tick activity on EVERY event, including server.heartbeat —
            # that's the whole point: a healthy long wait (user thinking
            # for 30 minutes) keeps the container warm via heartbeats so
            # the reaper doesn't kill it mid-wait.
            on_activity_tick&.call(event)
            block.call(event)
            return if event[:type] == "session.idle"
          end
        end
      end
    rescue *TRANSIENT_SSE_ERRORS
      # Treat transport-level SSE disconnects like clean EOF: reconnect
      # until session.idle, the overall timeout, or first-event timeout.
    ensure
      begin
        http&.finish if http&.started?
      rescue IOError
        # Connection already closed — network partition or server shutdown
      end
    end

    cutoff = received_session_event ? deadline : first_event_deadline
    sleep_for = [ SSE_RECONNECT_DELAY, cutoff - Process.clock_gettime(Process::CLOCK_MONOTONIC) ].min
    if sleep_for.positive?
      sleep sleep_for
    end
  end
end