Class: ClaudeAgentSDK::Query

Inherits:
Object
  • Object
show all
Defined in:
lib/claude_agent_sdk/query.rb

Overview

Handles bidirectional control protocol on top of Transport

This class manages:

  • Control request/response routing

  • Hook callbacks

  • Tool permission callbacks

  • Message streaming

  • Initialization handshake

Defined Under Namespace

Classes: ThreadWaiter

Constant Summary collapse

CONTROL_REQUEST_TIMEOUT_ENV_VAR =
'CLAUDE_AGENT_SDK_CONTROL_REQUEST_TIMEOUT_SECONDS'
DEFAULT_CONTROL_REQUEST_TIMEOUT_SECONDS =
1200.0

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil, sdk_mcp_servers: nil, agents: nil, exclude_dynamic_sections: nil, skills: nil) ⇒ Query

Returns a new instance of Query.



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/claude_agent_sdk/query.rb', line 48

# Builds a Query around +transport+ and resets all control-protocol and
# message-stream bookkeeping to its initial state.
#
# @param transport [Object] transport used for all reads and writes
# @param is_streaming_mode [Boolean] stored as-is; #initialize_protocol is a no-op when falsy
# @param can_use_tool [Object, nil] tool-permission callback, stored as-is
# @param hooks [Hash, nil] hook matchers keyed by event; nil/false becomes {}
# @param sdk_mcp_servers [Hash, nil] SDK MCP servers; nil/false becomes {}
# @param agents [Hash, nil] agent definitions, stored as-is
# @param exclude_dynamic_sections [Object, nil] stored as-is
# @param skills [Object, nil] stored as-is
def initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil, sdk_mcp_servers: nil, agents: nil,
               exclude_dynamic_sections: nil, skills: nil)
  # Caller-supplied collaborators and options.
  @transport, @is_streaming_mode, @can_use_tool = transport, is_streaming_mode, can_use_tool
  @hooks, @sdk_mcp_servers = (hooks || {}), (sdk_mcp_servers || {})
  @agents, @exclude_dynamic_sections, @skills = agents, exclude_dynamic_sections, skills

  # Control-protocol bookkeeping: pending waiters/results, registered hook
  # callbacks (with optional per-callback timeouts) and id counters.
  @pending_control_responses, @pending_control_results = {}, {}
  @hook_callbacks, @hook_callback_timeouts = {}, {}
  @next_callback_id, @request_counter = 0, 0
  @request_counter_mutex = Mutex.new
  @inflight_control_request_tasks = {}

  # Message-stream state.
  @message_queue = Async::Queue.new
  @first_result_received, @last_error_result_text = false, nil
  @first_result_condition = Async::Condition.new
  @task, @child_tasks = nil, []
  @initialized, @closed = false, false
  @initialization_result, @transcript_mirror_batcher = nil, nil
end

Instance Attribute Details

#is_streaming_mode ⇒ Object (readonly)

Returns the value of attribute is_streaming_mode.



21
22
23
# File 'lib/claude_agent_sdk/query.rb', line 21

# Reader for the streaming-mode flag.
#
# @return [Object] the current value of @is_streaming_mode
def is_streaming_mode = @is_streaming_mode

#sdk_mcp_servers ⇒ Object (readonly)

Returns the value of attribute sdk_mcp_servers.



21
22
23
# File 'lib/claude_agent_sdk/query.rb', line 21

# Reader for the SDK MCP servers collection.
#
# @return [Object] the current value of @sdk_mcp_servers
def sdk_mcp_servers = @sdk_mcp_servers

#transport ⇒ Object (readonly)

Returns the value of attribute transport.



21
22
23
# File 'lib/claude_agent_sdk/query.rb', line 21

# Reader for the underlying transport.
#
# @return [Object] the current value of @transport
def transport = @transport

Instance Method Details

#close ⇒ Object

Close the query and transport



1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
# File 'lib/claude_agent_sdk/query.rb', line 1148

