Class: ClaudeAgentSDK::Query

Inherits:
Object
  • Object
show all
Defined in:
lib/claude_agent_sdk/query.rb

Overview

Handles the bidirectional control protocol on top of a Transport.

This class manages:

  • Control request/response routing

  • Hook callbacks

  • Tool permission callbacks

  • Message streaming

  • Initialization handshake

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil, sdk_mcp_servers: nil) ⇒ Query

Returns a new instance of Query.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/claude_agent_sdk/query.rb', line 22

# Builds a Query around a transport. Nothing is sent or read here; the
# constructor only records its arguments and resets internal state.
#
# @param transport [Object] object this class later calls #write, #end_input
#   and #close on
# @param is_streaming_mode [Boolean] when falsy, #initialize_protocol returns
#   nil without sending anything
# @param can_use_tool [Object, nil] stored as-is; presumably the tool
#   permission callback (its use is not visible in this section — confirm)
# @param hooks [Hash, nil] event => list of matcher hashes read through
#   +:matcher+ and +:hooks+ by #initialize_protocol; nil becomes {}
# @param sdk_mcp_servers [Hash, nil] stored as-is; nil becomes {}
def initialize(transport:, is_streaming_mode:, can_use_tool: nil, hooks: nil, sdk_mcp_servers: nil)
  # Collaborators and caller-supplied configuration
  @transport = transport
  @is_streaming_mode = is_streaming_mode
  @can_use_tool = can_use_tool
  @hooks = hooks || {}
  @sdk_mcp_servers = sdk_mcp_servers || {}

  # Lifecycle: no reader task yet, not initialized, not closed
  @task = @initialization_result = nil
  @initialized = @closed = false

  # Control protocol bookkeeping
  @next_callback_id = @request_counter = 0
  @hook_callbacks = {}
  @pending_control_responses = {}
  @pending_control_results = {}

  # Queue that #receive_messages dequeues from
  @message_queue = Async::Queue.new
end

Instance Attribute Details

#is_streaming_mode ⇒ Object (readonly)

Returns the value of attribute is_streaming_mode.



20
21
22
# File 'lib/claude_agent_sdk/query.rb', line 20

# Reader for the streaming-mode flag.
#
# @return [Object] the value passed as +is_streaming_mode:+ to the constructor
def is_streaming_mode
  @is_streaming_mode
end

#sdk_mcp_servers ⇒ Object (readonly)

Returns the value of attribute sdk_mcp_servers.



20
21
22
# File 'lib/claude_agent_sdk/query.rb', line 20

# Reader for the SDK MCP servers collection.
#
# @return [Object] the value passed as +sdk_mcp_servers:+ to the constructor,
#   or an empty Hash when that argument was nil
def sdk_mcp_servers
  @sdk_mcp_servers
end

#transport ⇒ Object (readonly)

Returns the value of attribute transport.



20
21
22
# File 'lib/claude_agent_sdk/query.rb', line 20

# Reader for the underlying transport.
#
# @return [Object] the object passed as +transport:+ to the constructor
def transport
  @transport
end

Instance Method Details

#close ⇒ Object

Close the query and transport



501
502
503
504
505
# File 'lib/claude_agent_sdk/query.rb', line 501

# Close the query and its transport.
#
# Marks the query as closed (which makes #stream_input stop writing), stops
# the reader task if one was started, then closes the transport.
#
# The transport is closed even when stopping the task raises, so a failure
# while stopping cannot leave the transport open. The error from stopping
# the task still propagates to the caller afterwards.
# NOTE(review): one way stop can raise is being invoked from inside the task
# being stopped — confirm against the async gem's Task#stop semantics.
#
# @return [Object] whatever the transport's #close returns
def close
  @closed = true
  task_stopped = false
  @task&.stop
  task_stopped = true
  @transport.close
ensure
  # Only reached with task_stopped still falsy when something above raised
  # before the transport was closed; close it here so it is not leaked.
  @transport.close unless task_stopped
end

#initialize_protocol ⇒ Hash?

Initialize control protocol if in streaming mode

Returns:

  • (Hash, nil)

    Initialize response with supported commands, or nil if not streaming



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/claude_agent_sdk/query.rb', line 46

