Class: ClaudeAgentSDK::Query

Inherits:
Object
  • Object
show all
Defined in:
lib/claude_agent_sdk/query.rb

Overview

Handles bidirectional control protocol on top of Transport

This class manages:

  • Control request/response routing

  • Hook callbacks

  • Tool permission callbacks

  • Message streaming

  • Initialization handshake

Constant Summary collapse

CONTROL_REQUEST_TIMEOUT_ENV_VAR =
'CLAUDE_AGENT_SDK_CONTROL_REQUEST_TIMEOUT_SECONDS'
DEFAULT_CONTROL_REQUEST_TIMEOUT_SECONDS =
1200.0
STREAM_CLOSE_TIMEOUT_ENV_VAR =
'CLAUDE_CODE_STREAM_CLOSE_TIMEOUT'
DEFAULT_STREAM_CLOSE_TIMEOUT_SECONDS =
60.0

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil, sdk_mcp_servers: nil, agents: nil, exclude_dynamic_sections: nil) ⇒ Query

Returns a new instance of Query.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/claude_agent_sdk/query.rb', line 28

# Builds a Query around +transport+ and resets every piece of control-protocol
# and message-stream bookkeeping to its starting value.
#
# @param transport the transport this query is bound to (stored as given)
# @param is_streaming_mode streaming-mode flag (stored as given)
# @param can_use_tool tool permission callback (stored as given)
# @param hooks hook configuration; a nil/false value is replaced by {}
# @param sdk_mcp_servers SDK MCP servers; a nil/false value is replaced by {}
# @param agents agent definitions (stored as given)
# @param exclude_dynamic_sections stored as given
def initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil,
               sdk_mcp_servers: nil, agents: nil, exclude_dynamic_sections: nil)
  # Caller-supplied collaborators and configuration, kept verbatim
  @transport = transport
  @is_streaming_mode = is_streaming_mode
  @can_use_tool = can_use_tool
  @agents = agents
  @exclude_dynamic_sections = exclude_dynamic_sections

  # Optional collections default to an empty Hash
  @hooks = hooks || {}
  @sdk_mcp_servers = sdk_mcp_servers || {}

  # Control protocol state: every table gets its own fresh Hash
  @pending_control_responses = {}
  @pending_control_results = {}
  @hook_callbacks = {}
  @hook_callback_timeouts = {}
  @inflight_control_request_tasks = {}
  @next_callback_id = @request_counter = 0

  # Message stream state
  @message_queue = Async::Queue.new
  @first_result_condition = Async::Condition.new
  @first_result_received = @initialized = @closed = false
  @task = @initialization_result = nil
end

Instance Attribute Details

#is_streaming_modeObject (readonly)

Returns the value of attribute is_streaming_mode.



21
22
23
# File 'lib/claude_agent_sdk/query.rb', line 21

# Read-only accessor for the streaming-mode flag passed to the constructor.
def is_streaming_mode
  @is_streaming_mode
end

#sdk_mcp_serversObject (readonly)

Returns the value of attribute sdk_mcp_servers.



21
22
23
# File 'lib/claude_agent_sdk/query.rb', line 21

# Read-only accessor for the SDK MCP servers; the constructor stores an empty
# Hash here when none were supplied.
def sdk_mcp_servers
  @sdk_mcp_servers
end

#transportObject (readonly)

Returns the value of attribute transport.



21
22
23
# File 'lib/claude_agent_sdk/query.rb', line 21

# Read-only accessor for the transport object passed to the constructor.
def transport
  @transport
end

Instance Method Details

#closeObject

Close the query and transport



945
946
947
948
949
# File 'lib/claude_agent_sdk/query.rb', line 945

# Close the query and its transport.
#
# Marks the query as closed, stops the task created by #start (if any) and
# closes the transport. The transport is closed even when stopping the task
# raises, so a failing stop can no longer leave the transport open; the error
# raised while stopping still propagates to the caller afterwards.
#
# @return [Object] whatever @transport.close returns
def close
  @closed = true
  begin
    @task&.stop
  ensure
    # Runs on both the normal and the error path.
    close_result = @transport.close
  end
  close_result
end

#get_context_usageHash

Get a breakdown of current context window usage by category.

Returns:

  • (Hash)

    Context usage response with categories, totalTokens, maxTokens, etc.



826
827
828
# File 'lib/claude_agent_sdk/query.rb', line 826

# Requests a breakdown of current context window usage.
#
# @return the result of send_control_request for a 'get_context_usage' request
def get_context_usage
  payload = { subtype: 'get_context_usage' }
  send_control_request(payload)
end

#get_mcp_statusHash

