Class: Async::GRPC::XDS::Context
- Inherits:
-
Object
- Object
- Async::GRPC::XDS::Context
- Defined in:
- lib/async/grpc/xds/context.rb
Overview
Manages xDS subscriptions and maintains discovered resource state
Defined Under Namespace
Classes: ConfigurationError, ReloadError
Instance Method Summary collapse
-
#close ⇒ Object
Close all subscriptions.
-
#discover_cluster(service_name) ⇒ Object
Discover cluster for service (like ClusterClient.reload_cluster!).
-
#discover_endpoints(cluster) ⇒ Object
Discover endpoints for cluster (like ClusterClient discovers nodes).
-
#initialize(bootstrap, node: nil) ⇒ Context
constructor
Initialize xDS context.
-
#load_balancer=(load_balancer) ⇒ Object
Set load balancer reference (called by Client).
-
#subscribe_cds(service_name) ⇒ Object
Subscribe to CDS (Cluster Discovery Service).
-
#subscribe_eds(cluster_name) ⇒ Object
Subscribe to EDS (Endpoint Discovery Service).
Constructor Details
#initialize(bootstrap, node: nil) ⇒ Context
Initialize xDS context
27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/async/grpc/xds/context.rb', line 27 def initialize(bootstrap, node: nil) @bootstrap = bootstrap xds_server = bootstrap[:xds_servers]&.first raise ConfigurationError, "No xds_servers in bootstrap" unless xds_server @discovery_client = DiscoveryClient.new(xds_server, node: node) @cache = ResourceCache.new @subscriptions = {} # Track active subscriptions @load_balancer = nil # Will be set by Client @mutex = Mutex.new @cluster_promises = {} # service_name -> Async::Promise (level-triggered: resolved value persists) @endpoint_promises = {} # cluster_name -> Async::Promise end |
Instance Method Details
#close ⇒ Object
Close all subscriptions
139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/async/grpc/xds/context.rb', line 139 def close @mutex.synchronize do @subscriptions.each_value do |task| task.stop if task.respond_to?(:stop) end @subscriptions.clear @cluster_promises.clear @endpoint_promises.clear end @discovery_client.close end |
#discover_cluster(service_name) ⇒ Object
Discover cluster for service (like ClusterClient.reload_cluster!)
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/async/grpc/xds/context.rb', line 50 def discover_cluster(service_name) @mutex.synchronize do # Check cache first if cluster = @cache.get_cluster(service_name) return cluster end # Subscribe to CDS if not already subscribed unless @subscriptions[:cds] @subscriptions[:cds] = subscribe_cds(service_name) end # Subscribe to EDS for same name up front (EDS clusters use service name as cluster name) # This avoids 10s delay between CDS and EDS - both requests go out together subscription_key = :"eds_#{service_name}" unless @subscriptions[subscription_key] @subscriptions[subscription_key] = subscribe_eds(service_name) end end return @cache.get_cluster(service_name) if @cache.get_cluster(service_name) # Wait for cluster (CDS response) cluster = wait_for_cluster(service_name, timeout: 10) raise ReloadError, "Failed to discover cluster: #{service_name}" unless cluster cluster end |
#discover_endpoints(cluster) ⇒ Object
Discover endpoints for cluster (like ClusterClient discovers nodes)
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/async/grpc/xds/context.rb', line 80 def discover_endpoints(cluster) cluster_name = cluster.name @mutex.synchronize do # Check cache first if endpoints = @cache.get_endpoints(cluster_name) return endpoints end # Subscribe to EDS if not already subscribed subscription_key = :"eds_#{cluster_name}" unless @subscriptions[subscription_key] @subscriptions[subscription_key] = subscribe_eds(cluster_name) end end return @cache.get_endpoints(cluster_name) if @cache.get_endpoints(cluster_name) # Wait outside mutex so EDS callback can run and update cache endpoints = wait_for_endpoints(cluster_name, timeout: 10) raise ReloadError, "Failed to discover endpoints for cluster: #{cluster_name}" unless endpoints endpoints end |
#load_balancer=(load_balancer) ⇒ Object
Set load balancer reference (called by Client)
43 44 45 |
# File 'lib/async/grpc/xds/context.rb', line 43 def load_balancer=(load_balancer) @load_balancer = load_balancer end |
#subscribe_cds(service_name) ⇒ Object
Subscribe to CDS (Cluster Discovery Service)
105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/async/grpc/xds/context.rb', line 105 def subscribe_cds(service_name) @discovery_client.subscribe( DiscoveryClient::CLUSTER_TYPE, [service_name] ) do |resources| resources.each do |resource| cluster = resource.is_a?(Resources::Cluster) ? resource : Resources::Cluster.from_proto(resource) @cache.update_cluster(cluster) resolve_cluster_promise(cluster.name, cluster) end end end |
#subscribe_eds(cluster_name) ⇒ Object
Subscribe to EDS (Endpoint Discovery Service)
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/async/grpc/xds/context.rb', line 121 def subscribe_eds(cluster_name) @discovery_client.subscribe( DiscoveryClient::ENDPOINT_TYPE, [cluster_name] ) do |resources| resources.each do |resource| assignment = resource.is_a?(Resources::ClusterLoadAssignment) ? resource : Resources::ClusterLoadAssignment.from_proto(resource) endpoints = assignment.endpoints.select(&:healthy?).map do |endpoint| Async::HTTP::Endpoint.parse(endpoint.uri, protocol: Async::HTTP::Protocol::HTTP2) end @cache.update_endpoints(cluster_name, endpoints) resolve_endpoint_promise(cluster_name, endpoints) unless endpoints.empty? @load_balancer&.update_endpoints(endpoints) end end end |