Class: DatagroutConduit::Transport::Ws

Inherits:
Object
  • Object
show all
Defined in:
lib/datagrout_conduit/transport/ws.rb

Overview

WebSocket transport for datagrout-jsonrpc.v1.

Manages a single wss:// connection with concurrent JSON-RPC request multiplexing and server-push subscriptions. Uses a background thread for frame reading; callers block on Thread::Queue for responses.

Usage:

ws = DatagroutConduit::Transport::Ws.new(
  url: "wss://gateway.datagrout.ai/servers/<uuid>/ws",
  auth: { bearer: "token" }
)
ws.connect
result = ws.send_request("tools/list")
sub    = ws.subscribe("agents.my-agent.events")
event  = sub.recv
ws.unsubscribe(sub)
ws.disconnect

Defined Under Namespace

Classes: RequestFuture, SocketAdapter, Subscription, SubscriptionEvent

Constant Summary collapse

SUBPROTOCOL =
"datagrout-jsonrpc.v1"
PING_INTERVAL_SECONDS =

Seconds between client-initiated WS ping frames.

Many load balancers and reverse proxies (nginx, AWS ALB) close idle WS connections after 60-120 seconds; pinging every 25 seconds keeps the connection alive well within the tightest common timeout window. Mirrors PING_INTERVAL in the Rust reference implementation.

25

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url:, auth: {}, identity: nil, ping_interval: PING_INTERVAL_SECONDS) ⇒ Ws

── Construction ─────────────────────────────────────────────────────────



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/datagrout_conduit/transport/ws.rb', line 109

# Build an unconnected transport; no socket is opened until #connect.
#
# @param url [String] WebSocket endpoint, parsed with URI.parse in #connect
# @param auth [Hash] credentials, passed through normalize_auth before storage
# @param identity [Object, nil] stored as-is in @identity
# @param ping_interval [Numeric] seconds between pings; coerced with #to_f
def initialize(url:, auth: {}, identity: nil, ping_interval: PING_INTERVAL_SECONDS)
  # Endpoint and credentials.
  @url      = url
  @auth     = normalize_auth(auth)
  @identity = identity

  # @mutex guards the bookkeeping tables below; @write_mutex is presumably
  # held while writing frames (its use is outside this method — confirm).
  @mutex       = Mutex.new
  @write_mutex = Mutex.new

  # Request / subscription bookkeeping.
  @pending           = {} # id => RequestFuture
  @pending_subscribe = {} # id => { topic:, future: }
  @subscriptions     = {} # sub_id => [Subscription, ...]
  @next_id           = 0

  # Connection resources, all unset until #connect populates them.
  @io = @driver = @read_thread = @ping_thread = nil

  # Keep-alive configuration and counters.
  @ping_interval = ping_interval.to_f
  @pings_sent    = 0
  @connected     = false
end

Instance Attribute Details

#ping_interval ⇒ Object (readonly)

Seconds between client-initiated ping frames, as a Float (the ping_interval: constructor argument converted with to_f). Defaults to PING_INTERVAL_SECONDS.



132
133
134
# File 'lib/datagrout_conduit/transport/ws.rb', line 132

# Seconds between client-initiated ping frames.
#
# @return [Float] the constructor's ping_interval: argument after #to_f
def ping_interval
  @ping_interval
end

#pings_sent ⇒ Object (readonly)

Number of ping frames this transport has sent over its lifetime. Exposed for tests; production code can ignore it.



132
133
134
# File 'lib/datagrout_conduit/transport/ws.rb', line 132

# Count of ping frames sent so far; starts at 0 when the transport is built.
# The increment happens outside this method (presumably in the ping thread).
#
# @return [Integer]
def pings_sent
  @pings_sent
end

Instance Method Details

#connect ⇒ Object

Establish the WebSocket connection. Blocks until the server-side handshake completes (up to 10 s).



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/datagrout_conduit/transport/ws.rb', line 138

# Establish the WebSocket connection and block until the opening handshake
# finishes, waiting at most 10 seconds.
#
# Opens the socket, wires up a WebSocket::Driver client advertising
# SUBPROTOCOL, starts the background reader thread, and — once the driver
# reports :open — marks the transport connected and starts the ping thread.
#
# Every failure path releases the socket via cleanup_socket before raising,
# including a handshake rejected by the driver's :error event.
#
# @return [self]
# @raise [ConnectionError] on timeout, handshake failure, or any other
#   StandardError raised while connecting
def connect
  uri = URI.parse(@url)
  @io = open_socket(uri)

  adapter = SocketAdapter.new(@url, @io)
  @driver = WebSocket::Driver.client(adapter, protocols: [SUBPROTOCOL])

  build_upgrade_headers.each { |k, v| @driver.set_header(k, v) }

  # Single-slot rendezvous between the reader thread and this caller:
  # nil means "handshake succeeded", a String carries the error message.
  handshake_q = ::Thread::Queue.new

  # :open and :error only signal the queue while the handshake is still in
  # progress (@connected is false), so later events are not queued.
  @driver.on(:open)    { handshake_q.push(nil) unless @connected }
  @driver.on(:message) { |e| handle_message(e.data) }
  @driver.on(:close)   { handle_disconnect }
  @driver.on(:error)   { |e| handshake_q.push(e.message) unless @connected }

  @driver.start
  @read_thread = Thread.new { read_loop }
  @read_thread.abort_on_exception = false
  @read_thread.name = "conduit-ws-reader"

  err = Timeout.timeout(10) { handshake_q.pop }
  raise ConnectionError, "WebSocket handshake failed: #{err}" if err

  @connected = true
  start_ping_thread
  self