Get current MCP server connection status (only works with streaming mode)

Returns:

  • (Hash)

    MCP status information, including mcpServers list



832
833
834
# File 'lib/claude_agent_sdk/query.rb', line 832

# Requests the current MCP server connection status.
#
# @return the result of send_control_request for an 'mcp_status' request
def get_mcp_status
  payload = { subtype: 'mcp_status' }
  send_control_request(payload)
end

#initialize_protocolHash?

Initialize control protocol if in streaming mode

Returns:

  • (Hash, nil)

    Initialize response with supported commands, or nil if not streaming



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/claude_agent_sdk/query.rb', line 59

# Performs the control-protocol initialization handshake.
#
# Registers each configured hook callback under a generated "hook_<n>" id,
# serializes the agent definitions, sends an 'initialize' control request and
# remembers the response.
#
# @return [Hash, nil] the initialize response, or nil when not in streaming mode
def initialize_protocol
  return nil unless @is_streaming_mode

  # event => [{ matcher:, hookCallbackIds: }]; events without matchers are skipped
  hooks_config = (@hooks || {}).each_with_object({}) do |(event, matchers), config|
    next if matchers.nil? || matchers.empty?

    config[event] = matchers.map do |matcher|
      callback_ids = (matcher[:hooks] || []).map do |callback|
        callback_id = "hook_#{@next_callback_id}"
        @next_callback_id += 1
        @hook_callbacks[callback_id] = callback
        timeout = matcher[:timeout]
        @hook_callback_timeouts[callback_id] = timeout if timeout
        callback_id
      end

      { matcher: matcher[:matcher], hookCallbackIds: callback_ids }
    end
  end

  # Agent definitions become plain hashes; nil-valued attributes are dropped
  agents_dict =
    if @agents
      @agents.transform_values do |agent|
        {
          description: agent.description,
          prompt: agent.prompt,
          tools: agent.tools,
          disallowedTools: agent.disallowed_tools,
          model: agent.model,
          skills: agent.skills,
          memory: agent.memory,
          mcpServers: agent.mcp_servers,
          initialPrompt: agent.initial_prompt,
          maxTurns: agent.max_turns,
          background: agent.background,
          effort: agent.effort,
          permissionMode: agent.permission_mode
        }.compact
      end
    end

  request = {
    subtype: 'initialize',
    hooks: (hooks_config unless hooks_config.empty?),
    agents: agents_dict
  }
  # Only an explicit true/false is forwarded; nil leaves the key out entirely
  unless @exclude_dynamic_sections.nil?
    request[:excludeDynamicSections] = @exclude_dynamic_sections
  end

  # State is updated only after the request has returned
  send_control_request(request).tap do |response|
    @initialized = true
    @initialization_result = response
  end
end

#interruptObject

Send interrupt control request



837
838
839
# File 'lib/claude_agent_sdk/query.rb', line 837

# Sends an 'interrupt' control request.
#
# @return the result of send_control_request
def interrupt
  payload = { subtype: 'interrupt' }
  send_control_request(payload)
end

#receive_messages(&block) ⇒ Object

Receive SDK messages (not control messages)



932
933
934
935
936
937
938
939
940
941
942
# File 'lib/claude_agent_sdk/query.rb', line 932

# Yields SDK messages (not control messages) taken from the message queue.
#
# Stops at the first message whose :type is 'end'; a message whose :type is
# 'error' has its :error raised instead of being yielded. Without a block an
# Enumerator over the same sequence is returned.
def receive_messages(&block)
  return to_enum(:receive_messages) if block.nil?

  loop do
    message = @message_queue.dequeue
    kind = message[:type]
    break if kind == 'end'
    raise message[:error] if kind == 'error'

    yield message
  end
end

#reconnect_mcp_server(server_name) ⇒ Object

Reconnect a failed MCP server

Parameters:

  • server_name (String)

    Name of the MCP server to reconnect



859
860
861
862
863
864
# File 'lib/claude_agent_sdk/query.rb', line 859

# Asks for a failed MCP server to be reconnected.
#
# @param server_name [String] name of the MCP server to reconnect
# @return the result of send_control_request
def reconnect_mcp_server(server_name)
  payload = { subtype: 'mcp_reconnect', serverName: server_name }
  send_control_request(payload)
end

#rewind_files(user_message_uuid) ⇒ Object

Rewind files to a previous checkpoint (v0.1.15+). Restores file state to what it was at the given user message. Requires enable_file_checkpointing to be true in options.

Parameters:

  • user_message_uuid (String)

    The UUID of the UserMessage to rewind to