# Closes the query and its transport.
#
# Teardown order (each step must precede the next):
# 1. mark the query closed;
# 2. fail and wake every pending control-request waiter;
# 3. flush/close the transcript-mirror batcher, if any;
# 4. stop tracked child tasks, then the read task;
# 5. close the transport.
#
# @return [Object] whatever the transport's #close returns
def close
  @closed = true

  # Stopping the read task (Async::Stop) skips the read loop's own rescue
  # broadcast, so pending waiters are woken here instead. Otherwise a
  # worker-thread caller parked in ThreadWaiter#wait would hold its OS
  # thread for the whole control-request timeout (up to 1200s).
  # INVARIANT: the result is stored before the waiter is signaled
  # (level-trigger). An already-stored result is left untouched.
  @pending_control_responses.to_a.each do |(request_id, waiter)|
    unless @pending_control_results[request_id]
      @pending_control_results[request_id] = CLIConnectionError.new('Query closed')
    end
    waiter.signal
  end

  # Final mirror flush happens BEFORE the read task stops so the last
  # turn's entries reach the store. The batcher's #close never raises.
  @transcript_mirror_batcher&.close

  # Child tasks (e.g. stream_input) go first so a parked input stream can
  # never keep the reactor alive (mirrors Python close() cancelling
  # _child_tasks); then the read task; then the transport.
  @child_tasks.each(&:stop).clear
  @task&.stop
  @transport.close
end

#get_context_usage ⇒ Hash

Get a breakdown of current context window usage by category.

Returns:

  • (Hash)

    Context usage response with categories, totalTokens, maxTokens, etc.



997
998
999
# File 'lib/claude_agent_sdk/query.rb', line 997

# Sends a `get_context_usage` control request.
#
# @return [Hash] the control response (context usage by category)
def get_context_usage
  request = { subtype: 'get_context_usage' }
  send_control_request(request)
end

#get_mcp_status ⇒ Hash

Get current MCP server connection status (only works with streaming mode)

Returns:

  • (Hash)

    MCP status information, including mcpServers list



1003
1004
1005
# File 'lib/claude_agent_sdk/query.rb', line 1003

# Sends an `mcp_status` control request.
#
# @return [Hash] the control response (MCP server connection status)
def get_mcp_status
  request = { subtype: 'mcp_status' }
  send_control_request(request)
end

#initialize_protocol ⇒ Hash?

Initialize control protocol if in streaming mode

Returns:

  • (Hash, nil)

    Initialize response with supported commands, or nil if not streaming



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/claude_agent_sdk/query.rb', line 84

# Performs the control-protocol initialization handshake.
#
# Registers every configured hook callback under a generated id
# ("hook_N"), serializes agent definitions to their wire shape, sends the
# `initialize` control request, and records the response.
#
# @return [Hash, nil] the initialize response, or nil when not in streaming mode
def initialize_protocol
  return unless @is_streaming_mode

  # Hooks: one wire entry per matcher, listing the ids of its callbacks.
  hooks_config = {}
  (@hooks || {}).each do |event, matchers|
    next if matchers.nil? || matchers.empty?

    hooks_config[event] = matchers.map do |matcher|
      timeout = matcher[:timeout]
      callback_ids = (matcher[:hooks] || []).map do |callback|
        callback_id = "hook_#{@next_callback_id}"
        @next_callback_id += 1
        @hook_callbacks[callback_id] = callback
        @hook_callback_timeouts[callback_id] = timeout if timeout
        callback_id
      end

      wire_matcher = { matcher: matcher[:matcher], hookCallbackIds: callback_ids }
      # The wire field is the literal "timeout" in SECONDS, per matcher, and
      # is omitted when absent (Python _internal/query.py parity: no
      # camelCase, no ms conversion). @hook_callback_timeouts is still
      # filled in as defense-in-depth for CLIs that ignore the field.
      wire_matcher[:timeout] = timeout if timeout
      wire_matcher
    end
  end

  # Agents: wire key => reader on the agent definition. nil attributes are
  # left out of the payload; key order is the order listed here.
  agents_dict =
    if @agents
      agent_wire_fields = {
        description: :description,
        prompt: :prompt,
        tools: :tools,
        disallowedTools: :disallowed_tools,
        model: :model,
        skills: :skills,
        memory: :memory,
        mcpServers: :mcp_servers,
        initialPrompt: :initial_prompt,
        maxTurns: :max_turns,
        background: :background,
        effort: :effort,
        permissionMode: :permission_mode
      }
      @agents.transform_values do |agent_def|
        agent_wire_fields.each_with_object({}) do |(wire_key, reader), wire_agent|
          value = agent_def.public_send(reader)
          wire_agent[wire_key] = value unless value.nil?
        end
      end
    end

  hooks_payload = hooks_config.empty? ? nil : hooks_config
  request = { subtype: 'initialize', hooks: hooks_payload, agents: agents_dict }
  request[:excludeDynamicSections] = @exclude_dynamic_sections unless @exclude_dynamic_sections.nil?
  # 'all' and omitted mean the same thing on the wire (no filter), so the
  # field is sent only for an explicit list (mirrors Python).
  request[:skills] = @skills if @skills.is_a?(Array)

  send_control_request(request).tap do |response|
    @initialized = true
    @initialization_result = response
  end
