Class: ClaudeAgentSDK::Query
- Inherits:
-
Object
- Object
- ClaudeAgentSDK::Query
- Defined in:
- lib/claude_agent_sdk/query.rb
Overview
Handles bidirectional control protocol on top of Transport
This class manages:
-
Control request/response routing
-
Hook callbacks
-
Tool permission callbacks
-
Message streaming
-
Initialization handshake
Constant Summary collapse
- CONTROL_REQUEST_TIMEOUT_ENV_VAR =
'CLAUDE_AGENT_SDK_CONTROL_REQUEST_TIMEOUT_SECONDS'
- DEFAULT_CONTROL_REQUEST_TIMEOUT_SECONDS =
1200.0
- STREAM_CLOSE_TIMEOUT_ENV_VAR =
'CLAUDE_CODE_STREAM_CLOSE_TIMEOUT'
- DEFAULT_STREAM_CLOSE_TIMEOUT_SECONDS =
60.0
Instance Attribute Summary collapse
-
#is_streaming_mode ⇒ Object
readonly
Returns the value of attribute is_streaming_mode.
-
#sdk_mcp_servers ⇒ Object
readonly
Returns the value of attribute sdk_mcp_servers.
-
#transport ⇒ Object
readonly
Returns the value of attribute transport.
Instance Method Summary collapse
-
#close ⇒ Object
Close the query and transport.
-
#get_context_usage ⇒ Hash
Get a breakdown of current context window usage by category.
-
#get_mcp_status ⇒ Hash
Get current MCP server connection status (only works with streaming mode).
-
#initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil, sdk_mcp_servers: nil, agents: nil, exclude_dynamic_sections: nil) ⇒ Query
constructor
A new instance of Query.
-
#initialize_protocol ⇒ Hash?
Initialize control protocol if in streaming mode.
-
#interrupt ⇒ Object
Send interrupt control request.
-
#receive_messages(&block) ⇒ Object
Receive SDK messages (not control messages).
-
#reconnect_mcp_server(server_name) ⇒ Object
Reconnect a failed MCP server.
-
#rewind_files(user_message_uuid) ⇒ Object
Rewind files to a previous checkpoint (v0.1.15+). Restores file state to what it was at the given user message. Requires enable_file_checkpointing to be true in options.
-
#set_model(model) ⇒ Object
Change the AI model.
-
#set_permission_mode(mode) ⇒ Object
Change permission mode.
-
#start ⇒ Object
Start reading messages from transport.
-
#stop_task(task_id) ⇒ Object
Stop a running background task.
-
#stream_input(stream) ⇒ Object
Stream input messages to transport.
-
#toggle_mcp_server(server_name, enabled) ⇒ Object
Enable or disable an MCP server.
-
#wait_for_result_and_end_input ⇒ Object
Wait for the first result before closing stdin when hooks or SDK MCP servers may still need to exchange control messages with the CLI.
Constructor Details
#initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil, sdk_mcp_servers: nil, agents: nil, exclude_dynamic_sections: nil) ⇒ Query
Returns a new instance of Query.
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/claude_agent_sdk/query.rb', line 28

# Returns a new instance of Query.
#
# Stores the collaborators it is given and resets all control-protocol and
# message-stream bookkeeping to its initial, not-started state.
#
# @param transport [Object] transport this query reads from and writes to
# @param is_streaming_mode [Boolean] stored as-is; checked by #initialize_protocol
# @param can_use_tool [Object, nil] tool permission callback, stored as-is
# @param hooks [Hash, nil] hook matchers keyed by event; nil is replaced by {}
# @param sdk_mcp_servers [Hash, nil] SDK MCP servers; nil is replaced by {}
# @param agents [Hash, nil] agent definitions, stored as-is
# @param exclude_dynamic_sections [Object, nil] stored as-is
def initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil, sdk_mcp_servers: nil, agents: nil,
               exclude_dynamic_sections: nil)
  @transport = transport
  @is_streaming_mode = is_streaming_mode
  @can_use_tool = can_use_tool
  @hooks = hooks || {}
  @sdk_mcp_servers = sdk_mcp_servers || {}
  @agents = agents
  @exclude_dynamic_sections = exclude_dynamic_sections

  # Control protocol state
  @pending_control_responses = {}
  @pending_control_results = {}
  @hook_callbacks = {}
  @hook_callback_timeouts = {}
  @next_callback_id = 0
  @request_counter = 0
  @inflight_control_request_tasks = {}

  # Message stream
  @message_queue = Async::Queue.new
  @first_result_received = false
  @first_result_condition = Async::Condition.new
  @task = nil
  @initialized = false
  @closed = false
  @initialization_result = nil