890
891
892
893
894
895
# File 'lib/claude_agent_sdk/query.rb', line 890

# Sends a 'rewind_files' control request for the given user message.
#
# @param user_message_uuid [String] UUID of the UserMessage to rewind to
# @return the result of send_control_request
def rewind_files(user_message_uuid)
  payload = { subtype: 'rewind_files', user_message_id: user_message_uuid }
  send_control_request(payload)
end

#set_model(model) ⇒ Object

Change the AI model



850
851
852
853
854
855
# File 'lib/claude_agent_sdk/query.rb', line 850

# Sends a 'set_model' control request.
#
# @param model the model value to forward unchanged
# @return the result of send_control_request
def set_model(model)
  payload = { subtype: 'set_model', model: model }
  send_control_request(payload)
end

#set_permission_mode(mode) ⇒ Object

Change permission mode



842
843
844
845
846
847
# File 'lib/claude_agent_sdk/query.rb', line 842

# Sends a 'set_permission_mode' control request.
#
# @param mode the permission mode value to forward unchanged
# @return the result of send_control_request
def set_permission_mode(mode)
  payload = { subtype: 'set_permission_mode', mode: mode }
  send_control_request(payload)
end

#startObject

Start reading messages from transport



123
124
125
126
127
128
129
# File 'lib/claude_agent_sdk/query.rb', line 123

# Starts reading messages from the transport.
#
# Does nothing (and returns nil) when a task has already been stored;
# otherwise runs read_messages in a child of a new Async task and keeps that
# task in @task.
def start
  unless @task
    @task = Async do |parent|
      parent.async { read_messages }
    end
  end
end

#stop_task(task_id) ⇒ Object

Stop a running background task

Parameters:

  • task_id (String)

    The ID of the task to stop



879
880
881
882
883
884
# File 'lib/claude_agent_sdk/query.rb', line 879

# Sends a 'stop_task' control request for a running background task.
#
# @param task_id [String] ID of the task to stop
# @return the result of send_control_request
def stop_task(task_id)
  payload = { subtype: 'stop_task', task_id: task_id }
  send_control_request(payload)
end

#stream_input(stream) ⇒ Object

Stream input messages to transport



913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
# File 'lib/claude_agent_sdk/query.rb', line 913

# Streams input messages to the transport, one newline-terminated line each.
#
# Hashes are written as JSON; anything else is written via #to_s. Iteration
# stops as soon as the query is closed. Errors are reported with +warn+ and
# not re-raised, and wait_for_result_and_end_input always runs afterwards.
#
# @param stream [#each] source of messages
def stream_input(stream)
  stream.each do |message|
    break if @closed

    line = message.is_a?(Hash) ? JSON.generate(message) : message.to_s
    line = "#{line}\n" unless line.end_with?("\n")
    @transport.write(line)
  end
rescue StandardError => e
  # Best effort: report the failure but never propagate it
  warn "Error streaming input: #{e.message}"
ensure
  wait_for_result_and_end_input
end

#toggle_mcp_server(server_name, enabled) ⇒ Object

Enable or disable an MCP server

Parameters:

  • server_name (String)

    Name of the MCP server

  • enabled (Boolean)

    Whether to enable or disable



869
870
871
872
873
874
875
# File 'lib/claude_agent_sdk/query.rb', line 869

# Sends an 'mcp_toggle' control request to enable or disable an MCP server.
#
# @param server_name [String] name of the MCP server
# @param enabled [Boolean] whether to enable or disable
# @return the result of send_control_request
def toggle_mcp_server(server_name, enabled)
  payload = { subtype: 'mcp_toggle', serverName: server_name, enabled: enabled }
  send_control_request(payload)
end

#wait_for_result_and_end_inputObject

Wait for the first result before closing stdin when hooks or SDK MCP servers may still need to exchange control messages with the CLI.



899
900
901
902
903
904
905
906
907
908
909
910
# File 'lib/claude_agent_sdk/query.rb', line 899

# Ends transport input, first waiting for the first result when hooks or SDK
# MCP servers are configured and no result has been received yet.
#
# The wait is bounded by stream_close_timeout_seconds; a timeout is swallowed.
# @transport.end_input runs on every path, including the early returns below.
def wait_for_result_and_end_input
  return if @first_result_received
  return unless [@sdk_mcp_servers, @hooks].any? { |registry| registry && !registry.empty? }

  Async::Task.current.with_timeout(stream_close_timeout_seconds) do
    # Re-check the flag immediately before waiting on the condition
    @first_result_condition.wait unless @first_result_received
  end
rescue Async::TimeoutError
  nil
ensure
  @transport.end_input
end