rescue Timeout::Error
  cleanup_socket
  raise ConnectionError, "WebSocket connection timed out"
rescue ConnectionError
  # A failed handshake leaves an open socket and a running reader thread
  # behind; release them before propagating the original error unchanged.
  cleanup_socket
  raise
rescue => e
  cleanup_socket
  raise ConnectionError, "WebSocket connect error: #{e.message}"
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


183
184
185
# File 'lib/datagrout_conduit/transport/ws.rb', line 183

# Whether the transport currently considers itself connected.
#
# Returns the @connected flag, which #connect sets to true after a
# successful handshake and #disconnect resets to false.
#
# @return [Boolean]
def connected?
  @connected
end

#disconnect ⇒ Object

Close the connection and fail all pending requests.



176
177
178
179
180
181
# File 'lib/datagrout_conduit/transport/ws.rb', line 176

# Tear the connection down: flip the connected flag off, fail every pending
# request with :disconnected, then release the socket.
#
# @return [self]
def disconnect
  # Clear the flag under the mutex before failing the pending requests.
  @mutex.synchronize do
    @connected = false
  end

  fail_all_pending(:disconnected)
  cleanup_socket

  self
end

#send_request(method, params = nil, id: :auto) ⇒ Object

Send a JSON-RPC request and block until the response arrives. Pass id: nil to fire a notification: no response is awaited and { "result" => {} } is returned immediately. Otherwise returns a Hash of the form { "result" => value }, or raises McpError on an RPC-level error.



190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/datagrout_conduit/transport/ws.rb', line 190

# Send a JSON-RPC request and block until its response arrives.
#
# @param method [String] JSON-RPC method name
# @param params [Object, nil] request params; left out of a notification
#   frame when nil
# @param id [Object, nil] pass +nil+ to send a fire-and-forget notification.
#   Any other value (including the default +:auto+) is not used as the wire
#   id; a fresh id is always minted.
# @return [Hash] +{ "result" => value }+ for a request, or
#   +{ "result" => {} }+ immediately for a notification
# @raise [McpError] when the response future resolves with a non-:ok status
def send_request(method, params = nil, id: :auto)
  ensure_connected!

  # id: nil means fire-and-forget notification (no id field, no response wait)
  if id.nil?
    frame = { "jsonrpc" => "2.0", "method" => method }
    frame["params"] = params if params
    write_frame(frame)
    return { "result" => {} }
  end

  req_id = mint_id
  future = RequestFuture.new
  @mutex.synchronize { @pending[req_id] = future }

  completed = false
  begin
    write_frame(build_request(req_id, method, params))
    status, value = future.wait
    completed = true
  ensure
    # If the write failed or the wait was interrupted, nothing else will
    # ever resolve this future; drop it so @pending does not grow.
    @mutex.synchronize { @pending.delete(req_id) } unless completed
  end

  raise McpError.new(code: -1, message: value.to_s, data: nil) unless status == :ok

  { "result" => value }
end

#subscribe(topic) ⇒ Object

Subscribe to a dotted-namespace topic. Returns a Subscription that delivers events via recv / each.



217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/datagrout_conduit/transport/ws.rb', line 217

# Subscribe to a dotted-namespace topic and block until the server replies.
#
# @param topic [String] topic name, sent as the "topic" param of a
#   "subscribe" request
# @return [Subscription] registered under the server-provided
#   "subscription" id when the reply is a Hash containing one, otherwise
#   under the request id
# @raise [McpError] when the response future resolves with a non-:ok status
def subscribe(topic)
  ensure_connected!

  req_id = mint_id
  future = RequestFuture.new
  @mutex.synchronize { @pending_subscribe[req_id] = { topic: topic, future: future } }

  completed = false
  begin
    write_frame(build_request(req_id, "subscribe", { "topic" => topic }))
    status, value = future.wait
    completed = true
  ensure
    # If the write failed or the wait was interrupted, no reply will clear
    # this entry; remove it so @pending_subscribe does not grow.
    @mutex.synchronize { @pending_subscribe.delete(req_id) } unless completed
  end

  raise McpError.new(code: -1, message: value.to_s, data: nil) unless status == :ok

  sub_id = value.is_a?(Hash) ? (value["subscription"] || req_id) : req_id
  sub = Subscription.new(sub_id, topic)
  # Several Subscription objects may share one sub_id, hence the array.
  @mutex.synchronize { (@subscriptions[sub_id] ||= []) << sub }
  sub
end

#unsubscribe(subscription) ⇒ Object

Cancel a push subscription locally and notify the server. Accepts a Subscription object or a subscription ID string.



242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/datagrout_conduit/transport/ws.rb', line 242

# Cancel a push subscription locally, then tell the server if still connected.
#
# Every Subscription registered under the resolved id is removed and closed.
# No response is awaited for the "unsubscribe" request.
#
# @param subscription [Subscription, #to_s] a Subscription, or its id
# @return [Symbol] always :ok
def unsubscribe(subscription)
  sub_id =
    if subscription.is_a?(Subscription)
      subscription.sub_id
    else
      subscription.to_s
    end

  removed = @mutex.synchronize { @subscriptions.delete(sub_id) }
  (removed || []).each { |sub| sub._close }

  # Local teardown is done; skip the wire notification when offline.
  return :ok unless @connected

  write_frame(build_request(mint_id, "unsubscribe", { "subscription" => sub_id }))
  :ok
end