Class: ClaudeAgentSDK::Query
- Inherits:
-
Object
- Object
- ClaudeAgentSDK::Query
- Defined in:
- lib/claude_agent_sdk/query.rb
Overview
Handles bidirectional control protocol on top of Transport
This class manages:
-
Control request/response routing
-
Hook callbacks
-
Tool permission callbacks
-
Message streaming
-
Initialization handshake
Constant Summary collapse
- CONTROL_REQUEST_TIMEOUT_ENV_VAR =
'CLAUDE_AGENT_SDK_CONTROL_REQUEST_TIMEOUT_SECONDS'- DEFAULT_CONTROL_REQUEST_TIMEOUT_SECONDS =
1200.0- STREAM_CLOSE_TIMEOUT_ENV_VAR =
NOTE: CLAUDE_CODE_STREAM_CLOSE_TIMEOUT is defined by the CLI in MILLISECONDS (Python SDK uses ‘int(os.environ)/1000`); the SDK divides by 1000 to obtain seconds. The default below is seconds for direct use without env conversion (60 s = the CLI’s 60000 ms).
'CLAUDE_CODE_STREAM_CLOSE_TIMEOUT'- DEFAULT_STREAM_CLOSE_TIMEOUT_SECONDS =
60.0
Instance Attribute Summary collapse
-
#is_streaming_mode ⇒ Object
readonly
Returns the value of attribute is_streaming_mode.
-
#sdk_mcp_servers ⇒ Object
readonly
Returns the value of attribute sdk_mcp_servers.
-
#transport ⇒ Object
readonly
Returns the value of attribute transport.
Instance Method Summary collapse
-
#close ⇒ Object
Close the query and transport.
-
#get_context_usage ⇒ Hash
Get a breakdown of current context window usage by category.
-
#get_mcp_status ⇒ Hash
Get current MCP server connection status (only works with streaming mode).
-
#initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil, sdk_mcp_servers: nil, agents: nil, exclude_dynamic_sections: nil) ⇒ Query
constructor
A new instance of Query.
-
#initialize_protocol ⇒ Hash?
Initialize control protocol if in streaming mode.
-
#interrupt ⇒ Object
Send interrupt control request.
-
#receive_messages(&block) ⇒ Object
Receive SDK messages (not control messages).
-
#reconnect_mcp_server(server_name) ⇒ Object
Reconnect a failed MCP server.
-
#rewind_files(user_message_uuid) ⇒ Object
Rewind files to a previous checkpoint (v0.1.15+) Restores file state to what it was at the given user message Requires enable_file_checkpointing to be true in options.
-
#set_model(model) ⇒ Object
Change the AI model.
-
#set_permission_mode(mode) ⇒ Object
Change permission mode.
-
#start ⇒ Object
Start reading messages from transport.
-
#stop_task(task_id) ⇒ Object
Stop a running background task.
-
#stream_input(stream) ⇒ Object
Stream input messages to transport.
-
#toggle_mcp_server(server_name, enabled) ⇒ Object
Enable or disable an MCP server.
-
#wait_for_result_and_end_input ⇒ Object
Wait for the first result before closing stdin when hooks or SDK MCP servers may still need to exchange control messages with the CLI.
- #write(string) ⇒ Object
- #writeln(string) ⇒ Object
Constructor Details
#initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil, sdk_mcp_servers: nil, agents: nil, exclude_dynamic_sections: nil) ⇒ Query
Returns a new instance of Query.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/claude_agent_sdk/query.rb', line 32 def initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil, sdk_mcp_servers: nil, agents: nil, exclude_dynamic_sections: nil) @transport = transport @is_streaming_mode = is_streaming_mode @can_use_tool = can_use_tool @hooks = hooks || {} @sdk_mcp_servers = sdk_mcp_servers || {} @agents = agents @exclude_dynamic_sections = exclude_dynamic_sections # Control protocol state @pending_control_responses = {} @pending_control_results = {} @hook_callbacks = {} @hook_callback_timeouts = {} @next_callback_id = 0 @request_counter = 0 @inflight_control_request_tasks = {} # Message stream @message_queue = Async::Queue.new @first_result_received = false @first_result_condition = Async::Condition.new @task = nil @initialized = false @closed = false @initialization_result = nil end |
Instance Attribute Details
#is_streaming_mode ⇒ Object (readonly)
Returns the value of attribute is_streaming_mode.
21 22 23 |
# File 'lib/claude_agent_sdk/query.rb', line 21 def is_streaming_mode @is_streaming_mode end |
#sdk_mcp_servers ⇒ Object (readonly)
Returns the value of attribute sdk_mcp_servers.
21 22 23 |
# File 'lib/claude_agent_sdk/query.rb', line 21 def sdk_mcp_servers @sdk_mcp_servers end |
#transport ⇒ Object (readonly)
Returns the value of attribute transport.
21 22 23 |
# File 'lib/claude_agent_sdk/query.rb', line 21 def transport @transport end |
Instance Method Details
#close ⇒ Object
Close the query and transport
977 978 979 980 981 |
# File 'lib/claude_agent_sdk/query.rb', line 977 def close @closed = true @task&.stop @transport.close end |
#get_context_usage ⇒ Hash
Get a breakdown of current context window usage by category.
855 856 857 |
# File 'lib/claude_agent_sdk/query.rb', line 855 def get_context_usage send_control_request({ subtype: 'get_context_usage' }) end |
#get_mcp_status ⇒ Hash
Get current MCP server connection status (only works with streaming mode)
861 862 863 |
# File 'lib/claude_agent_sdk/query.rb', line 861 def get_mcp_status send_control_request({ subtype: 'mcp_status' }) end |
#initialize_protocol ⇒ Hash?
Initialize control protocol if in streaming mode
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/claude_agent_sdk/query.rb', line 63 def initialize_protocol return nil unless @is_streaming_mode # Build hooks configuration for initialization hooks_config = {} if @hooks && !@hooks.empty? @hooks.each do |event, matchers| next if matchers.nil? || matchers.empty? hooks_config[event] = [] matchers.each do |matcher| callback_ids = [] (matcher[:hooks] || []).each do |callback| callback_id = "hook_#{@next_callback_id}" @next_callback_id += 1 @hook_callbacks[callback_id] = callback @hook_callback_timeouts[callback_id] = matcher[:timeout] if matcher[:timeout] callback_ids << callback_id end hooks_config[event] << { matcher: matcher[:matcher], hookCallbackIds: callback_ids } end end end # Build agents dict for initialization agents_dict = nil if @agents agents_dict = @agents.transform_values do |agent_def| { description: agent_def.description, prompt: agent_def.prompt, tools: agent_def.tools, disallowedTools: agent_def.disallowed_tools, model: agent_def.model, skills: agent_def.skills, memory: agent_def.memory, mcpServers: agent_def.mcp_servers, initialPrompt: agent_def.initial_prompt, maxTurns: agent_def.max_turns, background: agent_def.background, effort: agent_def.effort, permissionMode: agent_def. }.compact end end # Send initialize request request = { subtype: 'initialize', hooks: hooks_config.empty? ? nil : hooks_config, agents: agents_dict } request[:excludeDynamicSections] = @exclude_dynamic_sections unless @exclude_dynamic_sections.nil? response = send_control_request(request) @initialized = true @initialization_result = response response end |
#interrupt ⇒ Object
Send interrupt control request
866 867 868 |
# File 'lib/claude_agent_sdk/query.rb', line 866 def interrupt send_control_request({ subtype: 'interrupt' }) end |
#receive_messages(&block) ⇒ Object
Receive SDK messages (not control messages)
964 965 966 967 968 969 970 971 972 973 974 |
# File 'lib/claude_agent_sdk/query.rb', line 964 def (&block) return enum_for(:receive_messages) unless block loop do = @message_queue.dequeue break if [:type] == 'end' raise [:error] if [:type] == 'error' block.call() end end |
#reconnect_mcp_server(server_name) ⇒ Object
Reconnect a failed MCP server
888 889 890 891 892 893 |
# File 'lib/claude_agent_sdk/query.rb', line 888 def reconnect_mcp_server(server_name) send_control_request({ subtype: 'mcp_reconnect', serverName: server_name }) end |
#rewind_files(user_message_uuid) ⇒ Object
Rewind files to a previous checkpoint (v0.1.15+) Restores file state to what it was at the given user message Requires enable_file_checkpointing to be true in options
919 920 921 922 923 924 |
# File 'lib/claude_agent_sdk/query.rb', line 919 def rewind_files() send_control_request({ subtype: 'rewind_files', user_message_id: }) end |
#set_model(model) ⇒ Object
Change the AI model
879 880 881 882 883 884 |
# File 'lib/claude_agent_sdk/query.rb', line 879 def set_model(model) send_control_request({ subtype: 'set_model', model: model }) end |
#set_permission_mode(mode) ⇒ Object
Change permission mode
871 872 873 874 875 876 |
# File 'lib/claude_agent_sdk/query.rb', line 871 def (mode) send_control_request({ subtype: 'set_permission_mode', mode: mode }) end |
#start ⇒ Object
Start reading messages from transport.
Spawns ‘read_messages` as a direct child task of the current Async task and stores that child in `@task`. An earlier version wrapped `task.async { read_messages }` inside an outer `Async do … end` and assigned the outer task to `@task`; the outer task completed almost immediately after spawning, so `close`’s ‘@task.stop` never reached the actual `read_messages` fiber and the read loop kept running until the transport raised. Now `@task.stop` stops the read loop.
Must be called inside an Async{} block (matches ‘query()` which wraps its own internals in Async, and the documented `Client#connect` pattern). If invoked outside a reactor, raise a clear error rather than letting Async::Task.current raise an opaque “No async task available!” — earlier versions of this method appeared to work from synchronous callers but actually hung indefinitely because the outer Async{} root task waited for read_messages to finish, which never happens for a live Client.
144 145 146 147 148 149 150 151 |
# File 'lib/claude_agent_sdk/query.rb', line 144 def start return if @task parent = Async::Task.current? raise CLIConnectionError, 'Query#start must be called inside an Async{} block (e.g. wrap Client#connect in Async{...})' unless parent @task = parent.async { } end |
#stop_task(task_id) ⇒ Object
Stop a running background task
908 909 910 911 912 913 |
# File 'lib/claude_agent_sdk/query.rb', line 908 def stop_task(task_id) send_control_request({ subtype: 'stop_task', task_id: task_id }) end |
#stream_input(stream) ⇒ Object
Stream input messages to transport
942 943 944 945 946 947 948 949 950 951 952 953 |
# File 'lib/claude_agent_sdk/query.rb', line 942 def stream_input(stream) stream.each do || break if @closed serialized = .is_a?(Hash) ? JSON.generate() : .to_s writeln(serialized) end rescue StandardError => e # Log error but don't raise warn "Error streaming input: #{e.}" ensure wait_for_result_and_end_input end |
#toggle_mcp_server(server_name, enabled) ⇒ Object
Enable or disable an MCP server
898 899 900 901 902 903 904 |
# File 'lib/claude_agent_sdk/query.rb', line 898 def toggle_mcp_server(server_name, enabled) send_control_request({ subtype: 'mcp_toggle', serverName: server_name, enabled: enabled }) end |
#wait_for_result_and_end_input ⇒ Object
Wait for the first result before closing stdin when hooks or SDK MCP servers may still need to exchange control messages with the CLI.
928 929 930 931 932 933 934 935 936 937 938 939 |
# File 'lib/claude_agent_sdk/query.rb', line 928 def wait_for_result_and_end_input if !@first_result_received && ((@sdk_mcp_servers && !@sdk_mcp_servers.empty?) || (@hooks && !@hooks.empty?)) Async::Task.current.with_timeout(stream_close_timeout_seconds) do @first_result_condition.wait unless @first_result_received end end rescue Async::TimeoutError nil ensure @transport.end_input end |
#write(string) ⇒ Object
959 960 961 |
# File 'lib/claude_agent_sdk/query.rb', line 959 def write(string) @transport.write(string) end |
#writeln(string) ⇒ Object
955 956 957 |
# File 'lib/claude_agent_sdk/query.rb', line 955 def writeln(string) write string.end_with?("\n") ? string : "#{string}\n" end |