Class: Async::GRPC::XDS::ControlPlane

Inherits:
Object
  • Object
show all
Defined in:
lib/async/grpc/xds/control_plane.rb

Overview

Maintains xDS resource snapshots and notifies ADS streams when resources change.

Constant Summary collapse

CLUSTER_TYPE =
ResourceBuilder::CLUSTER_TYPE
ENDPOINT_TYPE =
ResourceBuilder::ENDPOINT_TYPE

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(identifier: "async-grpc-xds") ⇒ ControlPlane

Returns a new instance of ControlPlane.



24
25
26
27
28
29
30
# File 'lib/async/grpc/xds/control_plane.rb', line 24

def initialize(identifier: "async-grpc-xds")
	@identifier = identifier
	@resources = Hash.new{|hash, type_url| hash[type_url] = {}}
	@versions = Hash.new(0)
	@streams = Set.new.compare_by_identity
	@mutex = Mutex.new
end

Instance Attribute Details

#identifierObject (readonly)

Returns the value of attribute identifier.



32
33
34
# File 'lib/async/grpc/xds/control_plane.rb', line 32

def identifier
  @identifier
end

Instance Method Details

#register_stream(stream) ⇒ Object



117
118
119
120
121
# File 'lib/async/grpc/xds/control_plane.rb', line 117

def register_stream(stream)
	@mutex.synchronize do
		@streams.add(stream)
	end
end

#remove_cluster(name) ⇒ Object



47
48
49
# File 'lib/async/grpc/xds/control_plane.rb', line 47

def remove_cluster(name)
	remove_resource(CLUSTER_TYPE, name.to_s)
end

#remove_endpoints(cluster_name) ⇒ Object



51
52
53
# File 'lib/async/grpc/xds/control_plane.rb', line 51

def remove_endpoints(cluster_name)
	remove_resource(ENDPOINT_TYPE, cluster_name.to_s)
end

#remove_resource(type_url, name) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/async/grpc/xds/control_plane.rb', line 67

def remove_resource(type_url, name)
	notify = false
	
	@mutex.synchronize do
		if @resources[type_url].delete(name)
			@versions[type_url] += 1
			notify = true
		end
	end
	
	notify_streams(type_url) if notify
end

#remove_stream(stream) ⇒ Object



123
124
125
126
127
# File 'lib/async/grpc/xds/control_plane.rb', line 123

def remove_stream(stream)
	@mutex.synchronize do
		@streams.delete(stream)
	end
end

#resource_names(type_url) ⇒ Object



80
81
82
83
84
# File 'lib/async/grpc/xds/control_plane.rb', line 80

def resource_names(type_url)
	@mutex.synchronize do
		@resources[type_url].keys
	end
end

#resources(type_url, names = nil) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
# File 'lib/async/grpc/xds/control_plane.rb', line 86

def resources(type_url, names = nil)
	@mutex.synchronize do
		resources = @resources[type_url]
		
		if names && names.any?
			names.filter_map{|name| resources[name]}
		else
			resources.values
		end
	end
end

#response(type_url, names = nil) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/async/grpc/xds/control_plane.rb', line 104

def response(type_url, names = nil)
	resources = self.resources(type_url, names)
	version = self.version(type_url)
	
	Envoy::Service::Discovery::V3::DiscoveryResponse.new(
		version_info: version,
		resources: resources.map{|resource| ResourceBuilder.pack(resource)},
		type_url: type_url,
		nonce: "#{type_url}:#{version}:#{SecureRandom.hex(8)}",
		control_plane: Envoy::Config::Core::V3::ControlPlane.new(identifier: @identifier)
	)
end

#update_cluster(name, resource = nil, **options) ⇒ Object



34
35
36
37
# File 'lib/async/grpc/xds/control_plane.rb', line 34

def update_cluster(name, resource = nil, **options)
	resource ||= ResourceBuilder.cluster(name, **options)
	update_resource(CLUSTER_TYPE, name.to_s, resource)
end

#update_endpoints(cluster_name, endpoints) ⇒ Object



39
40
41
42
43
44
45
# File 'lib/async/grpc/xds/control_plane.rb', line 39

def update_endpoints(cluster_name, endpoints)
	update_resource(
		ENDPOINT_TYPE,
		cluster_name.to_s,
		ResourceBuilder.cluster_load_assignment(cluster_name, endpoints)
	)
end

#update_resource(type_url, name, resource) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
# File 'lib/async/grpc/xds/control_plane.rb', line 55

def update_resource(type_url, name, resource)
	notify = false
	
	@mutex.synchronize do
		@resources[type_url][name] = resource
		@versions[type_url] += 1
		notify = true
	end
	
	notify_streams(type_url) if notify
end

#version(type_url) ⇒ Object



98
99
100
101
102
# File 'lib/async/grpc/xds/control_plane.rb', line 98

def version(type_url)
	@mutex.synchronize do
		@versions[type_url].to_s
	end
end