Class: Async::GRPC::XDS::DiscoveryClient
- Inherits: Object
- Includes: ADSStream::Delegate
- Defined in: lib/async/grpc/xds/discovery_client.rb
Overview
Client for the xDS APIs (ADS or the individual APIs). Implements the Aggregated Discovery Service (ADS) protocol and acts as the delegate for ADSStream, receiving discovery_response events.
Constant Summary
xDS API type URLs (v3 API):
- LISTENER_TYPE = "type.googleapis.com/envoy.config.listener.v3.Listener"
- ROUTE_TYPE = "type.googleapis.com/envoy.config.route.v3.RouteConfiguration"
- CLUSTER_TYPE = "type.googleapis.com/envoy.config.cluster.v3.Cluster"
- ENDPOINT_TYPE = "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"
- SECRET_TYPE = "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.Secret"
Instance Method Summary
- #close ⇒ Object
  Close xDS discovery client.
- #discovery_response(response, stream) ⇒ Object
- #initialize(server_config, node: nil) ⇒ DiscoveryClient (constructor)
  Initialize xDS discovery client.
- #stream_closed(stream) ⇒ Object
- #stream_opened(stream) ⇒ Object
- #subscribe(type_url, resource_names, &block) ⇒ Object
  Subscribe to resource type using ADS (Aggregated Discovery Service - single stream for all types).
Constructor Details
#initialize(server_config, node: nil) ⇒ DiscoveryClient
Initialize xDS discovery client
    # File 'lib/async/grpc/xds/discovery_client.rb', line 37

    def initialize(server_config, node: nil)
      @server_uri = server_config[:server_uri]
      @channel_creds = server_config[:channel_creds]
      @server_features = server_config[:server_features] || []
      @node_info = node || build_node_info
      @node = build_node_proto(@node_info)
      @grpc_client = nil
      @versions = {} # Track version_info per type_url
      @nonces = {} # Track nonces per type_url
      @mutex = Mutex.new
      @subscriptions = {} # Track subscriptions by type_url
      @stream_task = nil
      @ads_stream = nil # ADSStream instance when connected (owns stream state)
      @stream_ready_promise = nil # Resolved when stream_opened runs
    end
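The constructor keeps @versions and @nonces keyed by type_url. In the xDS protocol, a client acknowledges a response by echoing that response's version_info and nonce in its next DiscoveryRequest for the same type. The source shown here does not include the code that does this, so the following is only a minimal sketch of the bookkeeping: AckState is a hypothetical helper, and a plain Hash stands in for the protobuf DiscoveryRequest.

```ruby
# Hypothetical helper, not part of Async::GRPC::XDS: tracks the last accepted
# version_info and nonce per type URL, as needed to build an xDS ACK.
class AckState
  def initialize
    @versions = {}
    @nonces = {}
    @mutex = Mutex.new
  end

  # Record a response that was applied successfully.
  def accept(type_url, version_info, nonce)
    @mutex.synchronize do
      @versions[type_url] = version_info
      @nonces[type_url] = nonce
    end
  end

  # Build the fields of the next request for this type.
  # A plain Hash stands in for the protobuf DiscoveryRequest (assumption).
  def next_request(type_url, resource_names)
    @mutex.synchronize do
      {
        type_url: type_url,
        resource_names: resource_names,
        version_info: @versions.fetch(type_url, ""),
        response_nonce: @nonces.fetch(type_url, "")
      }
    end
  end
end

cluster_type = "type.googleapis.com/envoy.config.cluster.v3.Cluster"

state = AckState.new
initial = state.next_request(cluster_type, ["backend"]) # nothing accepted yet
state.accept(cluster_type, "v1", "nonce-1")
ack = state.next_request(cluster_type, ["backend"])     # echoes version and nonce
```

The first request carries empty version and nonce fields; after a response is accepted, the next request for that type echoes both values back to the server.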
Instance Method Details
#close ⇒ Object
Close xDS discovery client
    # File 'lib/async/grpc/xds/discovery_client.rb', line 88

    def close
      @mutex.synchronize do
        @stream_task&.stop
        @grpc_client&.close
        @grpc_client = nil
        @subscriptions.clear
        @stream_task = nil
        @ads_stream = nil
        @stream_ready_promise = nil
      end
    end
#discovery_response(response, stream) ⇒ Object
    # File 'lib/async/grpc/xds/discovery_client.rb', line 164

    def discovery_response(response, stream)
      process_response(response, stream)
    end
#stream_closed(stream) ⇒ Object
    # File 'lib/async/grpc/xds/discovery_client.rb', line 160

    def stream_closed(stream)
      @mutex.synchronize{@ads_stream = nil}
    end
#stream_opened(stream) ⇒ Object
    # File 'lib/async/grpc/xds/discovery_client.rb', line 155

    def stream_opened(stream)
      @mutex.synchronize{@ads_stream = stream}
      @stream_ready_promise&.resolve(stream)
    end
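The three delegate callbacks (stream_opened, discovery_response, stream_closed) can be exercised without a real xDS server. The sketch below is illustrative only: FakeStream and RecordingDelegate are hypothetical stand-ins, and the order in which the real ADSStream invokes its delegate is assumed, not taken from the source.

```ruby
# Hypothetical stand-in for ADSStream: it only knows how to notify a delegate.
class FakeStream
  def initialize(delegate)
    @delegate = delegate
  end

  # Assumed lifecycle: opened, zero or more responses, then closed.
  def run(responses)
    @delegate.stream_opened(self)
    responses.each { |response| @delegate.discovery_response(response, self) }
  ensure
    @delegate.stream_closed(self)
  end
end

# Minimal delegate that mirrors the three callbacks and records each event.
class RecordingDelegate
  attr_reader :events, :current_stream

  def initialize
    @events = []
    @current_stream = nil
    @mutex = Mutex.new
  end

  def stream_opened(stream)
    @mutex.synchronize { @current_stream = stream }
    @events << :opened
  end

  def discovery_response(response, _stream)
    @events << [:response, response]
  end

  def stream_closed(_stream)
    @mutex.synchronize { @current_stream = nil }
    @events << :closed
  end
end

delegate = RecordingDelegate.new
FakeStream.new(delegate).run(["listener-update"])
```

As in DiscoveryClient, the delegate holds a reference to the stream only while it is open and drops it again in stream_closed.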
#subscribe(type_url, resource_names, &block) ⇒ Object
Subscribe to resource type using ADS (Aggregated Discovery Service - single stream for all types)
    # File 'lib/async/grpc/xds/discovery_client.rb', line 59

    def subscribe(type_url, resource_names, &block)
      # Store subscription callback
      @mutex.synchronize do
        @subscriptions[type_url] = {
          resource_names: resource_names,
          callback: block
        }
      end

      # Ensure ADS stream is running
      ensure_stream_running

      # Wait for stream to be ready (event-driven, no polling)
      promise = @stream_ready_promise
      if promise && !promise.completed?
        begin
          promise.wait(timeout: 5)
        rescue Async::TimeoutError
          # Stream didn't open in time; send_discovery_request will no-op if @ads_stream is nil
        end
      end

      send_discovery_request(type_url, resource_names) if @ads_stream

      # Return the stream task (already running)
      @stream_task
    end
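Because subscribe stores one entry per type_url, subscribing again to the same type replaces the earlier resource names and callback. The following sketch reproduces only that storage behaviour. SubscriptionRegistry and its dispatch method are hypothetical: how DiscoveryClient actually routes a response to the callback (process_response) is not shown in the source.

```ruby
# Hypothetical registry, not the library's implementation: stores one callback
# per type URL and invokes it when resources of that type arrive.
class SubscriptionRegistry
  def initialize
    @subscriptions = {}
    @mutex = Mutex.new
  end

  # Same storage shape as DiscoveryClient#subscribe: one entry per type URL.
  def subscribe(type_url, resource_names, &block)
    @mutex.synchronize do
      @subscriptions[type_url] = {resource_names: resource_names, callback: block}
    end
  end

  # Returns true if a callback was invoked for this type URL (assumed design).
  def dispatch(type_url, resources)
    subscription = @mutex.synchronize { @subscriptions[type_url] }
    return false unless subscription && subscription[:callback]

    # Call outside the lock so the callback can subscribe again safely.
    subscription[:callback].call(resources)
    true
  end
end

listener_type = "type.googleapis.com/envoy.config.listener.v3.Listener"
registry = SubscriptionRegistry.new
received = []

registry.subscribe(listener_type, ["ingress"]) { |resources| received.concat(resources) }
# A second subscription for the same type URL replaces the first one.
registry.subscribe(listener_type, ["egress"]) { |resources| received.concat(resources.map(&:upcase)) }

handled = registry.dispatch(listener_type, ["l1"])
unhandled = registry.dispatch("type.googleapis.com/unknown", ["x"])
```

Only the second block runs for the listener type, which is worth keeping in mind if several callers subscribe to the same type_url on one client.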