end

#interrupt ⇒ Object

Send interrupt control request



1008
1009
1010
# File 'lib/claude_agent_sdk/query.rb', line 1008

# Sends an `interrupt` control request.
#
# @return [Object] whatever send_control_request returns
def interrupt
  request = { subtype: 'interrupt' }
  send_control_request(request)
end

#receive_messages(&block) ⇒ Object

Receive SDK messages (not control messages)



1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
# File 'lib/claude_agent_sdk/query.rb', line 1135

# Yields SDK messages (not control messages) taken from the message queue.
#
# Iteration ends when a message of type 'end' is dequeued. A message of
# type 'error' is re-raised using the exception stored under :error.
#
# @yieldparam message [Hash] the next dequeued message
# @return [Enumerator, nil] an Enumerator when no block is given, else nil
def receive_messages(&block)
  return to_enum(:receive_messages) if block.nil?

  loop do
    msg = @message_queue.dequeue
    kind = msg[:type]
    break if kind == 'end'
    raise msg[:error] if kind == 'error'

    block.call(msg)
  end
end

#reconnect_mcp_server(server_name) ⇒ Object

Reconnect a failed MCP server

Parameters:

  • server_name (String)

    Name of the MCP server to reconnect



1030
1031
1032
1033
1034
1035
# File 'lib/claude_agent_sdk/query.rb', line 1030

# Sends an `mcp_reconnect` control request for the named MCP server.
#
# @param server_name [String] name of the MCP server to reconnect
# @return [Object] whatever send_control_request returns
def reconnect_mcp_server(server_name)
  request = { subtype: 'mcp_reconnect', serverName: server_name }
  send_control_request(request)
end

#report_mirror_error(key, error) ⇒ Object

Synthesize a `mirror_error` system message and put it on the SDK message stream so consumers learn a mirror batch was dropped after exhausting retries. Non-blocking: the message queue is unbounded, so unlike the Python SDK there is no buffer-full drop path.



213
214
215
216
217
218
219
220
221
222
223
# File 'lib/claude_agent_sdk/query.rb', line 213

# Enqueues a synthetic `mirror_error` system message on the SDK message
# stream.
#
# The session id is read from +key+ under 'session_id' (String) first and
# :session_id (Symbol) second; it falls back to '' when +key+ is nil or
# holds neither.
#
# @param key [Hash, nil] key of the mirror batch the error refers to
# @param error [Object] error payload, stored as-is under :error
# @return [Object] whatever the queue's #enqueue returns
def report_mirror_error(key, error)
  session_id = (key['session_id'] || key[:session_id]) if key
  payload = {
    type: 'system',
    subtype: 'mirror_error',
    error: error,
    key: key,
    uuid: SecureRandom.uuid,
    session_id: session_id || ''
  }
  @message_queue.enqueue(**payload)
end

#rewind_files(user_message_uuid) ⇒ Object

Rewind files to a previous checkpoint (v0.1.15+). Restores file state to what it was at the given user message. Requires enable_file_checkpointing to be true in options.

Parameters:

  • user_message_uuid (String)

    The UUID of the UserMessage to rewind to



1061
1062
1063
1064
1065
1066
# File 'lib/claude_agent_sdk/query.rb', line 1061

# Sends a `rewind_files` control request for the given user message.
#
# @param user_message_uuid [String] UUID of the UserMessage to rewind to;
#   sent under the `user_message_id` key
# @return [Object] whatever send_control_request returns
def rewind_files(user_message_uuid)
  request = { subtype: 'rewind_files', user_message_id: user_message_uuid }
  send_control_request(request)
end

#set_model(model) ⇒ Object

Change the AI model



1021
1022
1023
1024
1025
1026
# File 'lib/claude_agent_sdk/query.rb', line 1021

# Sends a `set_model` control request.
#
# @param model [Object] model identifier, forwarded unchanged
# @return [Object] whatever send_control_request returns
def set_model(model)
  request = { subtype: 'set_model', model: model }
  send_control_request(request)
end

#set_permission_mode(mode) ⇒ Object

Change permission mode



1013
1014
1015
1016
1017
1018
# File 'lib/claude_agent_sdk/query.rb', line 1013

