Class: DatagroutConduit::Transport::Ws

Inherits:
Object
  • Object
show all
Defined in:
lib/datagrout_conduit/transport/ws.rb

Overview

WebSocket transport for datagrout-jsonrpc.v1.

Manages a single wss:// connection with concurrent JSON-RPC request multiplexing and server-push subscriptions. Uses a background thread for frame reading; callers block on Thread::Queue for responses.

Usage:

ws = DatagroutConduit::Transport::Ws.new(
  url: "wss://gateway.datagrout.ai/servers/<uuid>/ws",
  auth: { bearer: "token" }
)
ws.connect
result = ws.send_request("tools/list")
sub    = ws.subscribe("agents.my-agent.events")
event  = sub.recv
ws.unsubscribe(sub)
ws.disconnect

Defined Under Namespace

Classes: RequestFuture, SocketAdapter, Subscription, SubscriptionEvent

Constant Summary collapse

SUBPROTOCOL =
"datagrout-jsonrpc.v1"

Instance Method Summary collapse

Constructor Details

#initialize(url:, auth: {}, identity: nil) ⇒ Ws

── Construction ─────────────────────────────────────────────────────────



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/datagrout_conduit/transport/ws.rb', line 101

# Build an unconnected transport. No I/O happens here; call #connect to
# open the socket.
#
# @param url [String] WebSocket endpoint URL
# @param auth [Hash] credentials, passed through normalize_auth
# @param identity [Object, nil] stored as-is in @identity
def initialize(url:, auth: {}, identity: nil)
  @url = url
  @auth = normalize_auth(auth)
  @identity = identity

  # @mutex guards the bookkeeping tables and the connected flag.
  # @write_mutex presumably serializes frame writes — confirm in write_frame.
  @mutex, @write_mutex = Mutex.new, Mutex.new

  # Bookkeeping tables:
  #   @pending           id     => RequestFuture
  #   @pending_subscribe id     => { topic:, future: }
  #   @subscriptions     sub_id => [Subscription, ...]
  @pending = {}
  @pending_subscribe = {}
  @subscriptions = {}
  @next_id = 0

  # Connection state; populated by #connect.
  @io = @driver = @read_thread = nil
  @connected = false
end

Instance Method Details

#connect ⇒ Object

Establish the WebSocket connection. Blocks until the server-side handshake completes (up to 10 s).



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/datagrout_conduit/transport/ws.rb', line 123

def connect
  uri = URI.parse(@url)
  @io = open_socket(uri)

  adapter = SocketAdapter.new(@url, @io)
  @driver = WebSocket::Driver.client(adapter, protocols: [SUBPROTOCOL])

  build_upgrade_headers.each { |k, v| @driver.set_header(k, v) }

  handshake_q = ::Thread::Queue.new

  @driver.on(:open)    { handshake_q.push(nil) unless @connected }
  @driver.on(:message) { |e| handle_message(e.data) }
  @driver.on(:close)   { handle_disconnect }
  @driver.on(:error)   { |e| handshake_q.push(e.message) unless @connected }

  @driver.start
  @read_thread = Thread.new { read_loop }
  @read_thread.abort_on_exception = false
  @read_thread.name = "conduit-ws-reader"

  err = Timeout.timeout(10) { handshake_q.pop }
  raise ConnectionError, "WebSocket handshake failed: #{err}" if err

  @connected = true
  self
rescue Timeout::Error
  cleanup_socket
  raise ConnectionError, "WebSocket connection timed out"
rescue ConnectionError
  raise
rescue => e
  cleanup_socket
  raise ConnectionError, "WebSocket connect error: #{e.message}"
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


167
168
169
# File 'lib/datagrout_conduit/transport/ws.rb', line 167

# Whether the transport currently considers itself connected.
#
# The flag is set to true at the end of a successful #connect and reset to
# false by #disconnect.
#
# @return [Boolean] the current value of the @connected flag
def connected?
  @connected
end

#disconnect ⇒ Object

Close the connection and fail all pending requests.



160
161
162
163
164
165
# File 'lib/datagrout_conduit/transport/ws.rb', line 160

# Close the connection and fail all pending requests.
#
# Marks the transport as disconnected, fails every pending request with
# :disconnected, and releases the socket.
#
# @return [self]
def disconnect
  @mutex.synchronize { @connected = false }
  begin
    fail_all_pending(:disconnected)
  ensure
    # Release the socket even if failing the pending requests raises;
    # otherwise the connection would stay open after a failed disconnect.
    cleanup_socket
  end
  self
