Class: ClaudeAgentSDK::Query

Inherits:
Object
  • Object
show all
Defined in:
lib/claude_agent_sdk/query.rb

Overview

Handles bidirectional control protocol on top of Transport

This class manages:

  • Control request/response routing

  • Hook callbacks

  • Tool permission callbacks

  • Message streaming

  • Initialization handshake

Constant Summary collapse

CONTROL_REQUEST_TIMEOUT_ENV_VAR =
'CLAUDE_AGENT_SDK_CONTROL_REQUEST_TIMEOUT_SECONDS'
DEFAULT_CONTROL_REQUEST_TIMEOUT_SECONDS =
1200.0
STREAM_CLOSE_TIMEOUT_ENV_VAR =

NOTE: CLAUDE_CODE_STREAM_CLOSE_TIMEOUT is defined by the CLI in MILLISECONDS (Python SDK uses ‘int(os.environ)/1000`); the SDK divides by 1000 to obtain seconds. The default below is seconds for direct use without env conversion (60 s = the CLI’s 60000 ms).

'CLAUDE_CODE_STREAM_CLOSE_TIMEOUT'
DEFAULT_STREAM_CLOSE_TIMEOUT_SECONDS =
60.0

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil, sdk_mcp_servers: nil, agents: nil, exclude_dynamic_sections: nil) ⇒ Query

Returns a new instance of Query.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/claude_agent_sdk/query.rb', line 32

def initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil, sdk_mcp_servers: nil, agents: nil,
               exclude_dynamic_sections: nil)
  @transport = transport
  @is_streaming_mode = is_streaming_mode
  @can_use_tool = can_use_tool
  @hooks = hooks || {}
  @sdk_mcp_servers = sdk_mcp_servers || {}
  @agents = agents
  @exclude_dynamic_sections = exclude_dynamic_sections

  # Control protocol state
  @pending_control_responses = {}
  @pending_control_results = {}
  @hook_callbacks = {}
  @hook_callback_timeouts = {}
  @next_callback_id = 0
  @request_counter = 0
  @inflight_control_request_tasks = {}

  # Message stream
  @message_queue = Async::Queue.new
  @first_result_received = false
  @first_result_condition = Async::Condition.new
  @task = nil
  @initialized = false
  @closed = false
  @initialization_result = nil
  @transcript_mirror_batcher = nil
end

Instance Attribute Details

#is_streaming_modeObject (readonly)

Returns the value of attribute is_streaming_mode.



21
22
23
# File 'lib/claude_agent_sdk/query.rb', line 21

def is_streaming_mode
  @is_streaming_mode
end

#sdk_mcp_serversObject (readonly)

Returns the value of attribute sdk_mcp_servers.



21
22
23
# File 'lib/claude_agent_sdk/query.rb', line 21

def sdk_mcp_servers
  @sdk_mcp_servers
end

#transportObject (readonly)

Returns the value of attribute transport.



21
22
23
# File 'lib/claude_agent_sdk/query.rb', line 21

def transport
  @transport
end

Instance Method Details

#closeObject

Close the query and transport



1028
1029
1030
1031
1032
1033
1034
1035
# File 'lib/claude_agent_sdk/query.rb', line 1028

def close
  @closed = true
  # Final mirror flush BEFORE stopping the read task, so the last turn's
  # entries reach the store. #close on the batcher never raises.
  @transcript_mirror_batcher&.close
  @task&.stop
  @transport.close
end

#get_context_usageHash

Get a breakdown of current context window usage by category.

Returns:

  • (Hash)

    Context usage response with categories, totalTokens, maxTokens, etc.



906
907
908
# File 'lib/claude_agent_sdk/query.rb', line 906

def get_context_usage
  send_control_request({ subtype: 'get_context_usage' })
end

#get_mcp_statusHash

Get current MCP server connection status (only works with streaming mode)

Returns:

  • (Hash)

    MCP status information, including mcpServers list



912
913
914
# File 'lib/claude_agent_sdk/query.rb', line 912

def get_mcp_status
  send_control_request({ subtype: 'mcp_status' })
end

#initialize_protocolHash?

Initialize control protocol if in streaming mode

Returns:

  • (Hash, nil)

    Initialize response with supported commands, or nil if not streaming



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/claude_agent_sdk/query.rb', line 64

def initialize_protocol
  return nil unless @is_streaming_mode

  # Build hooks configuration for initialization
  hooks_config = {}
  if @hooks && !@hooks.empty?
    @hooks.each do |event, matchers|
      next if matchers.nil? || matchers.empty?

      hooks_config[event] = []
      matchers.each do |matcher|
        callback_ids = []
        (matcher[:hooks] || []).each do |callback|
          callback_id = "hook_#{@next_callback_id}"
          @next_callback_id += 1
          @hook_callbacks[callback_id] = callback
          @hook_callback_timeouts[callback_id] = matcher[:timeout] if matcher[:timeout]
          callback_ids << callback_id
        end
        hooks_config[event] << {
          matcher: matcher[:matcher],
          hookCallbackIds: callback_ids
        }
      end
    end
  end

  # Build agents dict for initialization
  agents_dict = nil
  if @agents
    agents_dict = @agents.transform_values do |agent_def|
      {
        description: agent_def.description,
        prompt: agent_def.prompt,
        tools: agent_def.tools,
        disallowedTools: agent_def.disallowed_tools,
        model: agent_def.model,
        skills: agent_def.skills,
        memory: agent_def.memory,
        mcpServers: agent_def.mcp_servers,
        initialPrompt: agent_def.initial_prompt,
        maxTurns: agent_def.max_turns,
        background: agent_def.background,
        effort: agent_def.effort,
        permissionMode: agent_def.permission_mode
      }.compact
    end
  end

  # Send initialize request
  request = {
    subtype: 'initialize',
    hooks: hooks_config.empty? ? nil : hooks_config,
    agents: agents_dict
  }
  request[:excludeDynamicSections] = @exclude_dynamic_sections unless @exclude_dynamic_sections.nil?

  response = send_control_request(request)
  @initialized = true
  @initialization_result = response
  response
end

#interruptObject

Send interrupt control request



917
918
919
# File 'lib/claude_agent_sdk/query.rb', line 917

def interrupt
  send_control_request({ subtype: 'interrupt' })
end

#receive_messages(&block) ⇒ Object

Receive SDK messages (not control messages)



1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
# File 'lib/claude_agent_sdk/query.rb', line 1015

def receive_messages(&block)
  return enum_for(:receive_messages) unless block

  loop do
    message = @message_queue.dequeue
    break if message[:type] == 'end'
    raise message[:error] if message[:type] == 'error'

    block.call(message)
  end
end

#reconnect_mcp_server(server_name) ⇒ Object

Reconnect a failed MCP server

Parameters:

  • server_name (String)

    Name of the MCP server to reconnect



939
940
941
942
943
944
# File 'lib/claude_agent_sdk/query.rb', line 939

def reconnect_mcp_server(server_name)
  send_control_request({
                         subtype: 'mcp_reconnect',
                         serverName: server_name
                       })
end

#report_mirror_error(key, error) ⇒ Object

Synthesize a ‘mirror_error` system message and put it on the SDK message stream so consumers learn a mirror batch was dropped after exhausting retries. Non-blocking: the message queue is unbounded, so unlike the Python SDK there is no buffer-full drop path.



164
165
166
167
168
169
170
171
172
173
174
# File 'lib/claude_agent_sdk/query.rb', line 164

def report_mirror_error(key, error)
  session_id = key && (key['session_id'] || key[:session_id])
  @message_queue.enqueue(
    type: 'system',
    subtype: 'mirror_error',
    error: error,
    key: key,
    uuid: SecureRandom.uuid,
    session_id: session_id || ''
  )
end

#rewind_files(user_message_uuid) ⇒ Object

Rewind files to a previous checkpoint (v0.1.15+) Restores file state to what it was at the given user message Requires enable_file_checkpointing to be true in options

Parameters:

  • user_message_uuid (String)

    The UUID of the UserMessage to rewind to



970
971
972
973
974
975
# File 'lib/claude_agent_sdk/query.rb', line 970

def rewind_files(user_message_uuid)
  send_control_request({
                         subtype: 'rewind_files',
                         user_message_id: user_message_uuid
                       })
end

#set_model(model) ⇒ Object

Change the AI model



930
931
932
933
934
935
# File 'lib/claude_agent_sdk/query.rb', line 930

def set_model(model)
  send_control_request({
                         subtype: 'set_model',
                         model: model
                       })
end

#set_permission_mode(mode) ⇒ Object

Change permission mode



922
923
924
925
926
927
# File 'lib/claude_agent_sdk/query.rb', line 922

def set_permission_mode(mode)
  send_control_request({
                         subtype: 'set_permission_mode',
                         mode: mode
                       })
end

#set_transcript_mirror_batcher(batcher) ⇒ Object

Install the transcript-mirror batcher fed by ‘transcript_mirror` frames (Client mode with a session_store). nil disables mirroring.



156
157
158
# File 'lib/claude_agent_sdk/query.rb', line 156

def set_transcript_mirror_batcher(batcher)
  @transcript_mirror_batcher = batcher
end

#startObject

Start reading messages from transport.

Spawns ‘read_messages` as a direct child task of the current Async task and stores that child in `@task`. An earlier version wrapped `task.async { read_messages }` inside an outer `Async do … end` and assigned the outer task to `@task`; the outer task completed almost immediately after spawning, so `close`’s ‘@task.stop` never reached the actual `read_messages` fiber and the read loop kept running until the transport raised. Now `@task.stop` stops the read loop.

Must be called inside an Async{} block (matches ‘query()` which wraps its own internals in Async, and the documented `Client#connect` pattern). If invoked outside a reactor, raise a clear error rather than letting Async::Task.current raise an opaque “No async task available!” — earlier versions of this method appeared to work from synchronous callers but actually hung indefinitely because the outer Async{} root task waited for read_messages to finish, which never happens for a live Client.