end
Instance Attribute Details
#is_streaming_mode ⇒ Object (readonly)
Returns the value of attribute is_streaming_mode.
21 22 23 |
# File 'lib/claude_agent_sdk/query.rb', line 21

# Returns the value of attribute is_streaming_mode.
#
# @return [Object] the value given to the constructor as is_streaming_mode
def is_streaming_mode
  @is_streaming_mode
end
#sdk_mcp_servers ⇒ Object (readonly)
Returns the value of attribute sdk_mcp_servers.
21 22 23 |
# File 'lib/claude_agent_sdk/query.rb', line 21

# Returns the value of attribute sdk_mcp_servers.
#
# @return [Object] the stored SDK MCP servers
def sdk_mcp_servers
  @sdk_mcp_servers
end
#transport ⇒ Object (readonly)
Returns the value of attribute transport.
21 22 23 |
# File 'lib/claude_agent_sdk/query.rb', line 21

# Returns the value of attribute transport.
#
# @return [Object] the stored transport
def transport
  @transport
end
Instance Method Details
#close ⇒ Object
Close the query and transport
943 944 945 946 947 |
# File 'lib/claude_agent_sdk/query.rb', line 943

# Close the query and transport.
#
# Marks the query as closed, stops the task stored in @task when there is one,
# then closes the transport.
#
# @return [Object] whatever the transport's #close returns
def close
  @closed = true
  @task&.stop
  @transport.close
end
#get_context_usage ⇒ Hash
Get a breakdown of current context window usage by category.
824 825 826 |
# File 'lib/claude_agent_sdk/query.rb', line 824

# Get a breakdown of current context window usage by category.
#
# Sends a 'get_context_usage' control request.
#
# @return [Hash] the result of send_control_request
def get_context_usage
  send_control_request({ subtype: 'get_context_usage' })
end
#get_mcp_status ⇒ Hash
Get current MCP server connection status (only works with streaming mode)
830 831 832 |
# File 'lib/claude_agent_sdk/query.rb', line 830

# Get current MCP server connection status (only works with streaming mode).
#
# Sends an 'mcp_status' control request.
#
# @return [Hash] the result of send_control_request
def get_mcp_status
  send_control_request({ subtype: 'mcp_status' })
end
#initialize_protocol ⇒ Hash?
Initialize control protocol if in streaming mode
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/claude_agent_sdk/query.rb', line 59