# Perform the control-protocol initialization handshake.
#
# Does nothing and returns nil unless the query is in streaming mode.
# Otherwise every hook callback configured in @hooks is registered in
# @hook_callbacks under a generated id ("hook_0", "hook_1", ...), and an
# 'initialize' control request is sent that lists, per event, each matcher
# together with the ids of its callbacks. Events whose matcher list is nil or
# empty are left out; if no event remains, +hooks+ is sent as nil.
#
# @return [Hash, nil] the response returned by send_control_request (also
#   stored in @initialization_result), or nil when not in streaming mode
def initialize_protocol
  return nil unless @is_streaming_mode

  hooks_config = (@hooks || {}).each_with_object({}) do |(event, matchers), config|
    next if matchers.nil? || matchers.empty?

    config[event] = matchers.map do |matcher|
      # Register each callback under a fresh id and keep the ids in order.
      ids = (matcher[:hooks] || []).map do |callback|
        id = "hook_#{@next_callback_id}"
        @next_callback_id += 1
        @hook_callbacks[id] = callback
        id
      end

      { matcher: matcher[:matcher], hookCallbackIds: ids }
    end
  end

  hooks_payload = hooks_config.empty? ? nil : hooks_config
  response = send_control_request({ subtype: 'initialize', hooks: hooks_payload })

  @initialized = true
  @initialization_result = response
end

#interrupt ⇒ Object

Send interrupt control request



455
456
457
# File 'lib/claude_agent_sdk/query.rb', line 455

# Send an 'interrupt' control request.
#
# @return [Object] whatever send_control_request returns
def interrupt
  request = { subtype: 'interrupt' }
  send_control_request(request)
end

#receive_messages(&block) ⇒ Object

Receive SDK messages (not control messages)



488
489
490
491
492
493
494
495
496
497
498
# File 'lib/claude_agent_sdk/query.rb', line 488

# Yield SDK messages (not control messages) taken from the message queue.
#
# Dequeues messages one at a time. A message whose +:type+ is 'end' stops the
# iteration; one whose +:type+ is 'error' raises the object stored under
# +:error+. Every other message is passed to the block.
#
# @yieldparam message [Hash] the dequeued message
# @return [Enumerator, nil] an Enumerator when called without a block
def receive_messages(&block)
  return enum_for(:receive_messages) unless block

  loop do
    message = @message_queue.dequeue

    case message[:type]
    when 'end' then break
    when 'error' then raise message[:error]
    else block.call(message)
    end
  end
end

#set_model(model) ⇒ Object

Change the AI model



468
469
470
471
472
473
# File 'lib/claude_agent_sdk/query.rb', line 468

# Send a 'set_model' control request to change the AI model.
#
# @param model [Object] value sent under the +:model+ key
# @return [Object] whatever send_control_request returns
def set_model(model)
  request = { subtype: 'set_model', model: model }
  send_control_request(request)
end

#set_permission_mode(mode) ⇒ Object

Change permission mode



460
461
462
463
464
465
# File 'lib/claude_agent_sdk/query.rb', line 460

# Send a 'set_permission_mode' control request to change the permission mode.
#
# @param mode [Object] value sent under the +:mode+ key
# @return [Object] whatever send_control_request returns
def set_permission_mode(mode)
  request = { subtype: 'set_permission_mode', mode: mode }
  send_control_request(request)
end

#start ⇒ Object

Start reading messages from transport



85
86
87
88
89
90
91
# File 'lib/claude_agent_sdk/query.rb', line 85

# Start reading messages from the transport.
#
# Opens an Async block, spawns a child task inside it that runs
# read_messages, and remembers the outer task in @task. Calling it again
# while @task is set does nothing.
#
# @return [Object, nil] the value returned by Async, or nil if already started
def start
  return if @task

  @task = Async do |parent|
    parent.async do
      read_messages
    end
  end
end

#stream_input(stream) ⇒ Object

Stream input messages to transport



476
477
478
479
480
481
482
483
484
485
# File 'lib/claude_agent_sdk/query.rb', line 476

# Stream input messages to the transport.
#
# Each element of +stream+ is serialized with JSON.generate and written as
# one newline-terminated line. Writing stops early once the query has been
# closed. After the loop the transport is told that input has ended.
#
# Errors are deliberately not raised: any StandardError is reported with
# +warn+ and swallowed, in which case end_input may not have been called.
#
# @param stream [#each] source of JSON-serializable messages
# @return [Object, nil] the result of the transport's end_input, or nil when
#   an error was rescued
def stream_input(stream)
  stream.each do |payload|
    break if @closed

    line = "#{JSON.generate(payload)}\n"
    @transport.write(line)
  end

  @transport.end_input
rescue StandardError => error
  # Log error but don't raise
  warn "Error streaming input: #{error.message}"
end