Class: ClaudeAgentSDK::Query
- Inherits:
-
Object
- Object
- ClaudeAgentSDK::Query
- Defined in:
- lib/claude_agent_sdk/query.rb
Overview
Handles bidirectional control protocol on top of Transport
This class manages:
-
Control request/response routing
-
Hook callbacks
-
Tool permission callbacks
-
Message streaming
-
Initialization handshake
Constant Summary collapse
- CONTROL_REQUEST_TIMEOUT_ENV_VAR =
'CLAUDE_AGENT_SDK_CONTROL_REQUEST_TIMEOUT_SECONDS'- DEFAULT_CONTROL_REQUEST_TIMEOUT_SECONDS =
1200.0- STREAM_CLOSE_TIMEOUT_ENV_VAR =
'CLAUDE_CODE_STREAM_CLOSE_TIMEOUT'- DEFAULT_STREAM_CLOSE_TIMEOUT_SECONDS =
60.0
Instance Attribute Summary collapse
-
#is_streaming_mode ⇒ Object
readonly
Returns the value of attribute is_streaming_mode.
-
#sdk_mcp_servers ⇒ Object
readonly
Returns the value of attribute sdk_mcp_servers.
-
#transport ⇒ Object
readonly
Returns the value of attribute transport.
Instance Method Summary collapse
-
#close ⇒ Object
Close the query and transport.
-
#get_context_usage ⇒ Hash
Get a breakdown of current context window usage by category.
-
#get_mcp_status ⇒ Hash
Get current MCP server connection status (only works with streaming mode).
-
#initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil, sdk_mcp_servers: nil, agents: nil, exclude_dynamic_sections: nil) ⇒ Query
constructor
A new instance of Query.
-
#initialize_protocol ⇒ Hash?
Initialize control protocol if in streaming mode.
-
#interrupt ⇒ Object
Send interrupt control request.
-
#receive_messages(&block) ⇒ Object
Receive SDK messages (not control messages).
-
#reconnect_mcp_server(server_name) ⇒ Object
Reconnect a failed MCP server.
-
#rewind_files(user_message_uuid) ⇒ Object
Rewind files to a previous checkpoint (v0.1.15+) Restores file state to what it was at the given user message Requires enable_file_checkpointing to be true in options.
-
#set_model(model) ⇒ Object
Change the AI model.
-
#set_permission_mode(mode) ⇒ Object
Change permission mode.
-
#start ⇒ Object
Start reading messages from transport.
-
#stop_task(task_id) ⇒ Object
Stop a running background task.
-
#stream_input(stream) ⇒ Object
Stream input messages to transport.
-
#toggle_mcp_server(server_name, enabled) ⇒ Object
Enable or disable an MCP server.
-
#wait_for_result_and_end_input ⇒ Object
Wait for the first result before closing stdin when hooks or SDK MCP servers may still need to exchange control messages with the CLI.
Constructor Details
#initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil, sdk_mcp_servers: nil, agents: nil, exclude_dynamic_sections: nil) ⇒ Query
Returns a new instance of Query.
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/claude_agent_sdk/query.rb', line 28 def initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil, sdk_mcp_servers: nil, agents: nil, exclude_dynamic_sections: nil) @transport = transport @is_streaming_mode = is_streaming_mode @can_use_tool = can_use_tool @hooks = hooks || {} @sdk_mcp_servers = sdk_mcp_servers || {} @agents = agents @exclude_dynamic_sections = exclude_dynamic_sections # Control protocol state @pending_control_responses = {} @pending_control_results = {} @hook_callbacks = {} @hook_callback_timeouts = {} @next_callback_id = 0 @request_counter = 0 @inflight_control_request_tasks = {} # Message stream @message_queue = Async::Queue.new @first_result_received = false @first_result_condition = Async::Condition.new @task = nil @initialized = false @closed = false @initialization_result = nil end |
Instance Attribute Details
#is_streaming_mode ⇒ Object (readonly)
Returns the value of attribute is_streaming_mode.
21 22 23 |
# File 'lib/claude_agent_sdk/query.rb', line 21 def is_streaming_mode @is_streaming_mode end |
#sdk_mcp_servers ⇒ Object (readonly)
Returns the value of attribute sdk_mcp_servers.
21 22 23 |
# File 'lib/claude_agent_sdk/query.rb', line 21 def sdk_mcp_servers @sdk_mcp_servers end |
#transport ⇒ Object (readonly)
Returns the value of attribute transport.
21 22 23 |
# File 'lib/claude_agent_sdk/query.rb', line 21 def transport @transport end |
Instance Method Details
#close ⇒ Object
Close the query and transport
940 941 942 943 944 |
# File 'lib/claude_agent_sdk/query.rb', line 940 def close @closed = true @task&.stop @transport.close end |
#get_context_usage ⇒ Hash
Get a breakdown of current context window usage by category.
821 822 823 |
# File 'lib/claude_agent_sdk/query.rb', line 821 def get_context_usage send_control_request({ subtype: 'get_context_usage' }) end |
#get_mcp_status ⇒ Hash
Get current MCP server connection status (only works with streaming mode)
827 828 829 |
# File 'lib/claude_agent_sdk/query.rb', line 827 def get_mcp_status send_control_request({ subtype: 'mcp_status' }) end |
#initialize_protocol ⇒ Hash?
Initialize control protocol if in streaming mode
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/claude_agent_sdk/query.rb', line 59 def initialize_protocol return nil unless @is_streaming_mode # Build hooks configuration for initialization hooks_config = {} if @hooks && !@hooks.empty? @hooks.each do |event, matchers| next if matchers.nil? || matchers.empty? hooks_config[event] = [] matchers.each do |matcher| callback_ids = [] (matcher[:hooks] || []).each do |callback| callback_id = "hook_#{@next_callback_id}" @next_callback_id += 1 @hook_callbacks[callback_id] = callback @hook_callback_timeouts[callback_id] = matcher[:timeout] if matcher[:timeout] callback_ids << callback_id end hooks_config[event] << { matcher: matcher[:matcher], hookCallbackIds: callback_ids } end end end # Build agents dict for initialization agents_dict = nil if @agents agents_dict = @agents.transform_values do |agent_def| { description: agent_def.description, prompt: agent_def.prompt, tools: agent_def.tools, disallowedTools: agent_def.disallowed_tools, model: agent_def.model, skills: agent_def.skills, memory: agent_def.memory, mcpServers: agent_def.mcp_servers, initialPrompt: agent_def.initial_prompt, maxTurns: agent_def.max_turns, background: agent_def.background, effort: agent_def.effort, permissionMode: agent_def. }.compact end end # Send initialize request request = { subtype: 'initialize', hooks: hooks_config.empty? ? nil : hooks_config, agents: agents_dict } request[:excludeDynamicSections] = @exclude_dynamic_sections unless @exclude_dynamic_sections.nil? response = send_control_request(request) @initialized = true @initialization_result = response response end |
#interrupt ⇒ Object
Send interrupt control request
832 833 834 |
# File 'lib/claude_agent_sdk/query.rb', line 832 def interrupt send_control_request({ subtype: 'interrupt' }) end |
#receive_messages(&block) ⇒ Object
Receive SDK messages (not control messages)
927 928 929 930 931 932 933 934 935 936 937 |
# File 'lib/claude_agent_sdk/query.rb', line 927 def (&block) return enum_for(:receive_messages) unless block loop do = @message_queue.dequeue break if [:type] == 'end' raise [:error] if [:type] == 'error' block.call() end end |
#reconnect_mcp_server(server_name) ⇒ Object
Reconnect a failed MCP server
854 855 856 857 858 859 |
# File 'lib/claude_agent_sdk/query.rb', line 854 def reconnect_mcp_server(server_name) send_control_request({ subtype: 'mcp_reconnect', serverName: server_name }) end |
#rewind_files(user_message_uuid) ⇒ Object
Rewind files to a previous checkpoint (v0.1.15+) Restores file state to what it was at the given user message Requires enable_file_checkpointing to be true in options
885 886 887 888 889 890 |
# File 'lib/claude_agent_sdk/query.rb', line 885 def rewind_files() send_control_request({ subtype: 'rewind_files', user_message_id: }) end |
#set_model(model) ⇒ Object
Change the AI model
845 846 847 848 849 850 |
# File 'lib/claude_agent_sdk/query.rb', line 845 def set_model(model) send_control_request({ subtype: 'set_model', model: model }) end |
#set_permission_mode(mode) ⇒ Object
Change permission mode
837 838 839 840 841 842 |
# File 'lib/claude_agent_sdk/query.rb', line 837 def (mode) send_control_request({ subtype: 'set_permission_mode', mode: mode }) end |
#start ⇒ Object
Start reading messages from transport
123 124 125 126 127 128 129 |
# File 'lib/claude_agent_sdk/query.rb', line 123 def start return if @task @task = Async do |task| task.async { } end end |
#stop_task(task_id) ⇒ Object
Stop a running background task
874 875 876 877 878 879 |
# File 'lib/claude_agent_sdk/query.rb', line 874 def stop_task(task_id) send_control_request({ subtype: 'stop_task', task_id: task_id }) end |
#stream_input(stream) ⇒ Object
Stream input messages to transport
908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 |
# File 'lib/claude_agent_sdk/query.rb', line 908 def stream_input(stream) stream.each do || break if @closed serialized = if .is_a?(Hash) JSON.generate() + "\n" else .to_s end serialized += "\n" unless serialized.end_with?("\n") @transport.write(serialized) end rescue StandardError => e # Log error but don't raise warn "Error streaming input: #{e.}" ensure wait_for_result_and_end_input end |
#toggle_mcp_server(server_name, enabled) ⇒ Object
Enable or disable an MCP server
864 865 866 867 868 869 870 |
# File 'lib/claude_agent_sdk/query.rb', line 864 def toggle_mcp_server(server_name, enabled) send_control_request({ subtype: 'mcp_toggle', serverName: server_name, enabled: enabled }) end |
#wait_for_result_and_end_input ⇒ Object
Wait for the first result before closing stdin when hooks or SDK MCP servers may still need to exchange control messages with the CLI.
894 895 896 897 898 899 900 901 902 903 904 905 |
# File 'lib/claude_agent_sdk/query.rb', line 894 def wait_for_result_and_end_input if !@first_result_received && ((@sdk_mcp_servers && !@sdk_mcp_servers.empty?) || (@hooks && !@hooks.empty?)) Async::Task.current.with_timeout(stream_close_timeout_seconds) do @first_result_condition.wait unless @first_result_received end end rescue Async::TimeoutError nil ensure @transport.end_input end |