# Initialize control protocol if in streaming mode.
#
# Registers every hook callback under a generated "hook_<n>" id, converts the
# agent definitions to a hash with camelCase keys, and sends one 'initialize'
# control request. The response is remembered in @initialization_result.
#
# @return [Hash, nil] the result of send_control_request, or nil when not in
#   streaming mode
def initialize_protocol
  return nil unless @is_streaming_mode

  # Build hooks configuration for initialization
  hooks_config = {}
  (@hooks || {}).each do |event, matchers|
    next if matchers.nil? || matchers.empty?

    hooks_config[event] = matchers.map do |matcher|
      callback_ids = (matcher[:hooks] || []).map do |callback|
        callback_id = "hook_#{@next_callback_id}"
        @next_callback_id += 1
        @hook_callbacks[callback_id] = callback
        # A timeout is recorded per callback id only when the matcher sets one.
        @hook_callback_timeouts[callback_id] = matcher[:timeout] if matcher[:timeout]
        callback_id
      end

      { matcher: matcher[:matcher], hookCallbackIds: callback_ids }
    end
  end

  # Build agents dict for initialization
  agents_dict = nil
  if @agents
    agents_dict = @agents.transform_values do |agent_def|
      {
        description: agent_def.description,
        prompt: agent_def.prompt,
        tools: agent_def.tools,
        disallowedTools: agent_def.disallowed_tools,
        model: agent_def.model,
        skills: agent_def.skills,
        memory: agent_def.memory,
        mcpServers: agent_def.mcp_servers,
        initialPrompt: agent_def.initial_prompt,
        maxTurns: agent_def.max_turns,
        background: agent_def.background,
        effort: agent_def.effort,
        permissionMode: agent_def.permission_mode
      }.compact # unset (nil) attributes are left out of the request
    end
  end

  # Send initialize request
  request = {
    subtype: 'initialize',
    hooks: hooks_config.empty? ? nil : hooks_config,
    agents: agents_dict
  }
  # nil means "not specified"; an explicit false is still forwarded.
  request[:excludeDynamicSections] = @exclude_dynamic_sections unless @exclude_dynamic_sections.nil?

  response = send_control_request(request)
  @initialized = true
  @initialization_result = response
  response
end
#interrupt ⇒ Object
Send interrupt control request
835 836 837 |
# File 'lib/claude_agent_sdk/query.rb', line 835

# Send interrupt control request.
#
# @return [Object] the result of send_control_request
def interrupt
  send_control_request({ subtype: 'interrupt' })
end
#receive_messages(&block) ⇒ Object
Receive SDK messages (not control messages)
930 931 932 933 934 935 936 937 938 939 940 |
# File 'lib/claude_agent_sdk/query.rb', line 930

# Receive SDK messages (not control messages).
#
# Dequeues from @message_queue and yields each message until a message of
# type 'end' arrives. A message of type 'error' raises its :error value.
#
# @yieldparam message [Hash] the next queued message
# @return [Enumerator] when called without a block
# @raise [Exception] the :error value of an 'error' message
def receive_messages(&block)
  return enum_for(:receive_messages) unless block

  loop do
    message = @message_queue.dequeue
    break if message[:type] == 'end'
    raise message[:error] if message[:type] == 'error'

    block.call(message)
  end
end
#reconnect_mcp_server(server_name) ⇒ Object
Reconnect a failed MCP server
857 858 859 860 861 862 |
# File 'lib/claude_agent_sdk/query.rb', line 857

# Reconnect a failed MCP server.
#
# @param server_name [Object] sent as serverName in the 'mcp_reconnect' request
# @return [Object] the result of send_control_request
def reconnect_mcp_server(server_name)
  send_control_request({
    subtype: 'mcp_reconnect',
    serverName: server_name
  })
end
#rewind_files(user_message_uuid) ⇒ Object
Rewind files to a previous checkpoint (v0.1.15+). Restores file state to what it was at the given user message. Requires enable_file_checkpointing to be true in options.
888 889 890 891 892 893 |
# File 'lib/claude_agent_sdk/query.rb', line 888

# Rewind files to a previous checkpoint (v0.1.15+).
# Restores file state to what it was at the given user message.
# Requires enable_file_checkpointing to be true in options.
#
# @param user_message_uuid [Object] sent under the user_message_id key of the
#   'rewind_files' request
# @return [Object] the result of send_control_request
def rewind_files(user_message_uuid)
  send_control_request({
    subtype: 'rewind_files',
    user_message_id: user_message_uuid
  })
end
#set_model(model) ⇒ Object
Change the AI model
848 849 850 851 852 853 |
# File 'lib/claude_agent_sdk/query.rb', line 848

# Change the AI model.
#
# @param model [Object] sent as model in the 'set_model' request
# @return [Object] the result of send_control_request
def set_model(model)
  send_control_request({
    subtype: 'set_model',
    model: model
  })
end
#set_permission_mode(mode) ⇒ Object
Change permission mode
840 841 842 843 844 845 |
# File 'lib/claude_agent_sdk/query.rb', line 840

