Class: Async::GRPC::XDS::Context

Inherits:
Object
  • Object
show all
Defined in:
lib/async/grpc/xds/context.rb

Overview

Manages xDS subscriptions and maintains discovered resource state

Defined Under Namespace

Classes: ConfigurationError, ReloadError

Instance Method Summary collapse

Constructor Details

#initialize(bootstrap, node: nil) ⇒ Context

Initialize xDS context

Raises:



27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/async/grpc/xds/context.rb', line 27

def initialize(bootstrap, node: nil)
	@bootstrap = bootstrap
	xds_server = bootstrap[:xds_servers]&.first
	raise ConfigurationError, "No xds_servers in bootstrap" unless xds_server
	
	@discovery_client = DiscoveryClient.new(xds_server, node: node)
	@cache = ResourceCache.new
	@subscriptions = {}  # Track active subscriptions
	@load_balancer = nil  # Will be set by Client
	@mutex = Mutex.new
	@cluster_promises = {}  # service_name -> Async::Promise (level-triggered: resolved value persists)
	@endpoint_promises = {}  # cluster_name -> Async::Promise
end

Instance Method Details

#closeObject

Close all subscriptions



139
140
141
142
143
144
145
146
147
148
149
# File 'lib/async/grpc/xds/context.rb', line 139

def close
	@mutex.synchronize do
		@subscriptions.each_value do |task|
			task.stop if task.respond_to?(:stop)
		end
		@subscriptions.clear
		@cluster_promises.clear
		@endpoint_promises.clear
	end
	@discovery_client.close
end

#discover_cluster(service_name) ⇒ Object

Discover cluster for service (like ClusterClient.reload_cluster!)

Raises:



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/async/grpc/xds/context.rb', line 50

def discover_cluster(service_name)
	@mutex.synchronize do
		# Check cache first
		if cluster = @cache.get_cluster(service_name)
			return cluster
		end
		
		# Subscribe to CDS if not already subscribed
		unless @subscriptions[:cds]
			@subscriptions[:cds] = subscribe_cds(service_name)
		end
		
		# Subscribe to EDS for same name up front (EDS clusters use service name as cluster name)
		# This avoids 10s delay between CDS and EDS - both requests go out together
		subscription_key = :"eds_#{service_name}"
		unless @subscriptions[subscription_key]
			@subscriptions[subscription_key] = subscribe_eds(service_name)
		end
	end
	return @cache.get_cluster(service_name) if @cache.get_cluster(service_name)
	
	# Wait for cluster (CDS response)
	cluster = wait_for_cluster(service_name, timeout: 10)
	raise ReloadError, "Failed to discover cluster: #{service_name}" unless cluster
	cluster
end

#discover_endpoints(cluster) ⇒ Object

Discover endpoints for cluster (like ClusterClient discovers nodes)

Raises:



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/async/grpc/xds/context.rb', line 80

def discover_endpoints(cluster)
	cluster_name = cluster.name
	@mutex.synchronize do
		# Check cache first
		if endpoints = @cache.get_endpoints(cluster_name)
			return endpoints
		end
		
		# Subscribe to EDS if not already subscribed
		subscription_key = :"eds_#{cluster_name}"
		unless @subscriptions[subscription_key]
			@subscriptions[subscription_key] = subscribe_eds(cluster_name)
		end
	end
	return @cache.get_endpoints(cluster_name) if @cache.get_endpoints(cluster_name)
	
	# Wait outside mutex so EDS callback can run and update cache
	endpoints = wait_for_endpoints(cluster_name, timeout: 10)
	raise ReloadError, "Failed to discover endpoints for cluster: #{cluster_name}" unless endpoints
	endpoints
end

#load_balancer=(load_balancer) ⇒ Object

Set load balancer reference (called by Client)



43
44
45
# File 'lib/async/grpc/xds/context.rb', line 43

def load_balancer=(load_balancer)
	@load_balancer = load_balancer
end

#subscribe_cds(service_name) ⇒ Object

Subscribe to CDS (Cluster Discovery Service)



105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/async/grpc/xds/context.rb', line 105

def subscribe_cds(service_name)
	@discovery_client.subscribe(
		DiscoveryClient::CLUSTER_TYPE,
		[service_name]
	) do |resources|
		resources.each do |resource|
			cluster = resource.is_a?(Resources::Cluster) ? resource : Resources::Cluster.from_proto(resource)
			@cache.update_cluster(cluster)
			resolve_cluster_promise(cluster.name, cluster)
		end
	end
end

#subscribe_eds(cluster_name) ⇒ Object

Subscribe to EDS (Endpoint Discovery Service)



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/async/grpc/xds/context.rb', line 121

def subscribe_eds(cluster_name)
	@discovery_client.subscribe(
		DiscoveryClient::ENDPOINT_TYPE,
		[cluster_name]
	) do |resources|
		resources.each do |resource|
			assignment = resource.is_a?(Resources::ClusterLoadAssignment) ? resource : Resources::ClusterLoadAssignment.from_proto(resource)
			endpoints = assignment.endpoints.select(&:healthy?).map do |endpoint|
				Async::HTTP::Endpoint.parse(endpoint.uri, protocol: Async::HTTP::Protocol::HTTP2)
			end
			@cache.update_endpoints(cluster_name, endpoints)
			resolve_endpoint_promise(cluster_name, endpoints) unless endpoints.empty?
			@load_balancer&.update_endpoints(endpoints)
		end
	end
end