Class: ClaudeAgentSDK::SubprocessCLITransport

Inherits:
Transport
  • Object
Defined in:
lib/claude_agent_sdk/subprocess_cli_transport.rb

Overview

Subprocess transport using Claude Code CLI

Constant Summary

DEFAULT_MAX_BUFFER_SIZE =

1MB buffer limit

1024 * 1024
MINIMUM_CLAUDE_CODE_VERSION =
'2.0.0'
SKIP_VERSION_CHECK_ENV_VAR =
'CLAUDE_AGENT_SDK_SKIP_VERSION_CHECK'
VERSION_CHECK_TIMEOUT_SECONDS =

mirrors Python’s anyio.fail_after(2)

2
RECENT_STDERR_LINES_LIMIT =
20
ACTIVE_PROCESSES =

Tracks live CLI subprocesses so they can be terminated when the parent Ruby process exits. This mirrors the parent-exit cleanup in the Python SDK (PR #916, which uses a `set`) and the TypeScript SDK, and prevents orphaned `claude` processes from leaking when callers crash or exit before reaching #close.

A Set keyed by object identity (like Python’s set) keeps the hot path off `#pid`: only #kill_active_processes touches `#pid`/`#alive?`, and only at exit. The registry is guarded by a mutex because #close can run on a FiberBoundary worker thread while #connect runs on the reactor fiber.

The registry is stored in constants rather than class instance variables so that this class and every subclass share a single instance. Constants resolve through the ancestor chain, whereas class instance variables are not inherited: a `SubprocessCLITransport` subclass instance calling `self.class.register_active_process` would otherwise reach a nil mutex and raise mid-#connect, orphaning the just-spawned child. The base-class at_exit handler must be able to see every subprocess, including those spawned by subclasses.

Set.new
ACTIVE_PROCESSES_MUTEX =
Mutex.new
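
The inheritance difference described for ACTIVE_PROCESSES can be checked in isolation. This is a minimal sketch, not code from the SDK: the classes IvarRegistry, IvarChild, ConstantRegistry, and ConstantChild are hypothetical names used only for illustration.

```ruby
# Hypothetical classes for illustration; none of these exist in the SDK.
class IvarRegistry
  @mutex = Mutex.new # class-level instance variable, owned by IvarRegistry only

  def self.mutex
    @mutex
  end
end

class IvarChild < IvarRegistry; end

class ConstantRegistry
  MUTEX = Mutex.new # constant, visible from the method body for every receiver

  def self.mutex
    MUTEX
  end
end

class ConstantChild < ConstantRegistry; end

IvarChild.mutex.nil?                               # => true, the subclass never set its own @mutex
ConstantChild.mutex.equal?(ConstantRegistry.mutex) # => true, one shared Mutex
```

With the instance-variable version, a subclass calling `mutex.synchronize` would raise NoMethodError on nil, which is the failure mode the constant-based registry avoids.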

Constructor Details

#initialize(options_or_prompt = nil, options = nil) ⇒ SubprocessCLITransport

Returns a new instance of SubprocessCLITransport.



# File 'lib/claude_agent_sdk/subprocess_cli_transport.rb', line 90

def initialize(options_or_prompt = nil, options = nil)
  # Support both new single-arg form and legacy two-arg form
  @options = options.nil? ? options_or_prompt : options
  @cli_path = @options.cli_path || find_cli
  @cwd = @options.cwd
  @process = nil
  @stdin = nil
  @stdout = nil
  @stderr = nil
  @ready = false
  @exit_error = nil
  @max_buffer_size = @options.max_buffer_size || DEFAULT_MAX_BUFFER_SIZE
  @stderr_task = nil
  @recent_stderr = []
  @recent_stderr_mutex = Mutex.new
  # Serializes stdin access across the reactor fiber (transport writes
  # from inside Async) and user-callback threads spawned via FiberBoundary
  # (tool handlers / hooks calling Client#query). Without this lock,
  # close can nil @stdin between write's readiness check and the actual
  # @stdin.write call, producing NoMethodError on nil.
  @stdin_mutex = Mutex.new
end

Class Method Details

.active_processes ⇒ Object

Public readers (the test suite uses `described_class.active_processes`); they return the shared constants so subclasses observe the same objects.



# File 'lib/claude_agent_sdk/subprocess_cli_transport.rb', line 42

def active_processes
  ACTIVE_PROCESSES
end

.active_processes_mutex ⇒ Object



# File 'lib/claude_agent_sdk/subprocess_cli_transport.rb', line 46

def active_processes_mutex
  ACTIVE_PROCESSES_MUTEX
end

.deregister_active_process(wait_thr) ⇒ Object



# File 'lib/claude_agent_sdk/subprocess_cli_transport.rb', line 57

def deregister_active_process(wait_thr)
  return unless wait_thr

  active_processes_mutex.synchronize { active_processes.delete(wait_thr) }
end

.kill_active_processes ⇒ Object

Best-effort SIGTERM to every still-running child. Registered with at_exit at the bottom of this file. Never reaps (a blocking wait could hang interpreter shutdown) — the OS reparents and reaps orphans.

Deliberately does NOT take active_processes_mutex: at interpreter shutdown Ruby runs at_exit handlers before terminating other threads, and Mutex is unfair, so blocking here while a still-live worker churns register/deregister can starve this handler and hang the process. A lock-free read is safe — a torn snapshot at worst misses or repeats a SIGTERM, both harmless. The outer rescue guarantees the handler never raises (e.g. ThreadError if reached from a trap context, or a concurrent-modification error from the unlocked read), honoring the “never interrupt interpreter shutdown” contract.



# File 'lib/claude_agent_sdk/subprocess_cli_transport.rb', line 76

def kill_active_processes
  active_processes.to_a.each do |wait_thr|
    next unless wait_thr.alive?

    Process.kill('TERM', wait_thr.pid)
  rescue StandardError
    # Process already gone (Errno::ESRCH), not permitted, or invalid pid.
  end
  active_processes.clear
rescue StandardError
  # Never let cleanup interfere with interpreter shutdown.
end
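
The snapshot-then-signal shape of .kill_active_processes can be exercised without spawning real children. The sketch below is illustrative only: FakeWaiter and terminate_all are hypothetical names, and the signaller argument stands in for Process.kill so the example stays side-effect free.

```ruby
require 'set'

# Hypothetical stand-in for the wait thread returned by Open3.popen3.
FakeWaiter = Struct.new(:pid, :running) do
  def alive?
    running
  end
end

# Sends TERM to every live entry through `signaller` (a callable taking a
# signal name and a pid), then empties the registry. Returns the pids that
# were signalled without error.
def terminate_all(registry, signaller)
  signalled = []
  registry.to_a.each do |waiter| # iterate a snapshot, never the live Set
    next unless waiter.alive?

    signaller.call('TERM', waiter.pid)
    signalled << waiter.pid
  rescue StandardError
    # One failed signal (for example Errno::ESRCH) must not stop the others.
  end
  registry.clear
  signalled
rescue StandardError
  signalled # cleanup must never raise into interpreter shutdown
end

registry = Set[FakeWaiter.new(101, true), FakeWaiter.new(102, false), FakeWaiter.new(103, true)]
signaller = lambda do |_signal, pid|
  raise Errno::ESRCH if pid == 101 # simulate a child that is already gone
end
terminate_all(registry, signaller) # => [103]
```

In real use the second argument would be something like `Process.method(:kill)`, registered once through `at_exit`.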

.register_active_process(wait_thr) ⇒ Object

wait_thr is the Process::Waiter returned by Open3.popen3.



# File 'lib/claude_agent_sdk/subprocess_cli_transport.rb', line 51

def register_active_process(wait_thr)
  return unless wait_thr

  active_processes_mutex.synchronize { active_processes.add(wait_thr) }
end

Instance Method Details

#build_command ⇒ Object



# File 'lib/claude_agent_sdk/subprocess_cli_transport.rb', line 179

def build_command
  CommandBuilder.new(@cli_path, @options).build
end

#check_claude_version ⇒ Object



# File 'lib/claude_agent_sdk/subprocess_cli_transport.rb', line 583

def check_claude_version
  # Mirrors Python's os.environ.get truthiness: any non-empty value skips,
  # including '0'/'false'/' '; unset or empty string runs the check.
  skip = ENV.fetch(SKIP_VERSION_CHECK_ENV_VAR, nil)
  return if skip && !skip.empty?

  begin
    output = capture_cli_version_output
    # Residual divergence from Python (anchored re.match over the first
    # stdout chunk): this searches anywhere in stdout+stderr, so leading
    # noise (a shim's own version line) could be mistaken for the CLI
    # version. Pre-existing shape; the check is best-effort only.
    if match = output.match(/([0-9]+\.[0-9]+\.[0-9]+)/)
      version = match[1]
      version_parts = version.split('.').map(&:to_i)
      min_parts = MINIMUM_CLAUDE_CODE_VERSION.split('.').map(&:to_i)

      # Array has no #< — the old `version_parts < min_parts` raised
      # NoMethodError into the blanket rescue, so the warning never fired.
      if (version_parts <=> min_parts).negative?
        warning = "Warning: Claude Code version #{version} at #{@cli_path} is unsupported in the Agent SDK. " \
                  "Minimum required version is #{MINIMUM_CLAUDE_CODE_VERSION}. " \
                  "Some features may not work correctly."
        warn warning
      end
    end
  rescue StandardError
    # Ignore version check errors — including Timeout::Error from the
    # probe deadline, mirroring Python's `except Exception: pass`.
  end
end
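
The version comparison inside #check_claude_version can be isolated as a small predicate. This is a sketch under the assumption that both inputs are plain dotted numeric strings; older_than? is a hypothetical helper name, not an SDK method.

```ruby
# Returns true when `version` is strictly older than `minimum`.
# Both arguments are dotted numeric strings such as "2.0.0".
def older_than?(version, minimum)
  parts = version.split('.').map(&:to_i)
  min_parts = minimum.split('.').map(&:to_i)
  # Array does not include Comparable, so `parts < min_parts` would raise
  # NoMethodError. Array#<=> compares element by element and returns -1, 0, or 1.
  (parts <=> min_parts).negative?
end

older_than?('1.9.12', '2.0.0') # => true
older_than?('2.0.0', '2.0.0')  # => false
older_than?('2.10.0', '2.9.0') # => false (segments compare as integers, not as text)
```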

#close ⇒ Object



# File 'lib/claude_agent_sdk/subprocess_cli_transport.rb', line 327

def close
  @ready = false
  return unless @process

  cleanup_errors = []

  # Kill stderr thread
  if @stderr_task&.alive?
    begin
      @stderr_task.kill
      @stderr_task.join(1)
    rescue StandardError => e
      cleanup_errors << "stderr thread: #{e.message}"
    end
  end

  # Close stdin under the same lock that guards write — otherwise a
  # concurrent writer (callbacks running on FiberBoundary threads) can
  # see @stdin nilled mid-write and hit NoMethodError on nil.
  @stdin_mutex.synchronize do
    begin
      @stdin&.close
    rescue IOError
      # Already closed, ignore
    rescue StandardError => e
      cleanup_errors << "stdin: #{e.message}"
    end
    @stdin = nil
  end

  begin
    @stdout&.close
  rescue IOError
    # Already closed, ignore
  rescue StandardError => e
    cleanup_errors << "stdout: #{e.message}"
  end

  begin
    @stderr&.close
  rescue IOError
    # Already closed, ignore
  rescue StandardError => e
    cleanup_errors << "stderr: #{e.message}"
  end

  # Wait for graceful shutdown after stdin EOF, then terminate if needed.
  # The subprocess needs time to flush its session file after receiving
  # EOF on stdin. Without this grace period, SIGTERM can interrupt the
  # write and cause the last assistant message to be lost.
  begin
    wait_process_with_timeout(5) if @process.alive?
  rescue Timeout::Error
    # Graceful shutdown timed out — send SIGTERM
    begin
      Process.kill('TERM', @process.pid)
      wait_process_with_timeout(2)
    rescue Timeout::Error
      # SIGTERM didn't work — force kill
      begin
        Process.kill('KILL', @process.pid)
        @process.value
      rescue StandardError => e
        cleanup_errors << "force kill: #{e.message}"
      end
    rescue Errno::ESRCH
      # Process already dead
    end
  rescue Errno::ESRCH
    # Process already dead, ignore
  rescue StandardError => e
    cleanup_errors << "process termination: #{e.message}"
  end

  # Log any cleanup errors (non-fatal)
  if cleanup_errors.any?
    warn "Claude SDK: Cleanup warnings: #{cleanup_errors.join(', ')}"
  end

  self.class.deregister_active_process(@process)
  @process = nil
  @stdout = nil
  # @stdin already nilled under the mutex above.
  @stderr = nil
  @stderr_task = nil
  @exit_error = nil
end

#connect ⇒ Object



# File 'lib/claude_agent_sdk/subprocess_cli_transport.rb', line 183

def connect
  return if @process

  check_claude_version

  cmd = build_command

  # Build environment
  # Convert symbol keys to strings for spawn compatibility
  custom_env = @options.env.transform_keys { |k| k.to_s }
  # Explicitly unset CLAUDECODE to prevent "nested session" detection when the SDK
  # launches Claude Code from within an existing Claude Code terminal.
  # NOTE: Must set to nil (not just omit the key) — Ruby's spawn only overlays
  # the env hash on top of the parent environment; a nil value actively unsets.
  # ENTRYPOINT defaults to sdk-rb regardless of inherited process env
  # (the old ||= let an inherited 'cli' win and mis-attribute telemetry);
  # options.env may still override it. VERSION is merged last: always
  # set by the SDK, never overridable (Python merge-order parity).
  process_env = ENV.to_h
                   .merge('CLAUDECODE' => nil, 'CLAUDE_CODE_ENTRYPOINT' => 'sdk-rb')
                   .merge(custom_env)
                   .merge('CLAUDE_AGENT_SDK_VERSION' => VERSION)
  # Propagate the active OTel trace context to the CLI so its spans parent
  # under the caller's distributed trace (Python SDK #821 parity). No-op
  # when opentelemetry is not loaded or there is no active span.
  inject_otel_trace_context(process_env, custom_env)
  process_env['CLAUDE_CODE_ENABLE_SDK_FILE_CHECKPOINTING'] = 'true' if @options.enable_file_checkpointing
  process_env['PWD'] = @cwd.to_s if @cwd

  # Determine stderr handling
  should_pipe_stderr = @options.stderr || @options.debug_stderr || @options.extra_args.key?('debug-to-stderr')

  begin
    # Start process using Open3
    # :uid mirrors Python's anyio.open_process(user=...): String username
    # or Integer uid (Unix; requires privileges — typically root). The
    # .compact is mandatory: uid: nil raises TypeError on every connect.
    # On Windows spawn raises for :uid, wrapped below into
    # CLIConnectionError — fail-loud instead of the old silent ignore.
    opts = { chdir: @cwd&.to_s, uid: @options.user }.compact

    @stdin, @stdout, @stderr, @process = Open3.popen3(process_env, *cmd, opts)
    # The CLI emits UTF-8 regardless of the parent locale. popen3 pipes
    # default to Encoding.default_external (US-ASCII under LANG=C/LC_ALL=C
    # — minimal Docker images, systemd, CI), which makes String#strip on
    # multibyte CLI output raise Encoding::CompatibilityError and kill the
    # read loop (its rescue only catches IOError). Mirrors the Python
    # SDK's TextReceiveStream(stdout), which always decodes UTF-8.
    @stdout&.set_encoding(Encoding::UTF_8)
    @stderr&.set_encoding(Encoding::UTF_8)
    self.class.register_active_process(@process)

    # Always drain stderr to prevent pipe buffer deadlock.
    # Without this, --verbose output fills the OS pipe buffer (~64KB),
    # the subprocess blocks on write, and all pipes stall → EPIPE.
    if @stderr
      if should_pipe_stderr # rubocop:disable Style/ConditionalAssignment
        @stderr_task = Thread.new do
          handle_stderr
        rescue StandardError
          # Ignore errors during stderr reading
        end
      else
        # Silently drain stderr so the subprocess never blocks,
        # but still accumulate recent lines for error reporting.
        @stderr_task = Thread.new do
          drain_stderr_with_accumulation
        rescue StandardError
          # Ignore — process may have already exited
        end
      end
    end

    # Always keep stdin open — streaming mode uses it for the control protocol
    @ready = true
  rescue Errno::ENOENT => e
    # Check if error is from cwd or CLI
    if @cwd && !File.directory?(@cwd.to_s)
      error = CLIConnectionError.new("Working directory does not exist: #{@cwd}")
      @exit_error = error
      raise error
    end
    error = CLINotFoundError.new("Claude Code not found at: #{@cli_path}")
    @exit_error = error
    raise error
  rescue StandardError, NotImplementedError => e
    # NotImplementedError < ScriptError, not StandardError (the trap this
    # repo keeps hitting): spawn raises it for :uid on platforms without
    # setuid (Windows), and it must wrap like every other spawn failure.
    error = CLIConnectionError.new("Failed to start Claude Code: #{e}")
    @exit_error = error
    raise error
  end
end
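
The environment merge order in #connect decides which keys a caller can override. The following sketch reproduces only that ordering on plain hashes; build_process_env is a hypothetical helper and sdk_version stands in for the SDK's VERSION constant.

```ruby
# Hypothetical helper mirroring the merge order used in #connect.
def build_process_env(parent_env, custom_env, sdk_version)
  custom = custom_env.transform_keys(&:to_s) # spawn expects String keys
  parent_env
    .merge('CLAUDECODE' => nil, 'CLAUDE_CODE_ENTRYPOINT' => 'sdk-rb') # nil unsets the variable in the child
    .merge(custom)                                                    # caller values override the defaults
    .merge('CLAUDE_AGENT_SDK_VERSION' => sdk_version)                 # merged last, so it always wins
end

parent = { 'CLAUDECODE' => '1', 'CLAUDE_CODE_ENTRYPOINT' => 'cli', 'HOME' => '/home/me' }
env = build_process_env(parent, { CLAUDE_AGENT_SDK_VERSION: '0.0.0' }, '9.9.9')
env['CLAUDECODE']               # => nil (key present, so spawn removes it from the child)
env['CLAUDE_CODE_ENTRYPOINT']   # => "sdk-rb" (the inherited "cli" does not survive)
env['CLAUDE_AGENT_SDK_VERSION'] # => "9.9.9" (the caller's "0.0.0" is overwritten)
```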

#drain_stderr_with_accumulation ⇒ Object



# File 'lib/claude_agent_sdk/subprocess_cli_transport.rb', line 316

def drain_stderr_with_accumulation
  return unless @stderr

  @stderr.each_line("\n", @max_buffer_size + 1) do |line|
    line_str = line.chomp
    next if line_str.empty?

    record_bounded_stderr(line_str)
  end
end

#end_input ⇒ Object



# File 'lib/claude_agent_sdk/subprocess_cli_transport.rb', line 461

def end_input
  # Under @stdin_mutex like write/close (the transport's documented
  # locking protocol; Python's end_input takes _write_lock too). The
  # nil-guard must live INSIDE the critical section or the TOCTOU
  # returns. NOTE: non-reentrant — close() inlines its own stdin
  # handling and must never delegate here.
  @stdin_mutex.synchronize do
    return unless @stdin

    begin
      @stdin.close
    rescue StandardError
      # Ignore
    end
    @stdin = nil
  end
end

#find_cli ⇒ Object

Raises:

  • (CLINotFoundError)



# File 'lib/claude_agent_sdk/subprocess_cli_transport.rb', line 113

def find_cli
  # Try which command first (using Open3 for thread safety)
  cli = nil
  begin
    stdout, _status = Open3.capture2('which', 'claude')
    cli = stdout.strip
  rescue StandardError
    # which command failed, try common locations
  end
  return cli if cli && !cli.empty? && File.executable?(cli)

  # Try common locations
  locations = [
    File.join(Dir.home, '.claude/local/claude'),  # Claude Code default install location
    File.join(Dir.home, '.npm-global/bin/claude'),
    '/usr/local/bin/claude',
    File.join(Dir.home, '.local/bin/claude'),
    File.join(Dir.home, 'node_modules/.bin/claude'),
    File.join(Dir.home, '.yarn/bin/claude')
  ]

  locations.each do |path|
    return path if File.exist?(path) && File.file?(path)
  end

  raise CLINotFoundError.new(
    "Claude Code not found. Install with:\n" \
    "  npm install -g @anthropic-ai/claude-code\n" \
    "\nIf already installed locally, try:\n" \
    '  export PATH="$HOME/node_modules/.bin:$PATH"' \
    "\n\nOr provide the path via ClaudeAgentOptions:\n" \
    "  ClaudeAgentOptions.new(cli_path: '/path/to/claude')"
  )
end
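
The fallback scan in #find_cli reduces to "first candidate that is a regular file". A minimal sketch of that step follows; first_regular_file is a hypothetical helper name, and the temporary directory exists only to make the example self-contained.

```ruby
require 'tmpdir'

# Returns the first candidate path that is a regular file, or nil.
# File.file? is already false for missing paths, so no separate
# File.exist? check is needed.
def first_regular_file(candidates)
  candidates.find { |path| File.file?(path) }
end

Dir.mktmpdir do |dir|
  cli = File.join(dir, 'claude')
  File.write(cli, "#!/bin/sh\n")
  # A missing path and a directory are both skipped; the file is returned.
  first_regular_file([File.join(dir, 'missing'), dir, cli]) == cli # => true
end
```

Unlike the `which` branch, this scan does not test File.executable?, so a non-executable file at one of the fallback locations would still be returned and would fail later at spawn time.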

#handle_stderr ⇒ Object



# File 'lib/claude_agent_sdk/subprocess_cli_transport.rb', line 278

def handle_stderr
  return unless @stderr

  @stderr.each_line("\n", @max_buffer_size + 1) do |line|
    line_str = line.chomp
    next if line_str.empty?

    record_bounded_stderr(line_str)

    # Per-line isolation: a callback that raises (e.g. user's logger
    # transiently failing) must not poison the rest of the stderr stream.
    # Without this, the first exception terminates the each_line loop and
    # the SDK silently stops capturing stderr for the lifetime of the
    # process. Matches Python SDK v0.2.82 (PR #932).
    begin
      @options.stderr&.call(line_str)
    rescue StandardError
      # Drop the callback error; the line is already in the recent-stderr
      # ring buffer, which is what ProcessError surfaces on non-zero exit.
    end

    # Write to debug_stderr file/IO if provided, also isolated.
    begin
      if @options.debug_stderr
        if @options.debug_stderr.respond_to?(:puts)
          @options.debug_stderr.puts(line_str)
        elsif @options.debug_stderr.is_a?(String)
          File.open(@options.debug_stderr, 'a') { |f| f.puts(line_str) }
        end
      end
    rescue StandardError
      # Drop debug_stderr write errors so they never interrupt the loop.
    end
  end
rescue StandardError
  # Stream-level error (pipe closed mid-read); the loop naturally ends here.
end
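
The per-line isolation in #handle_stderr can be tried against an in-memory stream. This sketch makes two assumptions: drain_lines is a hypothetical helper, and the bounded buffer is a guess at what record_bounded_stderr does, since its body is not shown on this page. RECENT_LIMIT is an illustrative cap, not the class's RECENT_STDERR_LINES_LIMIT.

```ruby
require 'stringio'

RECENT_LIMIT = 3 # hypothetical cap for the example

# Reads `io` line by line, keeps only the newest RECENT_LIMIT lines, and calls
# `callback` for each line without letting a callback error end the loop.
def drain_lines(io, callback, max_line_bytes: 1024)
  recent = []
  io.each_line("\n", max_line_bytes + 1) do |line|
    text = line.chomp
    next if text.empty?

    recent << text
    recent.shift while recent.size > RECENT_LIMIT # drop the oldest entries

    begin
      callback&.call(text)
    rescue StandardError
      # Drop the callback error; the line is already recorded in `recent`.
    end
  end
  recent
end

seen = []
flaky = lambda do |text|
  raise 'logger down' if text == 'b' # one failing call
  seen << text
end
drain_lines(StringIO.new("a\n\nb\nc\nd\n"), flaky) # => ["b", "c", "d"]
seen                                               # => ["a", "c", "d"]
```

The callback failure on "b" costs only that one delivery; later lines still reach the callback and the buffer.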

#inject_otel_trace_context(process_env, custom_env) ⇒ Object

Inject W3C trace context (TRACEPARENT/TRACESTATE, plus BAGGAGE) into the subprocess env when an OTel span is active. Guard via defined? + respond_to?, not require: an active span implies the constant is loaded, and requiring here would break against the test mock / optional gem group. Gate on the carrier’s traceparent key (the W3C propagator writes it only for a valid span context) so a baggage-only carrier or a noop propagator preserves inherited env.



# File 'lib/claude_agent_sdk/subprocess_cli_transport.rb', line 155

def inject_otel_trace_context(process_env, custom_env)
  return unless defined?(OpenTelemetry) && OpenTelemetry.respond_to?(:propagation)

  carrier = {}
  OpenTelemetry.propagation.inject(carrier)
  return unless carrier.key?('traceparent')

  # Active span: scrub stale inherited W3C context (CI/k8s ambient env)
  # before writing fresh values, so an inherited TRACESTATE is never
  # paired with a new TRACEPARENT. nil actively unsets (spawn overlay
  # semantics — see the CLAUDECODE note in #connect; Python pops from a
  # complete env dict instead). Explicit options.env keys always win.
  %w[TRACEPARENT TRACESTATE].each do |key|
    process_env[key] = nil unless custom_env.key?(key)
  end
  carrier.each do |key, value|
    env_key = key.upcase
    process_env[env_key] = value unless custom_env.key?(env_key)
  end
rescue StandardError, ScriptError
  # Best-effort tracing must never break connect() (Python: except
  # Exception). ScriptError too: NotImplementedError < ScriptError.
end
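
The gating and precedence rules of #inject_otel_trace_context can be shown on plain hashes, with the propagator's output supplied directly. In this sketch apply_trace_carrier is a hypothetical helper and the carrier values are made-up placeholders, not real trace identifiers.

```ruby
# Hypothetical helper: copies an already-injected propagation carrier
# (lowercase W3C keys) into the subprocess environment hash.
def apply_trace_carrier(process_env, custom_env, carrier)
  return process_env unless carrier.key?('traceparent') # no valid span: leave the env untouched

  %w[TRACEPARENT TRACESTATE].each do |key|
    process_env[key] = nil unless custom_env.key?(key) # scrub stale inherited values
  end
  carrier.each do |key, value|
    env_key = key.upcase
    process_env[env_key] = value unless custom_env.key?(env_key) # explicit options.env keys win
  end
  process_env
end

inherited = { 'TRACEPARENT' => 'stale-parent', 'TRACESTATE' => 'stale-state' }
apply_trace_carrier(inherited, {}, { 'traceparent' => 'fresh-parent' })
# => {"TRACEPARENT"=>"fresh-parent", "TRACESTATE"=>nil}
```

The stale TRACESTATE is replaced by nil rather than left in place, so the child never sees an old state paired with a new parent.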

#read_messages(&block) ⇒ Object

Raises:

  • (CLIConnectionError)
  • (CLIJSONDecodeError)
  • (ProcessError)



# File 'lib/claude_agent_sdk/subprocess_cli_transport.rb', line 479

def read_messages(&block)
  return enum_for(:read_messages) unless block_given?

  raise CLIConnectionError, 'Not connected' unless @process && @stdout

  json_buffer = ''

  begin
    # The limit bounds per-read allocation: a line longer than
    # max_buffer_size+1 arrives as bounded chunks that the existing
    # accumulation + cap machinery below handles (mirrors Python, where
    # TextReceiveStream yields <=64KB chunks and the cap fires
    # incrementally). +1 so an exactly-max line plus "\n" arrives whole.
    # With UTF-8 external encoding Ruby extends a few bytes past the
    # limit rather than splitting a multibyte char. Without the limit,
    # an oversized line was fully allocated BEFORE the 1MB cap could
    # fire — unbounded memory on hostile/buggy stdout.
    @stdout.each_line("\n", @max_buffer_size + 1) do |line|
      # Position-aware whitespace handling: a chunk of an over-limit line
      # must keep its interior whitespace — a blanket per-chunk strip
      # deleted spaces inside JSON strings straddling the chunk boundary
      # and could let a just-over-cap line PARSE with bytes silently
      # missing instead of raising. Only safe edges are trimmed: full
      # single-chunk lines strip both ends (the common path, original
      # behavior); a truncated line-initial chunk keeps its tail; a
      # continuation chunk keeps its head and only drops the newline.
      ends_line = line.end_with?("\n")
      if json_buffer.empty?
        chunk = ends_line ? line.strip : line.lstrip
        next if chunk.empty?

        # When no partial JSON is buffered, the line must start with `{`
        # to be a valid stream-json message. Stray stderr-like text
        # (e.g., debug warnings the CLI occasionally writes to stdout)
        # would otherwise be appended into json_buffer, poisoning every
        # subsequent parse until the buffer overflows. Matches the Python
        # SDK's `if not json_buffer and not json_line.startswith("{")`.
        next unless chunk.start_with?('{')
      else
        chunk = ends_line ? line.chomp : line
      end

      json_buffer += chunk

      if json_buffer.bytesize > @max_buffer_size
        buffer_length = json_buffer.bytesize
        json_buffer = ''
        raise CLIJSONDecodeError.new(
          "JSON message exceeded maximum buffer size",
          StandardError.new("Buffer size #{buffer_length} exceeds limit #{@max_buffer_size}")
        )
      end

      begin
        data = JSON.parse(json_buffer, symbolize_names: true)
        json_buffer = ''
        yield data
      rescue JSON::ParserError
        # Continue accumulating (multi-line JSON, or a truncated chunk
        # awaiting the rest of its line)
        next
      end
    end
  rescue IOError
    # Stream closed
  rescue StopIteration
    # Client disconnected
  end

  # Check process completion. @process may already be nil (close() ran
  # concurrently and reset it) or already waited on (Errno::ECHILD on
  # double-wait). Both are non-fatal — the message loop just exits.
  returncode = nil
  begin
    status = @process&.value
    returncode = status&.exitstatus
  rescue Errno::ECHILD
    # Process was already reaped (e.g., by close()); no exit status to surface.
    returncode = nil
  end

  # The child has exited and been reaped; drop it from the parent-exit
  # registry now rather than waiting for #close, which a caller may never
  # reach (e.g. a Client abandoned without #disconnect, or direct transport
  # use). Idempotent — #close's own deregister becomes a harmless no-op, and
  # #close still sees @process (left set here) for its termination logic.
  self.class.deregister_active_process(@process)

  if returncode && returncode != 0
    # Wait briefly for stderr thread to finish draining
    @stderr_task&.join(1)

    stderr_text = @recent_stderr_mutex.synchronize { @recent_stderr.last(10).join("\n") }
    stderr_text = 'No stderr output captured' if stderr_text.empty?

    @exit_error = ProcessError.new(
      "Command failed with exit code #{returncode}",
      exit_code: returncode,
      stderr: stderr_text
    )
    raise @exit_error
  end
end
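
The accumulation loop in #read_messages can be reproduced against a StringIO to see how noise lines, multi-line JSON, and oversized lines are handled. This is a simplified sketch, not the SDK's implementation: each_json_message and MessageTooLargeError are hypothetical names, and process-exit handling is left out.

```ruby
require 'json'
require 'stringio'

MessageTooLargeError = Class.new(StandardError) # hypothetical error class

# Yields each complete JSON object read from `io`, buffering partial input.
def each_json_message(io, max_buffer_size: 1024)
  buffer = +''
  io.each_line("\n", max_buffer_size + 1) do |line|
    ends_line = line.end_with?("\n")
    if buffer.empty?
      chunk = ends_line ? line.strip : line.lstrip
      next if chunk.empty? || !chunk.start_with?('{') # skip blanks and stray non-JSON text
    else
      chunk = ends_line ? line.chomp : line # keep interior whitespace of a continuation
    end

    buffer << chunk
    if buffer.bytesize > max_buffer_size
      size = buffer.bytesize
      buffer = +''
      raise MessageTooLargeError, "buffer size #{size} exceeds limit #{max_buffer_size}"
    end

    begin
      message = JSON.parse(buffer, symbolize_names: true)
      buffer = +''
      yield message
    rescue JSON::ParserError
      next # incomplete so far; keep accumulating
    end
  end
end

input = StringIO.new("debug: starting\n{\"type\":\"a\"}\n{\"type\":\n\"b\"}\n\n")
messages = []
each_json_message(input) { |message| messages << message }
messages # => [{:type=>"a"}, {:type=>"b"}]
```

The read limit means an oversized line arrives in bounded chunks, so the size check fires before the whole line is ever held in memory.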

#ready? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/claude_agent_sdk/subprocess_cli_transport.rb', line 615

def ready?
  @ready
end

#wait_process_with_timeout(timeout_seconds) ⇒ Object

Wait for the spawned process to exit, up to timeout_seconds. Polls @process.alive? rather than using stdlib Timeout.timeout, which raises across threads via Thread#raise and corrupts Async fiber-scheduler state (close is always called inside an Async task). Yields to the current Async task when one is active so the reactor keeps running.



# File 'lib/claude_agent_sdk/subprocess_cli_transport.rb', line 420

def wait_process_with_timeout(timeout_seconds)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_seconds
  task = defined?(Async::Task) ? Async::Task.current? : nil
  while @process.alive?
    raise Timeout::Error if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

    task ? task.sleep(0.05) : sleep(0.05)
  end
  @process.value
end
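
The polling approach generalizes to any liveness check. The sketch below shows the deadline loop on its own, without the Async integration; wait_until_stopped is a hypothetical helper and still_running can be any callable.

```ruby
require 'timeout' # only for the Timeout::Error class; Timeout.timeout is never called

# Polls `still_running` until it returns false or the deadline passes.
# Raises Timeout::Error from the calling thread itself, so no cross-thread
# Thread#raise is involved.
def wait_until_stopped(still_running, timeout_seconds, interval: 0.01)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_seconds
  while still_running.call
    raise Timeout::Error, 'deadline exceeded' if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

    sleep(interval)
  end
  true
end

remaining = 3
wait_until_stopped(-> { (remaining -= 1) >= 0 }, 1) # => true after three polls
```

Using the monotonic clock keeps the deadline stable even if the wall clock is adjusted while waiting.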

#write(data) ⇒ Object

Raises:

  • (CLIConnectionError)



# File 'lib/claude_agent_sdk/subprocess_cli_transport.rb', line 431

def write(data)
  raise CLIConnectionError, "Cannot write to terminated process" if @process && !@process.alive?
  raise CLIConnectionError, "Cannot write to process that exited with error: #{@exit_error}" if @exit_error

  # Snapshot @stdin under the lock so close() nilling it concurrently is
  # safe, but do the actual blocking IO *outside* the lock. Holding the
  # mutex across @stdin.write would let a full pipe buffer block the
  # writer indefinitely and block close() (which also needs the lock)
  # from killing the subprocess — a hang on disconnect.
  #
  # If close() runs while we are inside the IO call, it will close the
  # underlying stream and Ruby raises IOError("stream closed in another
  # thread") inside @stdin.write — the rescue below converts that into a
  # standard CLIConnectionError so callers see a clean shutdown error.
  stdin = @stdin_mutex.synchronize do
    raise CLIConnectionError, 'ProcessTransport is not ready for writing' unless @ready && @stdin

    @stdin
  end

  begin
    stdin.write(data)
    stdin.flush
  rescue StandardError => e
    @ready = false
    @exit_error = CLIConnectionError.new("Failed to write to process stdin: #{e}")
    raise @exit_error
  end
end
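
The snapshot-under-lock pattern used by #write can be reduced to a small class around any IO. This is an illustrative sketch; GuardedWriter is a hypothetical name, and it raises a plain IOError where the transport raises CLIConnectionError.

```ruby
# Hypothetical minimal writer showing the snapshot-under-lock pattern.
class GuardedWriter
  def initialize(io)
    @io = io
    @mutex = Mutex.new
  end

  def write(data)
    io = @mutex.synchronize do
      raise IOError, 'writer is closed' unless @io

      @io # snapshot the reference; the lock is released before any blocking IO
    end
    io.write(data)
    io.flush
  end

  def close
    # Takes the same lock as write, but only briefly, because write never
    # holds it across the blocking call.
    @mutex.synchronize do
      @io&.close
      @io = nil
    end
  end
end

reader, pipe_writer = IO.pipe
guarded = GuardedWriter.new(pipe_writer)
guarded.write("hello\n")
reader.gets # => "hello\n"
guarded.close
```

Because the lock covers only the nil check and the reference copy, a writer stuck on a full pipe cannot prevent close from acquiring the lock and closing the stream.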