# Change permission mode.
#
# @param mode [Object] sent as mode in the 'set_permission_mode' request
# @return [Object] the result of send_control_request
def set_permission_mode(mode)
  send_control_request({
    subtype: 'set_permission_mode',
    mode: mode
  })
end
#start ⇒ Object
Start reading messages from transport
123 124 125 126 127 128 129 |
# File 'lib/claude_agent_sdk/query.rb', line 123

# Start reading messages from transport.
#
# Does nothing when a task has already been stored in @task; otherwise stores
# the task returned by Async in @task.
#
# @return [Object, nil] the task returned by Async, or nil if already started
def start
  return if @task

  @task = Async do |task|
    # NOTE(review): the call inside this block was lost when the listing was
    # extracted; read_messages is reconstructed from the method summary and
    # should be confirmed against lib/claude_agent_sdk/query.rb.
    task.async { read_messages }
  end
end
#stop_task(task_id) ⇒ Object
Stop a running background task
877 878 879 880 881 882 |
# File 'lib/claude_agent_sdk/query.rb', line 877

# Stop a running background task.
#
# @param task_id [Object] sent as task_id in the 'stop_task' request
# @return [Object] the result of send_control_request
def stop_task(task_id)
  send_control_request({
    subtype: 'stop_task',
    task_id: task_id
  })
end
#stream_input(stream) ⇒ Object
Stream input messages to transport
911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 |
# File 'lib/claude_agent_sdk/query.rb', line 911

# Stream input messages to transport.
#
# Each Hash is written as one JSON line; anything else is written via #to_s.
# Every write ends with exactly one trailing newline. Streaming stops early
# once the query has been closed. Errors are reported with `warn` and not
# re-raised, and wait_for_result_and_end_input always runs afterwards.
#
# @param stream [#each] source of messages (Hashes or string-like objects)
# @return [Object] the value of the iteration, or nil after a rescued error
def stream_input(stream)
  stream.each do |message|
    break if @closed

    serialized = message.is_a?(Hash) ? JSON.generate(message) : message.to_s
    serialized += "\n" unless serialized.end_with?("\n")
    @transport.write(serialized)
  end
rescue StandardError => e
  # Log error but don't raise
  warn "Error streaming input: #{e.message}"
ensure
  wait_for_result_and_end_input
end
#toggle_mcp_server(server_name, enabled) ⇒ Object
Enable or disable an MCP server
867 868 869 870 871 872 873 |
# File 'lib/claude_agent_sdk/query.rb', line 867

# Enable or disable an MCP server.
#
# @param server_name [Object] sent as serverName in the 'mcp_toggle' request
# @param enabled [Object] sent as enabled in the 'mcp_toggle' request
# @return [Object] the result of send_control_request
def toggle_mcp_server(server_name, enabled)
  send_control_request({
    subtype: 'mcp_toggle',
    serverName: server_name,
    enabled: enabled
  })
end
#wait_for_result_and_end_input ⇒ Object
Wait for the first result before closing stdin when hooks or SDK MCP servers may still need to exchange control messages with the CLI.
897 898 899 900 901 902 903 904 905 906 907 908 |
# File 'lib/claude_agent_sdk/query.rb', line 897

# Wait for the first result before closing stdin when hooks or SDK MCP servers
# may still need to exchange control messages with the CLI.
#
# The wait is bounded by stream_close_timeout_seconds; a timeout is ignored.
# The transport's input is ended in every case, including on error.
#
# @return [Object, nil] the value of the wait, or nil when there was no wait
#   or the wait timed out
def wait_for_result_and_end_input
  has_sdk_mcp_servers = @sdk_mcp_servers && !@sdk_mcp_servers.empty?
  has_hooks = @hooks && !@hooks.empty?

  if !@first_result_received && (has_sdk_mcp_servers || has_hooks)
    Async::Task.current.with_timeout(stream_close_timeout_seconds) do
      # Re-check inside the timeout block before waiting on the condition.
      @first_result_condition.wait unless @first_result_received
    end
  end
rescue Async::TimeoutError
  nil
ensure
  @transport.end_input
end