Raises:



145
146
147
148
149
150
151
152
# File 'lib/claude_agent_sdk/query.rb', line 145

def start
  return if @task

  parent = Async::Task.current?
  raise CLIConnectionError, 'Query#start must be called inside an Async{} block (e.g. wrap Client#connect in Async{...})' unless parent

  @task = parent.async { read_messages }
end

#stop_task(task_id) ⇒ Object

Stop a running background task

Parameters:

  • task_id (String)

    The ID of the task to stop



959
960
961
962
963
964
# File 'lib/claude_agent_sdk/query.rb', line 959

def stop_task(task_id)
  send_control_request({
                         subtype: 'stop_task',
                         task_id: task_id
                       })
end

#stream_input(stream) ⇒ Object

Stream input messages to transport



993
994
995
996
997
998
999
1000
1001
1002
1003
1004
# File 'lib/claude_agent_sdk/query.rb', line 993

def stream_input(stream)
  stream.each do |message|
    break if @closed
    serialized = message.is_a?(Hash) ? JSON.generate(message) : message.to_s
    writeln(serialized)
  end
rescue StandardError => e
  # Log error but don't raise
  warn "Error streaming input: #{e.message}"
ensure
  wait_for_result_and_end_input
end

#toggle_mcp_server(server_name, enabled) ⇒ Object

