Class: Async::GRPC::XDS::Client
- Inherits:
-
Protocol::HTTP::Middleware
- Object
- Protocol::HTTP::Middleware
- Async::GRPC::XDS::Client
- Defined in:
- lib/async/grpc/xds/client.rb
Overview
Wrapper client for xDS-enabled gRPC connections Follows the same pattern as Async::Redis::SentinelClient and ClusterClient
Defined Under Namespace
Classes: NoEndpointsError
Constant Summary collapse
- ConfigurationError =
Raised when xDS configuration cannot be loaded
Context::ConfigurationError
- ReloadError =
Raised when cluster configuration cannot be reloaded
Context::ReloadError
Instance Method Summary collapse
-
#call(request, attempts: 3) ⇒ Object
Implement Protocol::HTTP::Middleware interface This allows XDS::Client to be used anywhere Async::GRPC::Client is used.
-
#client_for_call ⇒ Object
Get a client for making calls (like ClusterClient.client_for) Resolves endpoints lazily and picks one via load balancer.
-
#close ⇒ Object
Close xDS client and all connections.
-
#initialize(service_name, bootstrap: nil, headers: Protocol::HTTP::Headers.new, node: nil, **options) ⇒ Client
constructor
Create a new xDS client.
-
#invoke(service, method, request = nil, metadata: {}, timeout: nil, encoding: nil, initial: nil, attempts: 3, &block) ⇒ Object
Invoke an RPC (called by Stub).
-
#resolve_endpoints ⇒ Object
Resolve endpoints lazily (like SentinelClient.resolve_address).
-
#stub(interface_class, service_name) ⇒ Object
Create a stub for the given interface.
Constructor Details
#initialize(service_name, bootstrap: nil, headers: Protocol::HTTP::Headers.new, node: nil, **options) ⇒ Client
Create a new xDS client
37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/async/grpc/xds/client.rb', line 37 def initialize(service_name, bootstrap: nil, headers: Protocol::HTTP::Headers.new, node: nil, **) @service_name = service_name @bootstrap = load_bootstrap(bootstrap) @headers = headers @options = @context = Context.new(@bootstrap, node: node || @bootstrap[:node]) @load_balancer = nil @clients = {} # Cache clients per endpoint (like ClusterClient caches node.client) @mutex = Mutex.new end |
Instance Method Details
#call(request, attempts: 3) ⇒ Object
Implement Protocol::HTTP::Middleware interface This allows XDS::Client to be used anywhere Async::GRPC::Client is used
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/async/grpc/xds/client.rb', line 98 def call(request, attempts: 3) client, endpoint = client_for_call @load_balancer.record_request_start(endpoint) begin client.call(request) rescue Protocol::GRPC::Error => error # Handle endpoint changes (like ClusterClient handles MOVED/ASK) if error.status_code == Protocol::GRPC::Status::UNAVAILABLE Console.warn(self, error) # Invalidate cache, reload configuration invalidate_cache! attempts -= 1 retry if attempts > 0 end raise rescue => error # Network errors might indicate endpoint failure Console.warn(self, error) # Invalidate this specific endpoint invalidate_endpoint(client) attempts -= 1 retry if attempts > 0 raise end ensure @load_balancer&.record_request_end(endpoint) end |
#client_for_call ⇒ Object
Get a client for making calls (like ClusterClient.client_for) Resolves endpoints lazily and picks one via load balancer
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/async/grpc/xds/client.rb', line 78 def client_for_call endpoints = resolve_endpoints raise NoEndpointsError, "No endpoints available for #{@service_name}" if endpoints.empty? # Pick endpoint via load balancer endpoint = @load_balancer.pick raise NoEndpointsError, "No healthy endpoints available" unless endpoint # Cache client per endpoint (like ClusterClient caches node.client) client = @clients[endpoint] ||= begin http_client = Async::HTTP::Client.new(endpoint, **@options) Async::GRPC::Client.new(http_client, headers: @headers) end [client, endpoint] end |
#close ⇒ Object
Close xDS client and all connections
177 178 179 180 181 182 |
# File 'lib/async/grpc/xds/client.rb', line 177 def close @clients.each_value(&:close) @clients.clear @context.close @load_balancer&.close end |
#invoke(service, method, request = nil, metadata: {}, timeout: nil, encoding: nil, initial: nil, attempts: 3, &block) ⇒ Object
Invoke an RPC (called by Stub). Load balances per call.
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/async/grpc/xds/client.rb', line 152 def invoke(service, method, request = nil, metadata: {}, timeout: nil, encoding: nil, initial: nil, attempts: 3, &block) client, endpoint = client_for_call @load_balancer.record_request_start(endpoint) begin client.invoke(service, method, request, metadata: , timeout: timeout, encoding: encoding, initial: initial, &block) rescue Protocol::GRPC::Error => error if error.status_code == Protocol::GRPC::Status::UNAVAILABLE Console.warn(self, error) invalidate_cache! attempts -= 1 retry if attempts > 0 end raise rescue => error Console.warn(self, error) invalidate_endpoint(client) attempts -= 1 retry if attempts > 0 raise end ensure @load_balancer&.record_request_end(endpoint) end |
#resolve_endpoints ⇒ Object
Resolve endpoints lazily (like SentinelClient.resolve_address)
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/async/grpc/xds/client.rb', line 51 def resolve_endpoints @mutex.synchronize do unless @load_balancer # Discover cluster via CDS cluster = @context.discover_cluster(@service_name) # Discover endpoints via EDS endpoints = @context.discover_endpoints(cluster) raise NoEndpointsError, "No endpoints discovered for #{@service_name}" if endpoints.empty? # Create load balancer @load_balancer = LoadBalancer.new(cluster, endpoints) # Set load balancer reference in context for endpoint updates @context.load_balancer = @load_balancer end @load_balancer.healthy_endpoints end rescue Context::ReloadError => error raise NoEndpointsError, "No endpoints discovered for #{@service_name}", cause: error end |
#stub(interface_class, service_name) ⇒ Object
Create a stub for the given interface. Same API as Async::GRPC::Client - load balancing happens per RPC call.
137 138 139 140 |
# File 'lib/async/grpc/xds/client.rb', line 137 def stub(interface_class, service_name) interface = interface_class.new(service_name) Stub.new(self, interface) end |