Class: RubyLLM::Providers::OpenAIResponses::WebSocket

Inherits:
Object
Defined in:
lib/ruby_llm/providers/openai_responses/web_socket.rb

Overview

WebSocket transport for the OpenAI Responses API. Lowers latency in agentic workflows by keeping one persistent wss:// connection open instead of issuing a separate HTTP request for each turn.

Requires the `websocket-client-simple` gem (soft dependency).

Integrated usage (recommended):

chat = RubyLLM.chat(model: 'gpt-4o', provider: :openai_responses)
chat.with_params(transport: :websocket)
chat.ask("Hello!")

Standalone usage (advanced):

ws = RubyLLM::ResponsesAPI::WebSocket.new(api_key: ENV['OPENAI_API_KEY'])
ws.connect
ws.create_response(model: 'gpt-4o', input: [...]) { |chunk| ... }
ws.disconnect

Defined Under Namespace

Classes: ConcurrencyError, ConnectionError, ResponseError

Constant Summary

WEBSOCKET_PATH =
'/v1/responses'
KNOWN_PARAMS =
%i[store metadata compact_threshold context_management].freeze
RESPONSE_TIMEOUT =
:response_timeout

Constructor Details

#initialize(api_key:, api_base: 'https://api.openai.com/v1', organization_id: nil, project_id: nil, client_class: nil, response_timeout: 60) ⇒ WebSocket

Parameters:

  • api_key (String)

    OpenAI API key

  • api_base (String) (defaults to: 'https://api.openai.com/v1')

    API base URL (https scheme; converted to wss)

  • organization_id (String, nil) (defaults to: nil)

    OpenAI organization ID

  • project_id (String, nil) (defaults to: nil)

    OpenAI project ID

  • client_class (#connect, nil) (defaults to: nil)

    WebSocket client class (for testing)

  • response_timeout (Numeric) (defaults to: 60)

    seconds to wait for a response event



# File 'lib/ruby_llm/providers/openai_responses/web_socket.rb', line 38

def initialize(api_key:, api_base: 'https://api.openai.com/v1', organization_id: nil, project_id: nil,
               client_class: nil, response_timeout: 60)
  @api_key = api_key
  @api_base = api_base
  @organization_id = organization_id
  @project_id = project_id
  @client_class = client_class
  @response_timeout = response_timeout

  @ws = nil
  @mutex = Mutex.new
  @connected = false
  @in_flight = false
  @last_response_id = nil
  @message_queue = nil
end
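
The api_base parameter is documented as an https URL that is converted to wss. The private helper that does this (build_ws_url, called from #connect) is not shown on this page, so the following is only a minimal sketch of one way the conversion could work. It assumes the host of api_base is combined with WEBSOCKET_PATH; the method body and the http to ws fallback are illustrative assumptions, not the gem's code.

```ruby
require 'uri'

# Hypothetical stand-in for the private URL builder (not the gem's actual code).
# Assumption: the host (and any non-default port) of api_base is kept and the
# path is replaced with WEBSOCKET_PATH.
def build_ws_url(api_base, path = '/v1/responses')
  uri = URI.parse(api_base)
  scheme = uri.scheme == 'http' ? 'ws' : 'wss'
  # Only spell out the port when it differs from the scheme default.
  port = uri.port == uri.default_port ? '' : ":#{uri.port}"
  "#{scheme}://#{uri.host}#{port}#{path}"
end

puts build_ws_url('https://api.openai.com/v1')
puts build_ws_url('http://localhost:8080/v1')
```

The second call shows why a plain-http base (for example a local test server) would map to ws rather than wss under this assumption.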

Instance Attribute Details

#last_response_id ⇒ Object (readonly)

Returns the value of attribute last_response_id.



# File 'lib/ruby_llm/providers/openai_responses/web_socket.rb', line 29

def last_response_id
  @last_response_id
end

Instance Method Details

#call(payload) {|RubyLLM::Chunk| ... } ⇒ RubyLLM::Message

Send a pre-built payload over WebSocket, streaming chunks via block. This is the integration point for Provider#complete – it accepts the same payload hash that render_payload returns.

Parameters:

  • payload (Hash)

    Responses API payload (model, input, tools, etc.)

Yields:

  • (RubyLLM::Chunk)

    each streamed chunk

Returns:

  • (RubyLLM::Message)

    the assembled final message



# File 'lib/ruby_llm/providers/openai_responses/web_socket.rb', line 99

def call(payload, &)
  ensure_connected!
  acquire_flight!

  queue = Queue.new
  @mutex.synchronize { @message_queue = queue }

  envelope = { type: 'response.create' }.merge(payload.except(:stream))
  send_json(envelope)
  accumulate_response(queue, &)
ensure
  @mutex.synchronize { @message_queue = nil }
  release_flight!
end
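
#call brackets the request with acquire_flight! and release_flight!, whose bodies are not shown on this page. The sketch below illustrates the general single-in-flight pattern those names suggest, assuming a mutex-protected boolean and a ConcurrencyError raised on a second concurrent request. FlightGuard and with_flight are hypothetical names used only for illustration.

```ruby
class ConcurrencyError < StandardError; end

# Hypothetical guard that allows one request at a time (illustration only).
class FlightGuard
  def initialize
    @mutex = Mutex.new
    @in_flight = false
  end

  def acquire!
    @mutex.synchronize do
      raise ConcurrencyError, 'a response is already in flight' if @in_flight

      @in_flight = true
    end
  end

  def release!
    @mutex.synchronize { @in_flight = false }
  end

  def in_flight?
    @mutex.synchronize { @in_flight }
  end

  # Release only after a successful acquire, so a rejected caller
  # never clears the flag owned by the request that is still running.
  def with_flight
    acquire!
    begin
      yield
    ensure
      release!
    end
  end
end

guard = FlightGuard.new
guard.with_flight do
  begin
    guard.with_flight { puts 'never reached' }
  rescue ConcurrencyError => e
    puts "rejected: #{e.message}"
  end
  puts "still in flight: #{guard.in_flight?}"
end
```

Placing the ensure inside the acquired region is a design choice of this sketch: the cleanup then runs only for the caller that actually holds the flag.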

#connect(timeout: 10) ⇒ self

Open the WebSocket connection. Blocks until the connection is established.

Parameters:

  • timeout (Numeric) (defaults to: 10)

    seconds to wait for the connection to open

Returns:

  • (self)

Raises:

  • (ConnectionError)

    if the client reports an error before the connection opens, or the connection is not established within timeout seconds

# File 'lib/ruby_llm/providers/openai_responses/web_socket.rb', line 60

def connect(timeout: 10)
  client_class = @client_class || resolve_client_class

  ready = Queue.new
  error_holder = []
  # websocket-client-simple invokes on() blocks with instance_exec, so any
  # @ivar reference inside resolves to the underlying client, not us.
  # Capture self as a local so the handlers can call back into this object.
  owner = self

  @ws = client_class.connect(build_ws_url, headers: build_headers)

  @ws.on(:open) { ready.push(:ok) }

  @ws.on(:error) do |e|
    error_holder << e
    ready.push(:error) unless owner.connected?
  end

  @ws.on(:close) { owner.__send__(:handle_close) }
  @ws.on(:message) { |msg| owner.__send__(:handle_message, msg.data) }

  result = pop_with_timeout(ready, timeout)
  if result == :error || result.nil?
    err = error_holder.first
    raise ConnectionError, "WebSocket connection failed: #{err&.message || 'timeout'}"
  end

  @mutex.synchronize { @connected = true }
  self
end
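
The comment in #connect explains that handler blocks run under instance_exec, so instance variables inside them resolve against the client rather than the transport object. The toy classes below reproduce that effect without any gem; ToyClient and Owner are hypothetical stand-ins, not part of websocket-client-simple or RubyLLM.

```ruby
# Toy emitter that dispatches handlers with instance_exec,
# mimicking the behaviour described in the comment above.
class ToyClient
  def initialize
    @handlers = {}
    @name = 'client'
  end

  def on(event, &block)
    @handlers[event] = block
  end

  def emit(event, *args)
    instance_exec(*args, &@handlers.fetch(event))
  end
end

class Owner
  attr_reader :seen

  def initialize(client)
    @name = 'owner'
    @seen = []
    owner = self # captured as a local, which survives instance_exec

    # Inside the block, self is the ToyClient, so @name is the client's ivar.
    client.on(:ivar) { @name }
    # The local variable still points at this Owner instance.
    client.on(:message) { |msg| owner.__send__(:handle_message, msg) }
  end

  private

  def handle_message(msg)
    @seen << msg
  end
end

client = ToyClient.new
owner = Owner.new(client)
puts client.emit(:ivar)      # prints "client", not "owner"
client.emit(:message, 'hello')
p owner.seen                 # prints ["hello"]
```

Using __send__ lets the handler reach a private method, which keeps handle_message out of the public interface.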

#connected? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/ruby_llm/providers/openai_responses/web_socket.rb', line 178

def connected?
  @mutex.synchronize { @connected }
end

#create_response(model:, input:, tools: nil, previous_response_id: nil, instructions: nil, **extra) {|RubyLLM::Chunk| ... } ⇒ RubyLLM::Message

Send a response.create request using raw Responses API format. Useful for standalone usage outside the RubyLLM chat interface.

Parameters:

  • model (String)

    model ID

  • input (Array<Hash>)

    input items in Responses API format

  • tools (Array<Hash>, nil) (defaults to: nil)

    tool definitions

  • previous_response_id (String, nil) (defaults to: nil)

    chain to a prior response

  • instructions (String, nil) (defaults to: nil)

    system/developer instructions

  • extra (Hash)

    additional fields forwarded to the API

Yields:

  • (RubyLLM::Chunk)

    each streamed chunk

Returns:

  • (RubyLLM::Message)

    the assembled final message



# File 'lib/ruby_llm/providers/openai_responses/web_socket.rb', line 125

def create_response(model:, input:, tools: nil, previous_response_id: nil, instructions: nil, **extra, &block) # rubocop:disable Metrics/ParameterLists
  payload = build_standalone_payload(
    model: model, input: input, tools: tools,
    previous_response_id: previous_response_id,
    instructions: instructions, **extra
  )

  call(payload, &block)
end
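
#create_response delegates to build_standalone_payload, which is not shown on this page. As a rough sketch, assuming the helper simply drops nil optional fields and merges the extra keywords, the payload and the envelope that #call then builds from it could look like this. The helper body here is an assumption for illustration only.

```ruby
# Hypothetical version of the private payload builder (illustration only).
# Assumption: nil optional fields are omitted and extra keys are passed through.
def build_standalone_payload(model:, input:, tools: nil, previous_response_id: nil,
                             instructions: nil, **extra)
  {
    model: model,
    input: input,
    tools: tools,
    previous_response_id: previous_response_id,
    instructions: instructions
  }.compact.merge(extra)
end

payload = build_standalone_payload(
  model: 'gpt-4o',
  input: [{ role: 'user', content: 'Hello!' }],
  store: false,
  stream: true
)

# Same envelope step as #call: add the event type and drop :stream.
envelope = { type: 'response.create' }.merge(payload.except(:stream))
p envelope
```

Hash#except requires Ruby 3.0 or later, which matches the syntax already used in the source listings on this page.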

#disconnect ⇒ void

This method returns an undefined value.

Disconnect the WebSocket.



# File 'lib/ruby_llm/providers/openai_responses/web_socket.rb', line 172

def disconnect
  @ws&.close
  @mutex.synchronize { @connected = false }
end

#reconnect(timeout: 10) ⇒ self

Close and reopen the connection. The timeout (seconds, default 10) is passed through to #connect.

Returns:

  • (self)


# File 'lib/ruby_llm/providers/openai_responses/web_socket.rb', line 184

def reconnect(timeout: 10)
  disconnect
  connect(timeout: timeout)
end

#warmup(model:, **extra) ⇒ void

This method returns an undefined value.

Warm up the connection by sending a response.create with generate: false. Caches model weights server-side without generating output.

Parameters:

  • model (String)

    model ID

  • extra (Hash)

    additional fields



# File 'lib/ruby_llm/providers/openai_responses/web_socket.rb', line 140

def warmup(model:, **extra)
  ensure_connected!
  acquire_flight!

  queue = Queue.new
  @mutex.synchronize { @message_queue = queue }

  payload = { type: 'response.create', model: model, generate: false }.merge(extra)

  send_json(payload)

  loop do
    data = queue.pop
    break if data.nil?

    parsed = JSON.parse(data)
    event_type = parsed['type']

    if event_type == 'error'
      error_msg = parsed.dig('error', 'message') || 'Warmup error'
      raise ResponseError, error_msg
    end

    break if event_type == 'response.completed'
  end
ensure
  @mutex.synchronize { @message_queue = nil }
  release_flight!
end
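
The loop in #warmup pops raw JSON events until it sees response.completed, an error event, or a nil sentinel. The standalone sketch below replays that event handling against a plain Queue and, as an added assumption, bounds each wait with a timeout. wait_for_completion and its return values are hypothetical and not part of the class.

```ruby
require 'json'
require 'timeout'

class ResponseError < StandardError; end

# Hypothetical helper mirroring the warmup event loop, with a bounded wait.
def wait_for_completion(queue, timeout: 60)
  loop do
    data = begin
      Timeout.timeout(timeout) { queue.pop }
    rescue Timeout::Error
      raise ResponseError, "no event received within #{timeout}s"
    end
    return :closed if data.nil? # nil is treated as a "connection closed" sentinel

    event = JSON.parse(data)
    case event['type']
    when 'error'
      raise ResponseError, event.dig('error', 'message') || 'Warmup error'
    when 'response.completed'
      return :completed
    end
    # Any other event type is ignored and the loop keeps waiting.
  end
end

events = Queue.new
events << JSON.generate(type: 'response.created')
events << JSON.generate(type: 'response.completed')
p wait_for_completion(events, timeout: 1)
```

Timeout.timeout is used here because it works on older Ruby versions; on Ruby 3.2 and later, Queue#pop also accepts a timeout keyword.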