Class: Async::GRPC::XDS::Client

Inherits:
Protocol::HTTP::Middleware
  • Object
show all
Defined in:
lib/async/grpc/xds/client.rb

Overview

Wrapper client for xDS-enabled gRPC connections Follows the same pattern as Async::Redis::SentinelClient and ClusterClient

Defined Under Namespace

Classes: NoEndpointsError

Constant Summary collapse

ConfigurationError =

Raised when xDS configuration cannot be loaded

Context::ConfigurationError
ReloadError =

Raised when cluster configuration cannot be reloaded

Context::ReloadError

Instance Method Summary collapse

Constructor Details

#initialize(service_name, bootstrap: nil, headers: Protocol::HTTP::Headers.new, node: nil, **options) ⇒ Client

Create a new xDS client



37
38
39
40
41
42
43
44
45
46
47
# File 'lib/async/grpc/xds/client.rb', line 37

def initialize(service_name, bootstrap: nil, headers: Protocol::HTTP::Headers.new, node: nil, **options)
	@service_name = service_name
	@bootstrap = load_bootstrap(bootstrap)
	@headers = headers
	@options = options
	
	@context = Context.new(@bootstrap, node: node || @bootstrap[:node])
	@load_balancer = nil
	@clients = {}  # Cache clients per endpoint (like ClusterClient caches node.client)
	@mutex = Mutex.new
end

Instance Method Details

#call(request, attempts: 3) ⇒ Object

Implement Protocol::HTTP::Middleware interface This allows XDS::Client to be used anywhere Async::GRPC::Client is used



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/async/grpc/xds/client.rb', line 98

def call(request, attempts: 3)
	client, endpoint = client_for_call
	@load_balancer.record_request_start(endpoint)
	begin
		client.call(request)
	rescue Protocol::GRPC::Error => error
		# Handle endpoint changes (like ClusterClient handles MOVED/ASK)
		if error.status_code == Protocol::GRPC::Status::UNAVAILABLE
			Console.warn(self, error)
			
			# Invalidate cache, reload configuration
			invalidate_cache!
			
			attempts -= 1
			retry if attempts > 0
		end
		
		raise
	rescue => error
		# Network errors might indicate endpoint failure
		Console.warn(self, error)
		
		# Invalidate this specific endpoint
		invalidate_endpoint(client)
		
		attempts -= 1
		retry if attempts > 0
		
		raise
	end
ensure
	@load_balancer&.record_request_end(endpoint)
end

#client_for_callObject

Get a client for making calls (like ClusterClient.client_for) Resolves endpoints lazily and picks one via load balancer

Raises:



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/async/grpc/xds/client.rb', line 78

def client_for_call
	endpoints = resolve_endpoints
	raise NoEndpointsError, "No endpoints available for #{@service_name}" if endpoints.empty?
	
	# Pick endpoint via load balancer
	endpoint = @load_balancer.pick
	raise NoEndpointsError, "No healthy endpoints available" unless endpoint
	
	# Cache client per endpoint (like ClusterClient caches node.client)
	client = @clients[endpoint] ||= begin
		http_client = Async::HTTP::Client.new(endpoint, **@options)
		Async::GRPC::Client.new(http_client, headers: @headers)
	end
	[client, endpoint]
end

#closeObject

Close xDS client and all connections



177
178
179
180
181
182
# File 'lib/async/grpc/xds/client.rb', line 177

def close
	@clients.each_value(&:close)
	@clients.clear
	@context.close
	@load_balancer&.close
end

#invoke(service, method, request = nil, metadata: {}, timeout: nil, encoding: nil, initial: nil, attempts: 3, &block) ⇒ Object

Invoke an RPC (called by Stub). Load balances per call.



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/async/grpc/xds/client.rb', line 152

def invoke(service, method, request = nil, metadata: {}, timeout: nil, encoding: nil, initial: nil, attempts: 3, &block)
	client, endpoint = client_for_call
	@load_balancer.record_request_start(endpoint)
	begin
		client.invoke(service, method, request, metadata: , timeout: timeout, encoding: encoding, initial: initial, &block)
	rescue Protocol::GRPC::Error => error
		if error.status_code == Protocol::GRPC::Status::UNAVAILABLE
			Console.warn(self, error)
			invalidate_cache!
			attempts -= 1
			retry if attempts > 0
		end
		raise
	rescue => error
		Console.warn(self, error)
		invalidate_endpoint(client)
		attempts -= 1
		retry if attempts > 0
		raise
	end
ensure
	@load_balancer&.record_request_end(endpoint)
end

#resolve_endpointsObject

Resolve endpoints lazily (like SentinelClient.resolve_address)



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/async/grpc/xds/client.rb', line 51

def resolve_endpoints
	@mutex.synchronize do
		unless @load_balancer
			# Discover cluster via CDS
			cluster = @context.discover_cluster(@service_name)
			
			# Discover endpoints via EDS
			endpoints = @context.discover_endpoints(cluster)
			
			raise NoEndpointsError, "No endpoints discovered for #{@service_name}" if endpoints.empty?
			
			# Create load balancer
			@load_balancer = LoadBalancer.new(cluster, endpoints)
			
			# Set load balancer reference in context for endpoint updates
			@context.load_balancer = @load_balancer
		end
		
		@load_balancer.healthy_endpoints
	end
rescue Context::ReloadError => error
	raise NoEndpointsError, "No endpoints discovered for #{@service_name}", cause: error
end

#stub(interface_class, service_name) ⇒ Object

Create a stub for the given interface. Same API as Async::GRPC::Client - load balancing happens per RPC call.



137
138
139
140
# File 'lib/async/grpc/xds/client.rb', line 137

def stub(interface_class, service_name)
	interface = interface_class.new(service_name)
	Stub.new(self, interface)
end