Class: Async::GRPC::XDS::DiscoveryClient

Inherits:
Object
  • Object
show all
Includes:
ADSStream::Delegate
Defined in:
lib/async/grpc/xds/discovery_client.rb

Overview

Client for xDS APIs (ADS or individual APIs). Implements the Aggregated Discovery Service (ADS) protocol and acts as the delegate for ADSStream, receiving discovery_response events.

Constant Summary collapse

LISTENER_TYPE =

xDS API type URLs (v3 API)

"type.googleapis.com/envoy.config.listener.v3.Listener"
ROUTE_TYPE =
"type.googleapis.com/envoy.config.route.v3.RouteConfiguration"
CLUSTER_TYPE =
"type.googleapis.com/envoy.config.cluster.v3.Cluster"
ENDPOINT_TYPE =
"type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"
SECRET_TYPE =
"type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.Secret"

Instance Method Summary collapse

Constructor Details

#initialize(server_config, node: nil) ⇒ DiscoveryClient

Initialize xDS discovery client



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/async/grpc/xds/discovery_client.rb', line 37

# Initialize xDS discovery client.
#
# Records the server settings and node description, then creates empty bookkeeping state.
# The gRPC client, stream task, ADS stream and ready promise all start out as nil here.
#
# @param server_config [#[]] server settings, read through the `:server_uri`, `:channel_creds`
#   and `:server_features` keys; a nil `:server_features` falls back to an empty array
# @param node [Object, nil] node information; when nil, `build_node_info` provides it
def initialize(server_config, node: nil)
	# Server settings, taken straight from the configuration.
	@server_uri = server_config[:server_uri]
	@channel_creds = server_config[:channel_creds]
	@server_features = server_config[:server_features] || []
	
	# Node description: caller-supplied or built locally, plus the form produced by build_node_proto.
	@node_info = node || build_node_info
	@node = build_node_proto(@node_info)
	
	# Per-type_url bookkeeping.
	@subscriptions = {}  # Subscriptions by type_url
	@versions = {}  # version_info per type_url
	@nonces = {}  # Nonces per type_url
	
	# Connection state; nothing is connected yet.
	@mutex = Mutex.new
	@grpc_client = nil
	@stream_task = nil
	@ads_stream = nil  # ADSStream instance when connected (owns stream state)
	@stream_ready_promise = nil  # Resolved when stream_opened runs
end

Instance Method Details

#close ⇒ Object

Close xDS discovery client



88
89
90
91
92
93
94
95
96
97
98
# File 'lib/async/grpc/xds/discovery_client.rb', line 88

# Close xDS discovery client.
#
# While holding the mutex: stops the stream task and closes the gRPC client (when present),
# empties the subscription table, and resets the client, task, stream and ready-promise
# references to nil.
#
# NOTE(review): @versions and @nonces are not reset here - confirm that is intended.
#
# @return [nil]
def close
	@mutex.synchronize do
		# Shut down the live resources first; either may be nil if nothing was ever started.
		@stream_task&.stop
		@grpc_client&.close
		
		# Forget all subscriptions (the same Hash instance is kept and emptied).
		@subscriptions.clear
		
		# Drop every connection reference.
		@grpc_client = @stream_task = @ads_stream = @stream_ready_promise = nil
	end
end

#discovery_response(response, stream) ⇒ Object



164
165
166
# File 'lib/async/grpc/xds/discovery_client.rb', line 164

# Delegate callback for a discovery response event.
#
# Forwards both arguments unchanged to process_response and returns its result.
#
# @param response [Object] the discovery response that was received
# @param stream [Object] the stream the response arrived on
# @return [Object] whatever process_response returns
def discovery_response(response, stream)
	process_response(response, stream)
end

#stream_closed(stream) ⇒ Object



160
161
162
# File 'lib/async/grpc/xds/discovery_client.rb', line 160

# Delegate callback for a closed stream.
#
# Clears @ads_stream under the mutex. The stream argument is not inspected: whichever stream
# is currently recorded is dropped.
# NOTE(review): confirm a late close from an older stream cannot arrive after a newer one opened.
#
# @param stream [Object] the stream that closed (unused)
# @return [nil]
def stream_closed(stream)
	@mutex.synchronize do
		@ads_stream = nil
	end
end

#stream_opened(stream) ⇒ Object



155
156
157
158
# File 'lib/async/grpc/xds/discovery_client.rb', line 155

# Delegate callback for an opened stream.
#
# Records the stream as @ads_stream under the mutex, then resolves the ready promise (if one
# exists) with that stream.
#
# @param stream [Object] the stream that just opened
# @return [Object, nil] the result of resolving the promise, or nil when there is no promise
def stream_opened(stream)
	@mutex.synchronize do
		@ads_stream = stream
	end
	
	# Resolved outside the lock; the promise may be nil.
	@stream_ready_promise&.resolve(stream)
end

#subscribe(type_url, resource_names, &block) ⇒ Object

Subscribe to resource type using ADS (Aggregated Discovery Service - single stream for all types)



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/async/grpc/xds/discovery_client.rb', line 59

# Subscribe to a resource type using ADS (Aggregated Discovery Service - single stream for all types).
#
# Records the subscription (replacing any existing entry for the same type_url), calls
# ensure_stream_running, waits up to 5 seconds on the ready promise if it has not completed,
# and then sends the discovery request when @ads_stream is set.
#
# @param type_url [Object] key for the subscription; presumably one of the xDS type URLs
#   such as LISTENER_TYPE - confirm against callers
# @param resource_names [Object] resource names; stored and forwarded exactly as given
# @yield stored as the subscription's callback (nil when no block is given); not called here
# @return [Object, nil] the current value of @stream_task
def subscribe(type_url, resource_names, &block)
	# Build the subscription record, then publish it under the lock.
	subscription = {resource_names: resource_names, callback: block}
	@mutex.synchronize{@subscriptions[type_url] = subscription}
	
	# Make sure the ADS stream is running.
	ensure_stream_running
	
	# Wait for the stream to become ready (event-driven, no polling).
	ready = @stream_ready_promise
	still_pending = ready && !ready.completed?
	if still_pending
		begin
			ready.wait(timeout: 5)
		rescue Async::TimeoutError
			# Stream didn't open in time; the guard below skips the request while @ads_stream is nil.
		end
	end
	
	# Only send once a stream has actually been recorded.
	send_discovery_request(type_url, resource_names) if @ads_stream
	
	# Hand back the stream task.
	@stream_task
end