# Sends a `set_permission_mode` control request.
#
# @param mode [Object] permission mode, forwarded unchanged
# @return [Object] whatever send_control_request returns
def set_permission_mode(mode)
  request = { subtype: 'set_permission_mode', mode: mode }
  send_control_request(request)
end

#set_transcript_mirror_batcher(batcher) ⇒ Object

Install the transcript-mirror batcher fed by `transcript_mirror` frames (Client mode with a session_store). nil disables mirroring.



205
206
207
# File 'lib/claude_agent_sdk/query.rb', line 205

# Installs (or, with nil, removes) the transcript-mirror batcher.
#
# @param batcher [Object, nil] batcher to store; #close calls its #close when set
# @return [Object, nil] the batcher that was passed in
def set_transcript_mirror_batcher(batcher) = (@transcript_mirror_batcher = batcher)

#spawn_task(&block) ⇒ Object

Spawn a child task that is stopped by #close (mirrors the Python SDK’s Query#spawn_task / _child_tasks). Used for background input streaming so a dying read loop or #close can never strand the stream task and hang the enclosing Async reactor.

NOTE: intentionally a partial mirror — Python prunes completed tasks via add_done_callback(_child_tasks.discard); here entries live until #close. Fine for the current one-shot call sites (max two tasks per Query); do not route per-request work (control handlers, per-turn streams) through this without adding completion-based removal.

Raises:

  • (CLIConnectionError)

    if called outside an Async{} block



194
195
196
197
198
199
200
201
# File 'lib/claude_agent_sdk/query.rb', line 194

# Runs +block+ as a child of the current Async task and tracks the child
# in @child_tasks so that #close can stop it.
#
# Tracked entries are not removed when a task finishes; they stay in the
# list until #close clears it.
#
# @return [Object] the spawned child task
# @raise [CLIConnectionError] when there is no current Async task
def spawn_task(&block)
  current = Async::Task.current?
  unless current
    raise CLIConnectionError, 'Query#spawn_task must be called inside an Async{} block'
  end

  current.async(&block).tap { |child| @child_tasks << child }
end

#start ⇒ Object

Start reading messages from transport.

Spawns `read_messages` as a direct child task of the current Async task and stores that child in `@task`. An earlier version wrapped `task.async { read_messages }` inside an outer `Async do … end` and assigned the outer task to `@task`; the outer task completed almost immediately after spawning, so `close`'s `@task.stop` never reached the actual `read_messages` fiber and the read loop kept running until the transport raised. Now `@task.stop` stops the read loop.

Must be called inside an Async{} block (matches `query()` which wraps its own internals in Async, and the documented `Client#connect` pattern). If invoked outside a reactor, raise a clear error rather than letting Async::Task.current raise an opaque “No async task available!” — earlier versions of this method appeared to work from synchronous callers but actually hung indefinitely because the outer Async{} root task waited for read_messages to finish, which never happens for a live Client.

Raises:

  • (CLIConnectionError)

    if called outside an Async{} block



175
176
177
178
179
180
181
182
# File 'lib/claude_agent_sdk/query.rb', line 175

# Starts the read loop as a direct child of the current Async task and
# keeps that child in @task, so that #close's `@task.stop` stops the read
# loop itself. Does nothing when a read task is already stored.
#
# @return [Object, nil] the read task, or nil when already started
# @raise [CLIConnectionError] when there is no current Async task
def start
  return if @task

  reactor_task = Async::Task.current?
  unless reactor_task
    raise CLIConnectionError,
          'Query#start must be called inside an Async{} block (e.g. wrap Client#connect in Async{...})'
  end

  @task = reactor_task.async { read_messages }
end

#stop_task(task_id) ⇒ Object

Stop a running background task

Parameters:

  • task_id (String)

    The ID of the task to stop



1050
1051
1052
1053
1054
1055
# File 'lib/claude_agent_sdk/query.rb', line 1050

# Sends a `stop_task` control request.
#
# @param task_id [String] ID of the task to stop
# @return [Object] whatever send_control_request returns
def stop_task(task_id)
  request = { subtype: 'stop_task', task_id: task_id }
  send_control_request(request)
end

#stream_input(stream) ⇒ Object

