Class: ClaudeAgentSDK::Query
- Inherits:
-
Object
- Object
- ClaudeAgentSDK::Query
- Defined in:
- lib/claude_agent_sdk/query.rb
Overview
Handles bidirectional control protocol on top of Transport
This class manages:
-
Control request/response routing
-
Hook callbacks
-
Tool permission callbacks
-
Message streaming
-
Initialization handshake
Defined Under Namespace
Classes: ThreadWaiter
Constant Summary collapse
- CONTROL_REQUEST_TIMEOUT_ENV_VAR =
'CLAUDE_AGENT_SDK_CONTROL_REQUEST_TIMEOUT_SECONDS'- DEFAULT_CONTROL_REQUEST_TIMEOUT_SECONDS =
1200.0
Instance Attribute Summary collapse
-
#is_streaming_mode ⇒ Object
readonly
Returns the value of attribute is_streaming_mode.
-
#sdk_mcp_servers ⇒ Object
readonly
Returns the value of attribute sdk_mcp_servers.
-
#transport ⇒ Object
readonly
Returns the value of attribute transport.
Instance Method Summary collapse
-
#close ⇒ Object
Close the query and transport.
-
#get_context_usage ⇒ Hash
Get a breakdown of current context window usage by category.
-
#get_mcp_status ⇒ Hash
Get current MCP server connection status (only works with streaming mode).
-
#initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil, sdk_mcp_servers: nil, agents: nil, exclude_dynamic_sections: nil, skills: nil) ⇒ Query
constructor
A new instance of Query.
-
#initialize_protocol ⇒ Hash?
Initialize control protocol if in streaming mode.
-
#interrupt ⇒ Object
Send interrupt control request.
-
#receive_messages(&block) ⇒ Object
Receive SDK messages (not control messages).
-
#reconnect_mcp_server(server_name) ⇒ Object
Reconnect a failed MCP server.
-
#report_mirror_error(key, error) ⇒ Object
Synthesize a ‘mirror_error` system message and put it on the SDK message stream so consumers learn a mirror batch was dropped after exhausting retries.
-
#rewind_files(user_message_uuid) ⇒ Object
Rewind files to a previous checkpoint (v0.1.15+) Restores file state to what it was at the given user message Requires enable_file_checkpointing to be true in options.
-
#set_model(model) ⇒ Object
Change the AI model.
-
#set_permission_mode(mode) ⇒ Object
Change permission mode.
-
#set_transcript_mirror_batcher(batcher) ⇒ Object
Install the transcript-mirror batcher fed by ‘transcript_mirror` frames (Client mode with a session_store).
-
#spawn_task(&block) ⇒ Object
Spawn a child task that is stopped by #close (mirrors the Python SDK’s Query#spawn_task / _child_tasks).
-
#start ⇒ Object
Start reading messages from transport.
-
#stop_task(task_id) ⇒ Object
Stop a running background task.
-
#stream_input(stream) ⇒ Object
Stream input messages to transport.
-
#toggle_mcp_server(server_name, enabled) ⇒ Object
Enable or disable an MCP server.
-
#wait_for_result_and_end_input ⇒ Object
Wait for the first result before closing stdin when hooks or SDK MCP servers may still need to exchange control messages with the CLI.
- #write(string) ⇒ Object
- #writeln(string) ⇒ Object
Constructor Details
#initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil, sdk_mcp_servers: nil, agents: nil, exclude_dynamic_sections: nil, skills: nil) ⇒ Query
Returns a new instance of Query.
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/claude_agent_sdk/query.rb', line 48 def initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil, sdk_mcp_servers: nil, agents: nil, exclude_dynamic_sections: nil, skills: nil) @transport = transport @is_streaming_mode = is_streaming_mode @can_use_tool = can_use_tool @hooks = hooks || {} @sdk_mcp_servers = sdk_mcp_servers || {} @agents = agents @exclude_dynamic_sections = exclude_dynamic_sections @skills = skills # Control protocol state @pending_control_responses = {} @pending_control_results = {} @hook_callbacks = {} @hook_callback_timeouts = {} @next_callback_id = 0 @request_counter = 0 @request_counter_mutex = Mutex.new @inflight_control_request_tasks = {} # Message stream @message_queue = Async::Queue.new @first_result_received = false @last_error_result_text = nil @first_result_condition = Async::Condition.new @task = nil @child_tasks = [] @initialized = false @closed = false @initialization_result = nil @transcript_mirror_batcher = nil end |
Instance Attribute Details
#is_streaming_mode ⇒ Object (readonly)
Returns the value of attribute is_streaming_mode.
21 22 23 |
# File 'lib/claude_agent_sdk/query.rb', line 21 def is_streaming_mode @is_streaming_mode end |
#sdk_mcp_servers ⇒ Object (readonly)
Returns the value of attribute sdk_mcp_servers.
21 22 23 |
# File 'lib/claude_agent_sdk/query.rb', line 21 def sdk_mcp_servers @sdk_mcp_servers end |
#transport ⇒ Object (readonly)
Returns the value of attribute transport.
21 22 23 |
# File 'lib/claude_agent_sdk/query.rb', line 21 def transport @transport end |
Instance Method Details
#close ⇒ Object
Close the query and transport
1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 |
# File 'lib/claude_agent_sdk/query.rb', line 1148 def close @closed = true # Wake pending control-request waiters (same shape as the read-loop # rescue broadcast): close stops the read task with Async::Stop, which # bypasses that broadcast — a worker-thread caller parked in # ThreadWaiter#wait would otherwise leak its OS thread for the full # control-request timeout (up to 1200s) in long-lived processes. # INVARIANT: store the result before signaling (level-trigger). @pending_control_responses.dup.each do |request_id, waiter| @pending_control_results[request_id] ||= CLIConnectionError.new('Query closed') waiter.signal end # Final mirror flush BEFORE stopping the read task, so the last turn's # entries reach the store. #close on the batcher never raises. @transcript_mirror_batcher&.close # Stop tracked child tasks (e.g. stream_input) before the read task and # transport so a parked input stream can never keep the reactor alive # (mirrors Python close() cancelling _child_tasks). @child_tasks.each(&:stop) @child_tasks.clear @task&.stop @transport.close end |
#get_context_usage ⇒ Hash
Get a breakdown of current context window usage by category.
997 998 999 |
# File 'lib/claude_agent_sdk/query.rb', line 997 def get_context_usage send_control_request({ subtype: 'get_context_usage' }) end |
#get_mcp_status ⇒ Hash
Get current MCP server connection status (only works with streaming mode)
1003 1004 1005 |
# File 'lib/claude_agent_sdk/query.rb', line 1003 def get_mcp_status send_control_request({ subtype: 'mcp_status' }) end |
#initialize_protocol ⇒ Hash?
Initialize control protocol if in streaming mode
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/claude_agent_sdk/query.rb', line 84 def initialize_protocol return nil unless @is_streaming_mode # Build hooks configuration for initialization hooks_config = {} if @hooks && !@hooks.empty? @hooks.each do |event, matchers| next if matchers.nil? || matchers.empty? hooks_config[event] = [] matchers.each do |matcher| callback_ids = [] (matcher[:hooks] || []).each do |callback| callback_id = "hook_#{@next_callback_id}" @next_callback_id += 1 @hook_callbacks[callback_id] = callback @hook_callback_timeouts[callback_id] = matcher[:timeout] if matcher[:timeout] callback_ids << callback_id end matcher_config = { matcher: matcher[:matcher], hookCallbackIds: callback_ids } # Wire field is literal "timeout" in SECONDS, per matcher, # omitted when absent (Python _internal/query.py parity — no # camelCase, no ms conversion). Local enforcement via # @hook_callback_timeouts stays as defense-in-depth for CLIs # that ignore the field. matcher_config[:timeout] = matcher[:timeout] if matcher[:timeout] hooks_config[event] << matcher_config end end end # Build agents dict for initialization agents_dict = nil if @agents agents_dict = @agents.transform_values do |agent_def| { description: agent_def.description, prompt: agent_def.prompt, tools: agent_def.tools, disallowedTools: agent_def.disallowed_tools, model: agent_def.model, skills: agent_def.skills, memory: agent_def.memory, mcpServers: agent_def.mcp_servers, initialPrompt: agent_def.initial_prompt, maxTurns: agent_def.max_turns, background: agent_def.background, effort: agent_def.effort, permissionMode: agent_def. }.compact end end # Send initialize request request = { subtype: 'initialize', hooks: hooks_config.empty? ? nil : hooks_config, agents: agents_dict } request[:excludeDynamicSections] = @exclude_dynamic_sections unless @exclude_dynamic_sections.nil? # 'all' and omitted are equivalent at the wire level (no filter), so # only send the field when it's an explicit list (mirrors Python). request[:skills] = @skills if @skills.is_a?(Array) response = send_control_request(request) @initialized = true @initialization_result = response response end |
#interrupt ⇒ Object
Send interrupt control request
1008 1009 1010 |
# File 'lib/claude_agent_sdk/query.rb', line 1008 def interrupt send_control_request({ subtype: 'interrupt' }) end |
#receive_messages(&block) ⇒ Object
Receive SDK messages (not control messages)
1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 |
# File 'lib/claude_agent_sdk/query.rb', line 1135 def (&block) return enum_for(:receive_messages) unless block loop do = @message_queue.dequeue break if [:type] == 'end' raise [:error] if [:type] == 'error' block.call() end end |
#reconnect_mcp_server(server_name) ⇒ Object
Reconnect a failed MCP server
1030 1031 1032 1033 1034 1035 |
# File 'lib/claude_agent_sdk/query.rb', line 1030 def reconnect_mcp_server(server_name) send_control_request({ subtype: 'mcp_reconnect', serverName: server_name }) end |
#report_mirror_error(key, error) ⇒ Object
Synthesize a ‘mirror_error` system message and put it on the SDK message stream so consumers learn a mirror batch was dropped after exhausting retries. Non-blocking: the message queue is unbounded, so unlike the Python SDK there is no buffer-full drop path.
213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/claude_agent_sdk/query.rb', line 213 def report_mirror_error(key, error) session_id = key && (key['session_id'] || key[:session_id]) @message_queue.enqueue( type: 'system', subtype: 'mirror_error', error: error, key: key, uuid: SecureRandom.uuid, session_id: session_id || '' ) end |
#rewind_files(user_message_uuid) ⇒ Object
Rewind files to a previous checkpoint (v0.1.15+) Restores file state to what it was at the given user message Requires enable_file_checkpointing to be true in options
1061 1062 1063 1064 1065 1066 |
# File 'lib/claude_agent_sdk/query.rb', line 1061 def rewind_files() send_control_request({ subtype: 'rewind_files', user_message_id: }) end |
#set_model(model) ⇒ Object
Change the AI model
1021 1022 1023 1024 1025 1026 |
# File 'lib/claude_agent_sdk/query.rb', line 1021 def set_model(model) send_control_request({ subtype: 'set_model', model: model }) end |
#set_permission_mode(mode) ⇒ Object
Change permission mode
1013 1014 1015 1016 1017 1018 |
# File 'lib/claude_agent_sdk/query.rb', line 1013 def (mode) send_control_request({ subtype: 'set_permission_mode', mode: mode }) end |
#set_transcript_mirror_batcher(batcher) ⇒ Object
Install the transcript-mirror batcher fed by ‘transcript_mirror` frames (Client mode with a session_store). nil disables mirroring.
205 206 207 |
# File 'lib/claude_agent_sdk/query.rb', line 205 def set_transcript_mirror_batcher(batcher) @transcript_mirror_batcher = batcher end |
#spawn_task(&block) ⇒ Object
Spawn a child task that is stopped by #close (mirrors the Python SDK’s Query#spawn_task / _child_tasks). Used for background input streaming so a dying read loop or #close can never strand the stream task and hang the enclosing Async reactor.
NOTE: intentionally a partial mirror — Python prunes completed tasks via add_done_callback(_child_tasks.discard); here entries live until #close. Fine for the current one-shot call sites (max two tasks per Query); do not route per-request work (control handlers, per-turn streams) through this without adding completion-based removal.
194 195 196 197 198 199 200 201 |
# File 'lib/claude_agent_sdk/query.rb', line 194 def spawn_task(&block) parent = Async::Task.current? raise CLIConnectionError, 'Query#spawn_task must be called inside an Async{} block' unless parent task = parent.async(&block) @child_tasks << task task end |
#start ⇒ Object
Start reading messages from transport.
Spawns ‘read_messages` as a direct child task of the current Async task and stores that child in `@task`. An earlier version wrapped `task.async { read_messages }` inside an outer `Async do … end` and assigned the outer task to `@task`; the outer task completed almost immediately after spawning, so `close`’s ‘@task.stop` never reached the actual `read_messages` fiber and the read loop kept running until the transport raised. Now `@task.stop` stops the read loop.
Must be called inside an Async{} block (matches ‘query()` which wraps its own internals in Async, and the documented `Client#connect` pattern). If invoked outside a reactor, raise a clear error rather than letting Async::Task.current raise an opaque “No async task available!” — earlier versions of this method appeared to work from synchronous callers but actually hung indefinitely because the outer Async{} root task waited for read_messages to finish, which never happens for a live Client.
175 176 177 178 179 180 181 182 |
# File 'lib/claude_agent_sdk/query.rb', line 175 def start return if @task parent = Async::Task.current? raise CLIConnectionError, 'Query#start must be called inside an Async{} block (e.g. wrap Client#connect in Async{...})' unless parent @task = parent.async { } end |
#stop_task(task_id) ⇒ Object
Stop a running background task
1050 1051 1052 1053 1054 1055 |
# File 'lib/claude_agent_sdk/query.rb', line 1050 def stop_task(task_id) send_control_request({ subtype: 'stop_task', task_id: task_id }) end |
#stream_input(stream) ⇒ Object
Stream input messages to transport. NOTE: iteration runs on the reactor (the deliberate FiberBoundary carve-out — see fiber_boundary.rb): scheduler-aware blocking (Thread::Queue#pop, sleep, socket IO) parks only this task; CPU-bound or scheduler-opaque work in the enumerator must be moved to a producer Thread by the user.
1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 |
# File 'lib/claude_agent_sdk/query.rb', line 1091 def stream_input(stream) = false stream.each do || break if @closed serialized = .is_a?(Hash) ? JSON.generate() : .to_s writeln(serialized) = true end rescue StandardError => e # Log error but don't raise warn "Error streaming input: #{e.}" ensure # Three teardown shapes: # - #close in progress (@closed, Async::Stop unwinding): do nothing — # the transport is about to be closed, and waiting on # @first_result_condition inside a stopping fiber could suspend # teardown. Mirrors Python, where cancellation skips this entirely. # - A turn is in flight (some message reached the CLI): hold stdin # open until its first result so hooks/SDK MCP control replies can # still be written (no timeout — the result or process exit is # guaranteed to signal). # - No complete message ever reached the CLI (empty stream, or the # stream raised before the first write): no result can ever arrive, # so waiting would park query() forever beside an idle CLI. Close # stdin so the CLI sees EOF and exits. Deliberate improvement over # Python, which leaves stdin open and hangs on this path. unless @closed if wait_for_result_and_end_input else @transport.end_input end end end |
#toggle_mcp_server(server_name, enabled) ⇒ Object
Enable or disable an MCP server
1040 1041 1042 1043 1044 1045 1046 |
# File 'lib/claude_agent_sdk/query.rb', line 1040 def toggle_mcp_server(server_name, enabled) send_control_request({ subtype: 'mcp_toggle', serverName: server_name, enabled: enabled }) end |
#wait_for_result_and_end_input ⇒ Object
Wait for the first result before closing stdin when hooks or SDK MCP servers may still need to exchange control messages with the CLI. The control protocol requires stdin to stay open for the entire turn (hook replies, can_use_tool replies and SDK MCP tool results are all written to stdin), so no timeout is applied — closing stdin mid-turn silently broke hooks/MCP on turns longer than the old 60s bound (mirrors Python SDK commit c3d96cb). The condition is guaranteed to be signaled: by the result branch in read_messages, or by its ensure block when the process exits early.
1077 1078 1079 1080 1081 1082 1083 1084 |
# File 'lib/claude_agent_sdk/query.rb', line 1077 def wait_for_result_and_end_input if !@first_result_received && ((@sdk_mcp_servers && !@sdk_mcp_servers.empty?) || (@hooks && !@hooks.empty?)) @first_result_condition.wait end ensure @transport.end_input end |
#write(string) ⇒ Object
1130 1131 1132 |
# File 'lib/claude_agent_sdk/query.rb', line 1130 def write(string) @transport.write(string) end |
#writeln(string) ⇒ Object
1126 1127 1128 |
# File 'lib/claude_agent_sdk/query.rb', line 1126 def writeln(string) write string.end_with?("\n") ? string : "#{string}\n" end |