Class: ClaudeAgentSDK::Query
- Inherits: Object
- Inheritance: Object → ClaudeAgentSDK::Query
- Defined in: lib/claude_agent_sdk/query.rb
Overview
Handles the bidirectional control protocol on top of Transport.
This class manages:
- Control request/response routing
- Hook callbacks
- Tool permission callbacks
- Message streaming
- Initialization handshake
Constant Summary
- CONTROL_REQUEST_TIMEOUT_ENV_VAR =
  'CLAUDE_AGENT_SDK_CONTROL_REQUEST_TIMEOUT_SECONDS'
- DEFAULT_CONTROL_REQUEST_TIMEOUT_SECONDS =
  1200.0
- STREAM_CLOSE_TIMEOUT_ENV_VAR =
  NOTE: CLAUDE_CODE_STREAM_CLOSE_TIMEOUT is defined by the CLI in MILLISECONDS (Python SDK uses `int(os.environ)/1000`); the SDK divides by 1000 to obtain seconds. The default below is seconds for direct use without env conversion (60 s = the CLI's 60000 ms).
  'CLAUDE_CODE_STREAM_CLOSE_TIMEOUT'
- DEFAULT_STREAM_CLOSE_TIMEOUT_SECONDS =
  60.0
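The note above describes a milliseconds-to-seconds conversion with a seconds-based default. A minimal sketch of that conversion follows. The helper name `resolve_stream_close_timeout` and the fallback for unparsable or non-positive values are assumptions made for illustration, not the SDK's actual implementation.

```ruby
# Hypothetical helper (not SDK code): resolves the stream-close timeout in
# seconds from an environment-like hash.
STREAM_CLOSE_TIMEOUT_ENV_VAR = 'CLAUDE_CODE_STREAM_CLOSE_TIMEOUT'
DEFAULT_STREAM_CLOSE_TIMEOUT_SECONDS = 60.0

def resolve_stream_close_timeout(env = ENV)
  raw = env[STREAM_CLOSE_TIMEOUT_ENV_VAR]
  return DEFAULT_STREAM_CLOSE_TIMEOUT_SECONDS if raw.nil? || raw.strip.empty?

  # The CLI defines this variable in milliseconds.
  millis = Float(raw, exception: false)
  # Assumption: unparsable or non-positive values fall back to the default.
  return DEFAULT_STREAM_CLOSE_TIMEOUT_SECONDS if millis.nil? || millis <= 0

  millis / 1000.0
end

resolve_stream_close_timeout('CLAUDE_CODE_STREAM_CLOSE_TIMEOUT' => '30000') # 30 seconds
```

Passing the environment as an argument keeps the helper easy to exercise without mutating the real `ENV`.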
Instance Attribute Summary
- #is_streaming_mode ⇒ Object (readonly)
  Returns the value of attribute is_streaming_mode.
- #sdk_mcp_servers ⇒ Object (readonly)
  Returns the value of attribute sdk_mcp_servers.
- #transport ⇒ Object (readonly)
  Returns the value of attribute transport.
Instance Method Summary
- #close ⇒ Object
  Close the query and transport.
- #get_context_usage ⇒ Hash
  Get a breakdown of current context window usage by category.
- #get_mcp_status ⇒ Hash
  Get current MCP server connection status (only works with streaming mode).
- #initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil, sdk_mcp_servers: nil, agents: nil, exclude_dynamic_sections: nil) ⇒ Query (constructor)
  A new instance of Query.
- #initialize_protocol ⇒ Hash?
  Initialize control protocol if in streaming mode.
- #interrupt ⇒ Object
  Send interrupt control request.
- #receive_messages(&block) ⇒ Object
  Receive SDK messages (not control messages).
- #reconnect_mcp_server(server_name) ⇒ Object
  Reconnect a failed MCP server.
- #report_mirror_error(key, error) ⇒ Object
  Synthesize a `mirror_error` system message and put it on the SDK message stream so consumers learn a mirror batch was dropped after exhausting retries.
- #rewind_files(user_message_uuid) ⇒ Object
  Rewind files to a previous checkpoint (v0.1.15+). Restores file state to what it was at the given user message. Requires enable_file_checkpointing to be true in options.
- #set_model(model) ⇒ Object
  Change the AI model.
- #set_permission_mode(mode) ⇒ Object
  Change permission mode.
- #set_transcript_mirror_batcher(batcher) ⇒ Object
  Install the transcript-mirror batcher fed by `transcript_mirror` frames (Client mode with a session_store).
- #start ⇒ Object
  Start reading messages from transport.
- #stop_task(task_id) ⇒ Object
  Stop a running background task.
- #stream_input(stream) ⇒ Object
  Stream input messages to transport.
- #toggle_mcp_server(server_name, enabled) ⇒ Object
  Enable or disable an MCP server.
- #wait_for_result_and_end_input ⇒ Object
  Wait for the first result before closing stdin when hooks or SDK MCP servers may still need to exchange control messages with the CLI.
- #write(string) ⇒ Object
- #writeln(string) ⇒ Object
Constructor Details
#initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil, sdk_mcp_servers: nil, agents: nil, exclude_dynamic_sections: nil) ⇒ Query
Returns a new instance of Query.
# File 'lib/claude_agent_sdk/query.rb', line 32
def initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil, sdk_mcp_servers: nil, agents: nil,
               exclude_dynamic_sections: nil)
  @transport = transport
  @is_streaming_mode = is_streaming_mode
  @can_use_tool = can_use_tool
  @hooks = hooks || {}
  @sdk_mcp_servers = sdk_mcp_servers || {}
  @agents = agents
  @exclude_dynamic_sections = exclude_dynamic_sections

  # Control protocol state
  @pending_control_responses = {}
  @pending_control_results = {}
  @hook_callbacks = {}
  @hook_callback_timeouts = {}
  @next_callback_id = 0
  @request_counter = 0
  @inflight_control_request_tasks = {}

  # Message stream
  @message_queue = Async::Queue.new
  @first_result_received = false
  @first_result_condition = Async::Condition.new
  @task = nil
  @initialized = false
  @closed = false
  @initialization_result = nil
  @transcript_mirror_batcher = nil
end
Instance Attribute Details
#is_streaming_mode ⇒ Object (readonly)
Returns the value of attribute is_streaming_mode.
# File 'lib/claude_agent_sdk/query.rb', line 21
def is_streaming_mode
  @is_streaming_mode
end
#sdk_mcp_servers ⇒ Object (readonly)
Returns the value of attribute sdk_mcp_servers.
# File 'lib/claude_agent_sdk/query.rb', line 21
def sdk_mcp_servers
  @sdk_mcp_servers
end
#transport ⇒ Object (readonly)
Returns the value of attribute transport.
# File 'lib/claude_agent_sdk/query.rb', line 21
def transport
  @transport
end
Instance Method Details
#close ⇒ Object
Close the query and transport
# File 'lib/claude_agent_sdk/query.rb', line 1028
def close
  @closed = true
  # Final mirror flush BEFORE stopping the read task, so the last turn's
  # entries reach the store. #close on the batcher never raises.
  @transcript_mirror_batcher&.close
  @task&.stop
  @transport.close
end
#get_context_usage ⇒ Hash
Get a breakdown of current context window usage by category.
# File 'lib/claude_agent_sdk/query.rb', line 906
def get_context_usage
  send_control_request({ subtype: 'get_context_usage' })
end
#get_mcp_status ⇒ Hash
Get current MCP server connection status (only works with streaming mode)
# File 'lib/claude_agent_sdk/query.rb', line 912
def get_mcp_status
  send_control_request({ subtype: 'mcp_status' })
end
#initialize_protocol ⇒ Hash?
Initialize control protocol if in streaming mode
# File 'lib/claude_agent_sdk/query.rb', line 64
def initialize_protocol
  return nil unless @is_streaming_mode

  # Build hooks configuration for initialization
  hooks_config = {}
  if @hooks && !@hooks.empty?
    @hooks.each do |event, matchers|
      next if matchers.nil? || matchers.empty?

      hooks_config[event] = []
      matchers.each do |matcher|
        callback_ids = []
        (matcher[:hooks] || []).each do |callback|
          callback_id = "hook_#{@next_callback_id}"
          @next_callback_id += 1
          @hook_callbacks[callback_id] = callback
          @hook_callback_timeouts[callback_id] = matcher[:timeout] if matcher[:timeout]
          callback_ids << callback_id
        end
        hooks_config[event] << {
          matcher: matcher[:matcher],
          hookCallbackIds: callback_ids
        }
      end
    end
  end

  # Build agents dict for initialization
  agents_dict = nil
  if @agents
    agents_dict = @agents.transform_values do |agent_def|
      {
        description: agent_def.description,
        prompt: agent_def.prompt,
        tools: agent_def.tools,
        disallowedTools: agent_def.disallowed_tools,
        model: agent_def.model,
        skills: agent_def.skills,
        memory: agent_def.memory,
        mcpServers: agent_def.mcp_servers,
        initialPrompt: agent_def.initial_prompt,
        maxTurns: agent_def.max_turns,
        background: agent_def.background,
        effort: agent_def.effort,
        permissionMode: agent_def.
      }.compact
    end
  end

  # Send initialize request
  request = {
    subtype: 'initialize',
    hooks: hooks_config.empty? ? nil : hooks_config,
    agents: agents_dict
  }
  request[:excludeDynamicSections] = @exclude_dynamic_sections unless @exclude_dynamic_sections.nil?

  response = send_control_request(request)
  @initialized = true
  @initialization_result = response
  response
end
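The hook registration step in the listing above can be tried in isolation. The sketch below repeats the same ID-assignment logic on plain hashes. `build_hooks_config` is a hypothetical standalone helper, and the event name `'PreToolUse'`, the matcher value `'Bash'`, and the use of string keys for events are assumptions chosen only for the example.

```ruby
# Illustrative sketch only: mirrors the callback-ID registration from
# initialize_protocol using plain data structures instead of a Query instance.
def build_hooks_config(hooks)
  callbacks = {} # callback_id => callable
  config = {}    # event => [{ matcher:, hookCallbackIds: }]
  next_id = 0

  hooks.each do |event, matchers|
    # Events with no matchers are skipped entirely.
    next if matchers.nil? || matchers.empty?

    config[event] = matchers.map do |matcher|
      ids = (matcher[:hooks] || []).map do |callback|
        id = "hook_#{next_id}"
        next_id += 1
        callbacks[id] = callback
        id
      end
      { matcher: matcher[:matcher], hookCallbackIds: ids }
    end
  end

  [config, callbacks]
end

# Hypothetical hook definitions; event names and matcher values are examples.
hooks = {
  'PreToolUse' => [
    { matcher: 'Bash', hooks: [->(input) { input }, ->(input) { input }] }
  ],
  'PostToolUse' => []
}

config, callbacks = build_hooks_config(hooks)
```

Only the generated IDs are placed in the configuration hash. The callables stay in a local registry keyed by those IDs, which is the same split the listing makes between `hooks_config` and `@hook_callbacks`.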
#interrupt ⇒ Object
Send interrupt control request
# File 'lib/claude_agent_sdk/query.rb', line 917
def interrupt
  send_control_request({ subtype: 'interrupt' })
end
#receive_messages(&block) ⇒ Object
Receive SDK messages (not control messages)
# File 'lib/claude_agent_sdk/query.rb', line 1015
def receive_messages(&block)
  return enum_for(:receive_messages) unless block

  loop do
    message = @message_queue.dequeue
    break if message[:type] == 'end'
    raise message[:error] if message[:type] == 'error'

    block.call(message)
  end
end
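The listing above relies on two conventions: a sentinel message of type `'end'` terminates the stream, and calling the method without a block returns an Enumerator. A minimal sketch of the same pattern follows, using the standard library `Queue` instead of `Async::Queue`. `MessageSource` and the message contents are hypothetical and exist only for this illustration.

```ruby
# Stand-in for Query#receive_messages built on the stdlib Queue rather than
# Async::Queue; MessageSource is a hypothetical class for illustration.
class MessageSource
  def initialize
    @queue = Queue.new
  end

  def push(message)
    @queue << message
  end

  def receive_messages(&block)
    # Without a block, hand back a lazy Enumerator over the same stream.
    return enum_for(:receive_messages) unless block

    loop do
      message = @queue.pop
      break if message[:type] == 'end'                    # sentinel: stream finished
      raise message[:error] if message[:type] == 'error'  # surface reader failures

      block.call(message)
    end
  end
end

source = MessageSource.new
source.push({ type: 'assistant', text: 'hello' })
source.push({ type: 'result', text: 'done' })
source.push({ type: 'end' })

types = source.receive_messages.map { |m| m[:type] }
```

Because the blockless form returns an Enumerator, callers can use `map`, `each`, or `take` on the stream without writing their own loop.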
#reconnect_mcp_server(server_name) ⇒ Object
Reconnect a failed MCP server
# File 'lib/claude_agent_sdk/query.rb', line 939
def reconnect_mcp_server(server_name)
  send_control_request({
    subtype: 'mcp_reconnect',
    serverName: server_name
  })
end
#report_mirror_error(key, error) ⇒ Object
Synthesize a `mirror_error` system message and put it on the SDK message stream so consumers learn a mirror batch was dropped after exhausting retries. Non-blocking: the message queue is unbounded, so unlike the Python SDK there is no buffer-full drop path.
# File 'lib/claude_agent_sdk/query.rb', line 164
def report_mirror_error(key, error)
  session_id = key && (key['session_id'] || key[:session_id])
  @message_queue.enqueue(
    type: 'system',
    subtype: 'mirror_error',
    error: error,
    key: key,
    uuid: SecureRandom.uuid,
    session_id: session_id || ''
  )
end
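The session ID lookup in the listing accepts keys given as strings or symbols and falls back to an empty string. The sketch below isolates that message construction in a hypothetical function, `mirror_error_message`, which returns the hash instead of enqueueing it. The key and error values are made up for the example.

```ruby
require 'securerandom'

# Hypothetical standalone version of the message built by report_mirror_error;
# it returns the hash rather than putting it on a queue.
def mirror_error_message(key, error)
  # Accept 'session_id' or :session_id; a nil key yields no session ID.
  session_id = key && (key['session_id'] || key[:session_id])
  {
    type: 'system',
    subtype: 'mirror_error',
    error: error,
    key: key,
    uuid: SecureRandom.uuid,
    session_id: session_id || ''
  }
end

from_string_key = mirror_error_message({ 'session_id' => 's-1' }, 'store unavailable')
from_symbol_key = mirror_error_message({ session_id: 's-2' }, 'store unavailable')
without_key = mirror_error_message(nil, 'store unavailable')
```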
#rewind_files(user_message_uuid) ⇒ Object
Rewind files to a previous checkpoint (v0.1.15+). Restores file state to what it was at the given user message. Requires enable_file_checkpointing to be true in options.
# File 'lib/claude_agent_sdk/query.rb', line 970
def rewind_files(user_message_uuid)
  send_control_request({
    subtype: 'rewind_files',
    user_message_id: user_message_uuid
  })
end
#set_model(model) ⇒ Object
Change the AI model
# File 'lib/claude_agent_sdk/query.rb', line 930
def set_model(model)
  send_control_request({
    subtype: 'set_model',
    model: model
  })
end
#set_permission_mode(mode) ⇒ Object
Change permission mode
# File 'lib/claude_agent_sdk/query.rb', line 922
def set_permission_mode(mode)
  send_control_request({
    subtype: 'set_permission_mode',
    mode: mode
  })
end
#set_transcript_mirror_batcher(batcher) ⇒ Object
Install the transcript-mirror batcher fed by `transcript_mirror` frames (Client mode with a session_store). nil disables mirroring.
# File 'lib/claude_agent_sdk/query.rb', line 156
def set_transcript_mirror_batcher(batcher)
  @transcript_mirror_batcher = batcher
end
#start ⇒ Object
Start reading messages from transport.
Spawns `read_messages` as a direct child task of the current Async task and stores that child in `@task`. An earlier version wrapped `task.async { read_messages }` inside an outer `Async do … end` and assigned the outer task to `@task`; the outer task completed almost immediately after spawning, so the `@task.stop` in `close` never reached the actual `read_messages` fiber, and the read loop kept running until the transport raised. Now `@task.stop` stops the read loop.
Must be called inside an Async{} block (this matches `query()`, which wraps its own internals in Async, and the documented `Client#connect` pattern). If invoked outside a reactor, it raises a clear error rather than letting Async::Task.current raise an opaque "No async task available!". Earlier versions of this method appeared to work from synchronous callers but actually hung indefinitely, because the outer Async{} root task waited for read_messages to finish, which never happens for a live Client.
# File 'lib/claude_agent_sdk/query.rb', line 145
def start
  return if @task

  parent = Async::Task.current?
  raise CLIConnectionError, 'Query#start must be called inside an Async{} block (e.g. wrap Client#connect in Async{...})' unless parent

  @task = parent.async { read_messages }
end
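The handle problem described above (keeping a reference to a short-lived wrapper instead of the long-running worker) can be shown with standard library threads. This is only an analogy: it uses `Thread`, not the async gem, and it makes no claim about how `Async::Task#stop` treats child tasks. All variable names are illustrative.

```ruby
# Analogy using stdlib threads, NOT the async gem: a wrapper that exits right
# after spawning its worker cannot be used to stop that worker.
never_filled = Queue.new # the worker blocks on this, like a live read loop

# Earlier shape: the stored handle points at a short-lived outer wrapper.
inner = nil
outer = Thread.new { inner = Thread.new { never_filled.pop } }
outer.join # the wrapper finishes almost immediately
outer.kill # no effect: the wrapper is already dead
wrapper_stopped_worker = !inner.alive?

# Current shape: the stored handle points at the worker itself.
inner.kill
inner.join
direct_stop_worked = !inner.alive?
```

Here `wrapper_stopped_worker` ends up false and `direct_stop_worked` ends up true, which is the same distinction the description draws between the earlier and current assignments to `@task`.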
#stop_task(task_id) ⇒ Object
Stop a running background task
# File 'lib/claude_agent_sdk/query.rb', line 959
def stop_task(task_id)
  send_control_request({
    subtype: 'stop_task',
    task_id: task_id
  })
end
#stream_input(stream) ⇒ Object
Stream input messages to transport
# File 'lib/claude_agent_sdk/query.rb', line 993
def stream_input(stream)
  stream.each do |message|
    break if @closed
    serialized = message.is_a?(Hash) ? JSON.generate(message) : message.to_s
    writeln(serialized)
  end
rescue StandardError => e
  # Log error but don't raise
  warn "Error streaming input: #{e.message}"
ensure
  wait_for_result_and_end_input
end
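The serialization rule in the listing (hashes become JSON, everything else goes through `to_s`, and each item ends with exactly one newline) can be sketched without a transport. In the example below a `StringIO` stands in for the transport, and `stream_to` is a hypothetical helper that combines the `stream_input` and `writeln` steps.

```ruby
require 'json'
require 'stringio'

# Hypothetical helper: writes each item of `stream` to `io` as one line,
# combining the serialization from stream_input with writeln's newline rule.
def stream_to(io, stream)
  stream.each do |message|
    serialized = message.is_a?(Hash) ? JSON.generate(message) : message.to_s
    # Append a newline only when the payload does not already end with one.
    io.write(serialized.end_with?("\n") ? serialized : "#{serialized}\n")
  end
  io.string
end

output = stream_to(StringIO.new, [{ type: 'user', text: 'hi' }, "raw line\n", 42])
```

Each input item produces one line of output, so a pre-terminated string such as `"raw line\n"` is not given a second newline.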
#toggle_mcp_server(server_name, enabled) ⇒ Object
Enable or disable an MCP server
# File 'lib/claude_agent_sdk/query.rb', line 949
def toggle_mcp_server(server_name, enabled)
  send_control_request({
    subtype: 'mcp_toggle',
    serverName: server_name,
    enabled: enabled
  })
end
#wait_for_result_and_end_input ⇒ Object
Wait for the first result before closing stdin when hooks or SDK MCP servers may still need to exchange control messages with the CLI.
# File 'lib/claude_agent_sdk/query.rb', line 979
def wait_for_result_and_end_input
  if !@first_result_received && ((@sdk_mcp_servers && !@sdk_mcp_servers.empty?) || (@hooks && !@hooks.empty?))
    Async::Task.current.with_timeout(stream_close_timeout_seconds) do
      @first_result_condition.wait unless @first_result_received
    end
  end
rescue Async::TimeoutError
  nil
ensure
  @transport.end_input
end
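The control flow in the listing is "wait for the first result, but no longer than the timeout, and always end input afterwards". The sketch below reproduces that shape with the standard library `Mutex` and `ConditionVariable` in place of `Async::Condition` and `with_timeout`. `InputCloser` and `mark_first_result` are hypothetical names, and the `@input_ended` flag stands in for `@transport.end_input`.

```ruby
# Stdlib analogy (Mutex + ConditionVariable) for the bounded wait followed by
# an unconditional end-of-input; not the SDK's Async-based implementation.
class InputCloser
  attr_reader :input_ended

  def initialize(timeout_seconds)
    @timeout = timeout_seconds
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @first_result_received = false
    @input_ended = false
  end

  # Called by the reader when the first result arrives.
  def mark_first_result
    @mutex.synchronize do
      @first_result_received = true
      @cond.broadcast
    end
  end

  def wait_for_result_and_end_input
    @mutex.synchronize do
      # Returns on signal or after the timeout, whichever comes first.
      @cond.wait(@mutex, @timeout) unless @first_result_received
    end
  ensure
    @input_ended = true # stands in for @transport.end_input
  end
end

closer = InputCloser.new(0.05)
closer.wait_for_result_and_end_input # no result arrives, so this times out
```

Placing the end-of-input step in `ensure` means it runs whether the wait is signalled, times out, or raises.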
#write(string) ⇒ Object
# File 'lib/claude_agent_sdk/query.rb', line 1010
def write(string)
  @transport.write(string)
end
#writeln(string) ⇒ Object
# File 'lib/claude_agent_sdk/query.rb', line 1006
def writeln(string)
  write string.end_with?("\n") ? string : "#{string}\n"
end