Stream input messages to transport. NOTE: iteration runs on the reactor (the deliberate FiberBoundary carve-out — see fiber_boundary.rb): scheduler-aware blocking (Thread::Queue#pop, sleep, socket IO) parks only this task; CPU-bound or scheduler-opaque work in the enumerator must be moved to a producer Thread by the user.



1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
# File 'lib/claude_agent_sdk/query.rb', line 1091

# Writes every message of +stream+ to the transport, one line per message.
#
# Hash messages are serialized with JSON.generate; anything else is sent
# as its #to_s. Iteration stops as soon as the query is closed. A
# StandardError raised while streaming is reported with `warn` and not
# re-raised.
#
# @param stream [#each] source of input messages
def stream_input(stream)
  sent_any = false
  stream.each do |message|
    break if @closed

    line = case message
           when Hash then JSON.generate(message)
           else message.to_s
           end
    writeln(line)
    sent_any = true
  end
rescue StandardError => e
  # Best effort: report the failure, never propagate it.
  warn "Error streaming input: #{e.message}"
ensure
  # Teardown depends on how far the stream got:
  # - #close already in progress (@closed, Async::Stop unwinding): do
  #   nothing. The transport is about to be closed, and waiting on
  #   @first_result_condition inside a stopping fiber could suspend
  #   teardown. Mirrors Python, where cancellation skips this entirely.
  # - At least one message reached the CLI (a turn is in flight): keep
  #   stdin open until the first result so hook / SDK MCP control replies
  #   can still be written. No timeout: the result or process exit is
  #   guaranteed to signal.
  # - Nothing was ever written (empty stream, or the stream raised before
  #   the first write): no result can arrive, so waiting would park
  #   query() forever beside an idle CLI. End input so the CLI sees EOF
  #   and exits. Deliberate improvement over Python, which leaves stdin
  #   open and hangs on this path.
  unless @closed
    sent_any ? wait_for_result_and_end_input : @transport.end_input
  end
end

#toggle_mcp_server(server_name, enabled) ⇒ Object

Enable or disable an MCP server

Parameters:

  • server_name (String)

    Name of the MCP server

  • enabled (Boolean)

    Whether to enable or disable



1040
1041
1042
1043
1044
1045
1046
# File 'lib/claude_agent_sdk/query.rb', line 1040

# Sends an `mcp_toggle` control request for the named MCP server.
#
# @param server_name [String] name of the MCP server
# @param enabled [Boolean] whether to enable or disable it
# @return [Object] whatever send_control_request returns
def toggle_mcp_server(server_name, enabled)
  request = { subtype: 'mcp_toggle', serverName: server_name, enabled: enabled }
  send_control_request(request)
end

#wait_for_result_and_end_input ⇒ Object

Wait for the first result before closing stdin when hooks or SDK MCP servers may still need to exchange control messages with the CLI. The control protocol requires stdin to stay open for the entire turn (hook replies, can_use_tool replies and SDK MCP tool results are all written to stdin), so no timeout is applied — closing stdin mid-turn silently broke hooks/MCP on turns longer than the old 60s bound (mirrors Python SDK commit c3d96cb). The condition is guaranteed to be signaled: by the result branch in read_messages, or by its ensure block when the process exits early.



1077
1078
1079
1080
1081
1082
1083
1084
# File 'lib/claude_agent_sdk/query.rb', line 1077

# Ends transport input, first waiting for the first result when control
# traffic may still be needed.
#
# The wait happens only when no result has been received yet AND SDK MCP
# servers or hooks are configured. No timeout is applied to the wait.
# Input is ended in every case, including when the wait is interrupted by
# an exception.
#
# @return [Object, nil] the value of the condition's #wait, or nil when no wait occurred
def wait_for_result_and_end_input
  unless @first_result_received
    control_traffic_possible =
      (@sdk_mcp_servers && !@sdk_mcp_servers.empty?) || (@hooks && !@hooks.empty?)
    @first_result_condition.wait if control_traffic_possible
  end
ensure
  @transport.end_input
end

#write(string) ⇒ Object



1130
1131
1132
# File 'lib/claude_agent_sdk/query.rb', line 1130

# Passes +string+ to the transport unchanged.
#
# @param string [String] data to write
# @return [Object] whatever the transport's #write returns
def write(string) = @transport.write(string)

#writeln(string) ⇒ Object



1126
1127
1128
# File 'lib/claude_agent_sdk/query.rb', line 1126

# Writes +string+ as a single line, appending "\n" unless it already
# ends with one.
#
# @param string [String] line content
# @return [Object] whatever #write returns
def writeln(string)
  line = string.end_with?("\n") ? string : "#{string}\n"
  write(line)
end