Class: DatagroutConduit::Client

Inherits:
Object
Defined in:
lib/datagrout_conduit/client.rb

Overview

Main Conduit client. Connects to remote MCP / JSON-RPC servers over HTTP, sends requests, and parses responses. This is purely a client library: it does NOT run a server or accept connections.

Constant Summary

PROTOCOL_VERSION = "2025-03-26"
CLIENT_NAME = "datagrout-conduit-ruby"

Constructor Details

#initialize(url:, auth: {}, transport: :mcp, identity: nil, identity_dir: nil, use_intelligent_interface: nil, max_retries: 3, logger: nil, identity_auto: false, disable_mtls: false) ⇒ Client

Returns a new instance of Client.



# File 'lib/datagrout_conduit/client.rb', line 33

def initialize(url:, auth: {}, transport: :mcp, identity: nil, identity_dir: nil,
               use_intelligent_interface: nil, max_retries: 3, logger: nil,
               identity_auto: false, disable_mtls: false)
  @url = url
  @auth = auth
  @transport_mode = transport
  @identity = identity
  @identity_dir = identity_dir
  @identity_auto = identity_auto
  @disable_mtls = disable_mtls  # deprecated, kept for backward compat
  @max_retries = max_retries
  @initialized = false
  @server_info = nil
  @logger = logger || default_logger
  @is_dg = DatagroutConduit.dg_url?(url)
  @dg_warned = false

  @use_intelligent_interface = if use_intelligent_interface.nil?
                                 @is_dg
                               else
                                 use_intelligent_interface
                               end

  resolve_identity!
  @transport = build_transport
end
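
The tri-state default for use_intelligent_interface can be illustrated in isolation. The sketch below is not library code: resolve_intelligent_interface is a hypothetical helper, and is_dg stands in for the result of DatagroutConduit.dg_url?(url).

```ruby
# Hypothetical stand-alone helper mirroring the constructor's default logic.
# `explicit` is the caller-supplied use_intelligent_interface value;
# `is_dg` stands in for DatagroutConduit.dg_url?(url).
def resolve_intelligent_interface(explicit, is_dg)
  # nil means "not specified": fall back to whether the URL is a DataGrout URL.
  explicit.nil? ? is_dg : explicit
end

resolve_intelligent_interface(nil, true)    # follows the URL check
resolve_intelligent_interface(false, true)  # an explicit false wins over the URL check
```

Using nil rather than false as the default is what lets an explicit false be told apart from "not specified".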

Instance Attribute Details

#server_info ⇒ Object (readonly)

Returns the value of attribute server_info.



# File 'lib/datagrout_conduit/client.rb', line 14

def server_info
  @server_info
end

#transport ⇒ Object (readonly)

Returns the value of attribute transport.



# File 'lib/datagrout_conduit/client.rb', line 14

def transport
  @transport
end

#use_intelligent_interface ⇒ Object (readonly)

Returns the value of attribute use_intelligent_interface.



# File 'lib/datagrout_conduit/client.rb', line 14

def use_intelligent_interface
  @use_intelligent_interface
end

Class Method Details

.bootstrap_identity(url:, auth_token:, name: "conduit-client", identity_dir: nil) ⇒ Object

Bootstrap an mTLS identity: discover existing or register a new one.

Checks the auto-discovery chain first. If an existing identity is found and not near expiry it is used as-is. Otherwise, a new keypair is generated, registered with DataGrout using the provided bearer token, saved to the identity directory, and loaded as the active identity.

After the first successful bootstrap, the identity is persisted locally and auto-discovered on subsequent runs, so no token or API key is needed.



# File 'lib/datagrout_conduit/client.rb', line 69

def self.bootstrap_identity(url:, auth_token:, name: "conduit-client", identity_dir: nil)
  dir = identity_dir || Registration.default_identity_dir || File.join(Dir.home, ".conduit")

  identity = Identity.try_discover(override_dir: dir)
  if identity && !identity.needs_rotation?
    return new(url: url, identity: identity)
  end

  private_pem, public_pem = Registration.generate_keypair
  reg = Registration.register_identity(
    public_pem,
    auth_token: auth_token,
    name: name
  )
  Registration.save_identity(reg.cert_pem, private_pem, dir, ca_pem: reg.ca_cert_pem)

  ca_path = reg.ca_cert_pem ? File.join(dir, "ca.pem") : nil
  identity = Identity.from_paths(
    File.join(dir, "identity.pem"),
    File.join(dir, "identity_key.pem"),
    ca_path: ca_path
  )

  new(url: url, identity: identity)
end
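
The reuse-or-register decision described above can be sketched without the real Identity and Registration classes. Everything in this block is a stand-in: FakeIdentity and reuse_or_register are hypothetical names, and the block passed to reuse_or_register represents the keypair generation, registration, and save steps.

```ruby
# Hypothetical stand-in for an identity object; only needs_rotation? is modeled.
FakeIdentity = Struct.new(:near_expiry) do
  def needs_rotation?
    near_expiry
  end
end

# Returns [:reused, identity] when a usable identity was discovered.
# Otherwise runs the block (the "register" step) and returns [:registered, result].
def reuse_or_register(discovered)
  return [:reused, discovered] if discovered && !discovered.needs_rotation?

  [:registered, yield]
end

fresh = FakeIdentity.new(false)
stale = FakeIdentity.new(true)

reuse_or_register(fresh) { FakeIdentity.new(false) }  # reuses `fresh`; the block is not called
reuse_or_register(stale) { FakeIdentity.new(false) }  # near expiry: registers a replacement
reuse_or_register(nil)   { FakeIdentity.new(false) }  # nothing discovered: registers
```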

.bootstrap_identity_oauth(url:, client_id:, client_secret:, name: "conduit-client", identity_dir: nil) ⇒ Object

Bootstrap an mTLS identity using OAuth 2.1 client_credentials.

Same flow as bootstrap_identity but obtains the bearer token via OAuth client_credentials exchange first.



# File 'lib/datagrout_conduit/client.rb', line 99

def self.bootstrap_identity_oauth(url:, client_id:, client_secret:, name: "conduit-client", identity_dir: nil)
  provider = OAuth::TokenProvider.new(
    client_id: client_id,
    client_secret: client_secret,
    token_endpoint: OAuth::TokenProvider.derive_token_endpoint(url)
  )
  token = provider.get_token
  bootstrap_identity(url: url, auth_token: token, name: name, identity_dir: identity_dir)
end

.bootstrap_onramp(opts:, url: nil, name: "conduit-client", identity_dir: nil) ⇒ Client

Bootstrap by performing the autonomous DG onramp flow.

The all-in-one flow: onramp (no prior credentials required) → OAuth token exchange → mTLS identity registration and persistence.

On subsequent runs the saved mTLS identity is auto-discovered and no credentials are needed.

Parameters:

  • opts (DatagroutConduit::Onramp::OnrampOptions)
  • url (String, nil) (defaults to: nil)

    MCP server URL; required when opts.mcp_url is absent

  • name (String) (defaults to: "conduit-client")

    human-readable identity label

  • identity_dir (String, nil) (defaults to: nil)

    custom identity storage directory

Returns:

  • (Client)

    unconnected client; call #connect before use

Raises:

  • (ArgumentError)


# File 'lib/datagrout_conduit/client.rb', line 122

def self.bootstrap_onramp(opts:, url: nil, name: "conduit-client", identity_dir: nil)
  dir = identity_dir || Registration.default_identity_dir || File.join(Dir.home, ".conduit")

  # Fast path: existing valid identity.
  identity = Identity.try_discover(override_dir: dir)
  if identity && !identity.needs_rotation?
    raise ArgumentError, "'url' must be provided when an existing identity is reused" if url.nil?
    return new(url: url, identity: identity)
  end

  # Slow path: full onramp flow.
  creds, token = Onramp.register_and_exchange(opts)

  mcp_url = creds.mcp_url || url
  raise ArgumentError, "'url' must be provided when mcp_url is absent from onramp response" if mcp_url.nil?

  bootstrap_identity(url: mcp_url, auth_token: token, name: name, identity_dir: identity_dir)
end
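
The slow path's URL precedence (the onramp response wins, the url: argument is the fallback) can be sketched in isolation. resolve_mcp_url is a hypothetical helper, creds_mcp_url stands in for creds.mcp_url, and the example.test URLs are placeholders.

```ruby
# Hypothetical helper: choose the MCP URL the way the slow path does.
# `creds_mcp_url` stands in for creds.mcp_url from the onramp response.
def resolve_mcp_url(creds_mcp_url, url)
  mcp_url = creds_mcp_url || url
  raise ArgumentError, "'url' must be provided when mcp_url is absent from onramp response" if mcp_url.nil?

  mcp_url
end

resolve_mcp_url("https://onramp.example.test/mcp", "https://arg.example.test/mcp") # onramp value wins
resolve_mcp_url(nil, "https://arg.example.test/mcp")                               # falls back to url:
```

Note that the fast path is stricter: when an existing identity is reused, no onramp response exists, so url: must be supplied by the caller.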

Instance Method Details

#call_tool(name, arguments = {}) ⇒ Object



# File 'lib/datagrout_conduit/client.rb', line 203

def call_tool(name, arguments = {})
  ensure_initialized!

  params = { "name" => name.to_s, "arguments" => normalize_hash(arguments) }
  response = send_with_retry("tools/call", params)
  result = response.is_a?(Hash) ? (response["result"] || response) : response
  unwrap_content(result)
end
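
The result-extraction expression used here (and repeated in the list and read methods) accepts both enveloped and bare responses. A minimal sketch, with extract_result as a hypothetical name for that one-liner; the private unwrap_content step is not shown on this page and is not modeled:

```ruby
# Hypothetical helper isolating the extraction expression used by call_tool.
def extract_result(response)
  # Hash with a "result" key: return the inner value.
  # Hash without one: return the hash itself.
  # Anything else (String, Array, nil): pass through unchanged.
  response.is_a?(Hash) ? (response["result"] || response) : response
end

extract_result({ "result" => { "content" => [] } })  # inner hash
extract_result({ "content" => [] })                  # same hash, no envelope
extract_result("raw text")                           # passed through
```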

#connect ⇒ Object



# File 'lib/datagrout_conduit/client.rb', line 141

def connect
  @transport.connect

  params = {
    "protocolVersion" => PROTOCOL_VERSION,
    "clientInfo" => { "name" => CLIENT_NAME, "version" => DatagroutConduit::VERSION },
    "capabilities" => { "tools" => {} }
  }

  response = @transport.send_request("initialize", params)

  if response.is_a?(Hash) && response["result"]
    result = response["result"]
    @server_info = result["serverInfo"]
  end

  @transport.send_request("notifications/initialized", nil, id: nil)
  @initialized = true
  self
end
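
The handshake sends one request ("initialize") followed by one notification ("notifications/initialized", sent with id: nil). The sketch below shows what those two messages could look like as JSON-RPC 2.0 envelopes. It is an assumption about the wire format: the transport's actual framing is not shown on this page, jsonrpc_message is a hypothetical helper, and the version string is a placeholder.

```ruby
require "json"

# Hypothetical envelope builder following the JSON-RPC 2.0 convention:
# requests carry an id, notifications do not.
def jsonrpc_message(method, params, id: nil)
  msg = { "jsonrpc" => "2.0", "method" => method }
  msg["params"] = params unless params.nil?
  msg["id"] = id unless id.nil?
  msg
end

init = jsonrpc_message("initialize", {
  "protocolVersion" => "2025-03-26",
  "clientInfo" => { "name" => "datagrout-conduit-ruby", "version" => "0.0.0" }, # placeholder version
  "capabilities" => { "tools" => {} }
}, id: 1)

notice = jsonrpc_message("notifications/initialized", nil)

puts JSON.generate(init)
puts JSON.generate(notice)
```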

#deliverables ⇒ Object



# File 'lib/datagrout_conduit/client.rb', line 263

def deliverables
  @deliverables ||= DeliverablesNamespace.new(self)
end

#dg(tool_short_name, params = {}) ⇒ Object

Call any DataGrout first-party tool by short name, e.g. client.dg("prism.render", { payload: data, goal: "summary" })



# File 'lib/datagrout_conduit/client.rb', line 360

def dg(tool_short_name, params = {})
  ensure_initialized!
  call_dg_tool("data-grout/#{tool_short_name}", params)
end

#disconnect ⇒ Object



# File 'lib/datagrout_conduit/client.rb', line 162

def disconnect
  @transport.disconnect
  @initialized = false
  self
end

#discover(goal: nil, query: nil, limit: 10, min_score: 0.0, integrations: [], servers: []) ⇒ Object

Semantic discovery: find tools by natural-language goal or query.



# File 'lib/datagrout_conduit/client.rb', line 280

def discover(goal: nil, query: nil, limit: 10, min_score: 0.0,
             integrations: [], servers: [])
  warn_if_not_dg("discover")
  ensure_initialized!

  params = { "limit" => limit, "min_score" => min_score }
  params["goal"] = goal if goal
  params["query"] = query if query
  params["integrations"] = integrations unless integrations.empty?
  params["servers"] = servers unless servers.empty?

  result = call_dg_tool("data-grout/discovery.discover", params)
  DiscoverResult.from_hash(result)
end

#ephemerals ⇒ Object



# File 'lib/datagrout_conduit/client.rb', line 267

def ephemerals
  @ephemerals ||= EphemeralsNamespace.new(self)
end

#estimate_cost(tool_name, arguments = {}) ⇒ Object

Estimate cost before execution.



# File 'lib/datagrout_conduit/client.rb', line 366

def estimate_cost(tool_name, arguments = {})
  ensure_initialized!

  args = normalize_hash(arguments).merge("estimate_only" => true)
  call_dg_tool(tool_name.to_s, args)
end

#flow ⇒ Object



# File 'lib/datagrout_conduit/client.rb', line 271

def flow
  @flow ||= FlowNamespace.new(self)
end

#get_prompt(name, arguments = {}) ⇒ Object



# File 'lib/datagrout_conduit/client.rb', line 236

def get_prompt(name, arguments = {})
  ensure_initialized!

  params = { "name" => name.to_s }
  params["arguments"] = normalize_hash(arguments) unless arguments.nil? || arguments.empty?

  response = send_with_retry("prompts/get", params)
  result = response.is_a?(Hash) ? (response["result"] || response) : response
  result["messages"] || []
end

#guide(goal: nil, session_id: nil, choice: nil) ⇒ Object

Start or continue a guided workflow.



# File 'lib/datagrout_conduit/client.rb', line 325

def guide(goal: nil, session_id: nil, choice: nil)
  warn_if_not_dg("guide")
  ensure_initialized!

  params = {}
  params["goal"] = goal if goal
  params["session_id"] = session_id if session_id
  params["choice"] = choice if choice

  result = call_dg_tool("data-grout/discovery.guide", params)
  GuidedSession.new(self, GuideState.from_hash(result))
end

#initialized? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/datagrout_conduit/client.rb', line 168

def initialized?
  @initialized
end

#list_prompts ⇒ Object



# File 'lib/datagrout_conduit/client.rb', line 228

def list_prompts
  ensure_initialized!

  response = send_with_retry("prompts/list", {})
  result = response.is_a?(Hash) ? (response["result"] || response) : response
  result["prompts"] || []
end

#list_resources ⇒ Object



# File 'lib/datagrout_conduit/client.rb', line 212

def list_resources
  ensure_initialized!

  response = send_with_retry("resources/list", {})
  result = response.is_a?(Hash) ? (response["result"] || response) : response
  result["resources"] || []
end

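#list_tools ⇒ Object

Standard MCP method. Lists every tool the server exposes, following pagination cursors until none remains. When use_intelligent_interface is enabled, tools whose names contain "@" are removed from the result.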



# File 'lib/datagrout_conduit/client.rb', line 176

def list_tools
  ensure_initialized!

  all_tools = []
  cursor = nil

  loop do
    params = {}
    params["cursor"] = cursor if cursor

    response = send_with_retry("tools/list", params)
    result = response.is_a?(Hash) ? (response["result"] || response) : response

    tools_data = result["tools"] || []
    tools_data.each { |t| all_tools << Tool.from_hash(t) }

    cursor = result["nextCursor"] || result["next_cursor"]
    break unless cursor
  end

  if @use_intelligent_interface
    all_tools.reject! { |t| t.name.include?("@") }
  end

  all_tools
end
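
The cursor loop can be exercised against an in-memory stand-in for the server. This is a sketch under stated assumptions: PAGES and fetch_all_tool_names are hypothetical, the tool names are made up, and the Tool.from_hash conversion is left out so the block stays self-contained.

```ruby
# Hypothetical in-memory "server": maps a cursor (nil for the first page) to a page.
PAGES = {
  nil  => { "tools" => [{ "name" => "alpha" }, { "name" => "beta@conn1" }], "nextCursor" => "p2" },
  "p2" => { "tools" => [{ "name" => "gamma" }] }
}.freeze

# Collects tool names across all pages, optionally dropping names that contain "@".
def fetch_all_tool_names(hide_at_names: false)
  names = []
  cursor = nil

  loop do
    page = PAGES.fetch(cursor)
    names.concat((page["tools"] || []).map { |t| t["name"] })

    # Accept either key spelling, as list_tools does.
    cursor = page["nextCursor"] || page["next_cursor"]
    break unless cursor
  end

  names.reject! { |n| n.include?("@") } if hide_at_names
  names
end

fetch_all_tool_names                       # => ["alpha", "beta@conn1", "gamma"]
fetch_all_tool_names(hide_at_names: true)  # => ["alpha", "gamma"]
```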

#logic ⇒ Object



# File 'lib/datagrout_conduit/client.rb', line 255

def logic
  @logic ||= LogicNamespace.new(self)
end

#perform(tool_name, arguments = {}, demux: false, demux_mode: nil) ⇒ Object

Execute a tool call through the DataGrout intelligent interface.



# File 'lib/datagrout_conduit/client.rb', line 296

def perform(tool_name, arguments = {}, demux: false, demux_mode: nil)
  warn_if_not_dg("perform")
  ensure_initialized!

  params = { "tool" => tool_name.to_s, "args" => normalize_hash(arguments) }
  params["demux"] = demux if demux
  params["demux_mode"] = demux_mode if demux_mode

  call_dg_tool("data-grout/discovery.perform", params)
end

#perform_batch(calls) ⇒ Object

Execute multiple tool calls in a single gateway request.

Each element should be a hash with "tool" and "args" keys. Returns an array of results in the same order as the input calls.

results = client.perform_batch([
  { "tool" => "data-grout/data.count", "args" => { "data" => [1, 2, 3] } },
  { "tool" => "data-grout/data.keys",  "args" => { "data" => { "a" => 1 } } }
])


# File 'lib/datagrout_conduit/client.rb', line 316

def perform_batch(calls)
  warn_if_not_dg("perform_batch")
  ensure_initialized!

  result = call_dg_tool("data-grout/discovery.perform", calls)
  result.is_a?(Array) ? result : [result]
end
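
The final line wraps a non-array result in a one-element array. A short sketch of why an explicit is_a?(Array) check is a reasonable choice here rather than Kernel#Array, which would turn a Hash result into key/value pairs; normalize_batch_result is a hypothetical name for that line.

```ruby
# Hypothetical helper isolating the last line of perform_batch.
def normalize_batch_result(result)
  result.is_a?(Array) ? result : [result]
end

normalize_batch_result([{ "count" => 3 }, { "keys" => ["a"] }])  # already an array: unchanged
normalize_batch_result({ "count" => 3 })                         # => [{ "count" => 3 }]

# For comparison, Kernel#Array converts a Hash into pairs:
Array({ "count" => 3 })                                          # => [["count", 3]]
```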

#plan(goal: nil, query: nil, **opts) ⇒ Object

Semantic discovery plan: return a ranked list of tools for a goal. At least one of goal: or query: must be provided.

Raises:

  • (ArgumentError)


# File 'lib/datagrout_conduit/client.rb', line 340

def plan(goal: nil, query: nil, **opts)
  raise ArgumentError, "plan() requires at least one of goal: or query:" unless goal || query

  params = {}
  params["goal"] = goal if goal
  params["query"] = query if query
  params["server"] = opts[:server] if opts[:server]
  params["k"] = opts[:k] if opts[:k]
  params["policy"] = opts[:policy] if opts[:policy]
  params["have"] = opts[:have] if opts[:have]
  params["return_call_handles"] = opts[:return_call_handles] if opts.key?(:return_call_handles)
  params["expose_virtual_skills"] = opts[:expose_virtual_skills] if opts.key?(:expose_virtual_skills)
  params["model_overrides"] = opts[:model_overrides] if opts[:model_overrides]
  warn_if_not_dg("plan")
  ensure_initialized!
  call_dg_tool("data-grout/discovery.plan", params)
end
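
The option handling mixes two kinds of checks: most options are forwarded only when truthy, while return_call_handles and expose_virtual_skills are forwarded whenever the key is present, so an explicit false reaches the server. The sketch below reproduces a subset of that logic; plan_params is a hypothetical helper that only builds the hash and makes no call.

```ruby
# Hypothetical helper reproducing a subset of plan's parameter building.
def plan_params(goal: nil, query: nil, **opts)
  raise ArgumentError, "plan() requires at least one of goal: or query:" unless goal || query

  params = {}
  params["goal"] = goal if goal
  params["query"] = query if query
  # Truthiness check: nil and false are dropped.
  params["k"] = opts[:k] if opts[:k]
  # Presence check: an explicit false is kept.
  params["return_call_handles"] = opts[:return_call_handles] if opts.key?(:return_call_handles)
  params
end

plan_params(goal: "summarize invoices", k: nil, return_call_handles: false)
# => { "goal" => "summarize invoices", "return_call_handles" => false }
```

Because the argument check comes first, a call with neither goal: nor query: raises ArgumentError before any connection state is consulted.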

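#prism ⇒ Object

Namespace accessor. Returns the PrismNamespace for this client, created on first use and memoized. The logic, warden, deliverables, ephemerals, and flow accessors follow the same pattern.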



# File 'lib/datagrout_conduit/client.rb', line 251

def prism
  @prism ||= PrismNamespace.new(self)
end
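
The ||= idiom means each namespace object is built once per client and reused afterwards. A minimal sketch with stand-in classes; FakeNamespace and FakeClient are hypothetical and model only the memoization, not any Prism behavior.

```ruby
# Hypothetical namespace that just remembers which client created it.
class FakeNamespace
  attr_reader :client

  def initialize(client)
    @client = client
  end
end

# Hypothetical client with one memoized accessor, shaped like #prism.
class FakeClient
  def prism
    @prism ||= FakeNamespace.new(self)
  end
end

client = FakeClient.new
client.prism.equal?(client.prism)   # true: the same object is returned on every call
client.prism.client.equal?(client)  # true: the namespace holds a reference to its client
```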

#read_resource(uri) ⇒ Object



# File 'lib/datagrout_conduit/client.rb', line 220

def read_resource(uri)
  ensure_initialized!

  response = send_with_retry("resources/read", { "uri" => uri.to_s })
  result = response.is_a?(Hash) ? (response["result"] || response) : response
  result["contents"] || []
end

#subscribe(topic) ⇒ Object

Subscribe to a server-push topic over a WebSocket transport. Returns a Transport::Ws::Subscription. Raises RuntimeError when transport is not :websocket.



# File 'lib/datagrout_conduit/client.rb', line 19

def subscribe(topic)
  raise "subscribe() requires transport: :websocket" unless @transport.is_a?(Transport::Ws)

  @transport.subscribe(topic)
end
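
The transport guard can be sketched with stand-in classes. FakeHttpTransport, FakeWsTransport, and subscribe_via are hypothetical, and the string returned by the fake subscribe is only a placeholder for a Transport::Ws::Subscription.

```ruby
# Hypothetical transports: only the WebSocket one supports subscribe.
class FakeHttpTransport; end

class FakeWsTransport
  def subscribe(topic)
    "subscription:#{topic}" # placeholder for a real Subscription object
  end
end

# Mirrors the guard in #subscribe: a bare `raise "..."` raises RuntimeError.
def subscribe_via(transport, topic)
  raise "subscribe() requires transport: :websocket" unless transport.is_a?(FakeWsTransport)

  transport.subscribe(topic)
end

subscribe_via(FakeWsTransport.new, "events")  # => "subscription:events"

begin
  subscribe_via(FakeHttpTransport.new, "events")
rescue RuntimeError => e
  puts e.message
end
```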

#unsubscribe(subscription) ⇒ Object

Cancel a push subscription. Accepts a Subscription object or a subscription ID string.



# File 'lib/datagrout_conduit/client.rb', line 27

def unsubscribe(subscription)
  raise "unsubscribe() requires transport: :websocket" unless @transport.is_a?(Transport::Ws)

  @transport.unsubscribe(subscription)
end

#warden ⇒ Object



# File 'lib/datagrout_conduit/client.rb', line 259

def warden
  @warden ||= WardenNamespace.new(self)
end