end

#send_request(method, params = nil, id: :auto) ⇒ Object

Send a JSON-RPC request and block until the response arrives. Pass id: nil to fire a notification (no response expected). Returns a Hash of the form { "result" => value } (with an empty result for notifications), or raises McpError on RPC-level error.



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/datagrout_conduit/transport/ws.rb', line 174

# Send a JSON-RPC request and block until the response arrives.
#
# @param method [String] JSON-RPC method name
# @param params [Object, nil] request parameters; omitted from a
#   notification frame when nil or false
# @param id [Object, nil] pass nil to send a fire-and-forget notification.
#   Any other value makes this a regular request whose id comes from
#   mint_id; the passed value itself is not used.
# @return [Hash] { "result" => value } for a request, or
#   { "result" => {} } for a notification
# @raise [McpError] with code -1 when the future does not resolve to :ok
def send_request(method, params = nil, id: :auto)
  ensure_connected!

  # id: nil means fire-and-forget notification (no id field, no response wait)
  if id.nil?
    frame = { "jsonrpc" => "2.0", "method" => method }
    frame["params"] = params if params
    write_frame(frame)
    return { "result" => {} }
  end

  req_id = mint_id
  future = RequestFuture.new
  # Register before writing so the response can always find its future.
  @mutex.synchronize { @pending[req_id] = future }

  begin
    write_frame(build_request(req_id, method, params))
  rescue StandardError
    # The request never left; drop its future so it does not linger in
    # @pending waiting for a response that cannot arrive.
    @mutex.synchronize { @pending.delete(req_id) }
    raise
  end

  status, value = future.wait
  raise McpError.new(code: -1, message: value.to_s, data: nil) unless status == :ok

  { "result" => value }
end

#subscribe(topic) ⇒ Object

Subscribe to a dotted-namespace topic. Returns a Subscription that delivers events via recv / each.



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/datagrout_conduit/transport/ws.rb', line 201

# Subscribe to a topic and block until the subscribe request resolves.
#
# @param topic [String] topic name sent to the server as "topic"
# @return [Subscription] a subscription registered under the id from the
#   response's "subscription" key, or under the request id when the
#   response carries none
# @raise [McpError] with code -1 when the future does not resolve to :ok
def subscribe(topic)
  ensure_connected!

  req_id = mint_id
  future = RequestFuture.new
  # Register before writing so the response can always find its future.
  @mutex.synchronize { @pending_subscribe[req_id] = { topic: topic, future: future } }

  begin
    write_frame(build_request(req_id, "subscribe", { "topic" => topic }))
  rescue StandardError
    # The request never left; drop the entry so it does not linger in
    # @pending_subscribe waiting for a response that cannot arrive.
    @mutex.synchronize { @pending_subscribe.delete(req_id) }
    raise
  end

  status, value = future.wait
  raise McpError.new(code: -1, message: value.to_s, data: nil) unless status == :ok

  # Fall back to the request id when the response names no subscription.
  sub_id = value.is_a?(Hash) ? (value["subscription"] || req_id) : req_id
  sub = Subscription.new(sub_id, topic)
  @mutex.synchronize { (@subscriptions[sub_id] ||= []) << sub }
  sub
end

#unsubscribe(subscription) ⇒ Object

Cancel a push subscription locally and notify the server. Accepts a Subscription object or a subscription ID string.



226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/datagrout_conduit/transport/ws.rb', line 226

# Drop a push subscription locally and, while connected, tell the server.
#
# @param subscription [Subscription, #to_s] a Subscription, or anything
#   whose #to_s yields the subscription id
# @return [Symbol] always :ok, including when the id was not registered
def unsubscribe(subscription)
  sub_id =
    if subscription.is_a?(Subscription)
      subscription.sub_id
    else
      subscription.to_s
    end

  # Detach every local handle for this id under the lock, close them after.
  removed = @mutex.synchronize { @subscriptions.delete(sub_id) }
  (removed || []).each { |handle| handle._close }

  # No pending entry is registered for this request, so the method does not
  # wait for the server's reply.
  write_frame(build_request(mint_id, "unsubscribe", { "subscription" => sub_id })) if @connected

  :ok
end