Enable or disable an MCP server

Parameters:

  • server_name (String)

    Name of the MCP server

  • enabled (Boolean)

    Whether to enable or disable



949
950
951
952
953
954
955
# File 'lib/claude_agent_sdk/query.rb', line 949

def toggle_mcp_server(server_name, enabled)
  send_control_request({
                         subtype: 'mcp_toggle',
                         serverName: server_name,
                         enabled: enabled
                       })
end

#wait_for_result_and_end_inputObject

Wait for the first result before closing stdin when hooks or SDK MCP servers may still need to exchange control messages with the CLI.



979
980
981
982
983
984
985
986
987
988
989
990
# File 'lib/claude_agent_sdk/query.rb', line 979

def wait_for_result_and_end_input
  if !@first_result_received &&
     ((@sdk_mcp_servers && !@sdk_mcp_servers.empty?) || (@hooks && !@hooks.empty?))
    Async::Task.current.with_timeout(stream_close_timeout_seconds) do
      @first_result_condition.wait unless @first_result_received
    end
  end
rescue Async::TimeoutError
  nil
ensure
  @transport.end_input
end

#write(string) ⇒ Object



1010
1011
1012
# File 'lib/claude_agent_sdk/query.rb', line 1010

def write(string)
  @transport.write(string)
end

#writeln(string) ⇒ Object



1006
1007
1008
# File 'lib/claude_agent_sdk/query.rb', line 1006

def writeln(string)
  write string.end_with?("\n") ? string : "#{string}\n"
end