Class: Async::GRPC::XDS::ControlPlane
- Inherits:
-
Object
- Object
- Async::GRPC::XDS::ControlPlane
- Defined in:
- lib/async/grpc/xds/control_plane.rb
Overview
Maintains xDS resource snapshots and notifies ADS streams when resources change.
Constant Summary collapse
- CLUSTER_TYPE =
ResourceBuilder::CLUSTER_TYPE
- ENDPOINT_TYPE =
ResourceBuilder::ENDPOINT_TYPE
Instance Attribute Summary collapse
-
#identifier ⇒ Object
readonly
Returns the value of attribute identifier.
Instance Method Summary collapse
-
#initialize(identifier: "async-grpc-xds") ⇒ ControlPlane
constructor
A new instance of ControlPlane.
- #register_stream(stream) ⇒ Object
- #remove_cluster(name) ⇒ Object
- #remove_endpoints(cluster_name) ⇒ Object
- #remove_resource(type_url, name) ⇒ Object
- #remove_stream(stream) ⇒ Object
- #resource_names(type_url) ⇒ Object
- #resources(type_url, names = nil) ⇒ Object
- #response(type_url, names = nil) ⇒ Object
- #update_cluster(name, resource = nil, **options) ⇒ Object
- #update_endpoints(cluster_name, endpoints) ⇒ Object
- #update_resource(type_url, name, resource) ⇒ Object
- #version(type_url) ⇒ Object
Constructor Details
#initialize(identifier: "async-grpc-xds") ⇒ ControlPlane
Returns a new instance of ControlPlane.
24 25 26 27 28 29 30 |
# File 'lib/async/grpc/xds/control_plane.rb', line 24
#
# Sets up an empty control plane: no resources, no registered streams, and
# every per-type version counter starting at zero.
#
# @param identifier [String] value stored in @identifier
def initialize(identifier: "async-grpc-xds")
	@mutex = Mutex.new
	@identifier = identifier
	
	# Streams are tracked by object identity rather than by #eql?/#hash.
	@streams = Set.new.compare_by_identity
	
	# Version counters default to 0 for type URLs never written.
	@versions = Hash.new(0)
	
	# type_url => {name => resource}; a fresh inner hash is created (and
	# stored) the first time a type URL is indexed.
	@resources = Hash.new{|by_type, type_url| by_type[type_url] = {}}
end
Instance Attribute Details
#identifier ⇒ Object (readonly)
Returns the value of attribute identifier.
32 33 34 |
# File 'lib/async/grpc/xds/control_plane.rb', line 32
#
# Read-only accessor.
#
# @return [Object] the current value of @identifier
def identifier = @identifier
Instance Method Details
#register_stream(stream) ⇒ Object
117 118 119 120 121 |
# File 'lib/async/grpc/xds/control_plane.rb', line 117
#
# Adds +stream+ to the set of registered streams while holding the mutex.
#
# @param stream [Object] the stream to track
# @return [Set] the stream set (the result of the insertion)
def register_stream(stream)
	@mutex.synchronize{@streams << stream}
end
#remove_cluster(name) ⇒ Object
47 48 49 |
# File 'lib/async/grpc/xds/control_plane.rb', line 47
#
# Removes the cluster resource stored under +name+.
#
# @param name [#to_s] cluster name; converted with #to_s before lookup
# @return [Object] whatever #remove_resource returns
def remove_cluster(name)
	cluster_key = name.to_s
	remove_resource(CLUSTER_TYPE, cluster_key)
end
#remove_endpoints(cluster_name) ⇒ Object
51 52 53 |
# File 'lib/async/grpc/xds/control_plane.rb', line 51
#
# Removes the endpoint resource stored under +cluster_name+.
#
# @param cluster_name [#to_s] cluster name; converted with #to_s before lookup
# @return [Object] whatever #remove_resource returns
def remove_endpoints(cluster_name)
	endpoints_key = cluster_name.to_s
	remove_resource(ENDPOINT_TYPE, endpoints_key)
end
#remove_resource(type_url, name) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/async/grpc/xds/control_plane.rb', line 67
#
# Deletes the resource stored under +name+ for +type_url+. When something
# was actually deleted, the type's version counter is incremented and
# #notify_streams is called for that type.
#
# @param type_url [Object] key identifying the resource type
# @param name [Object] key of the resource within that type
# @return [Object, nil] the result of #notify_streams, or nil when nothing
#   was removed
def remove_resource(type_url, name)
	removed = @mutex.synchronize do
		# Use fetch rather than @resources[type_url]: the hash's default block
		# would otherwise insert an empty bucket for a type URL that was never
		# written, so a no-op removal would grow the hash.
		if @resources.fetch(type_url, nil)&.delete(name)
			@versions[type_url] += 1
			true
		end
	end
	
	# Notification happens after the lock has been released.
	notify_streams(type_url) if removed
end
#remove_stream(stream) ⇒ Object
123 124 125 126 127 |
# File 'lib/async/grpc/xds/control_plane.rb', line 123
#
# Drops +stream+ from the set of registered streams while holding the mutex.
#
# @param stream [Object] the stream to stop tracking
# @return [Set] the stream set (the result of the deletion)
def remove_stream(stream)
	@mutex.synchronize{@streams.delete(stream)}
end
#resource_names(type_url) ⇒ Object
80 81 82 83 84 |
# File 'lib/async/grpc/xds/control_plane.rb', line 80
#
# Lists the names under which resources are currently stored for +type_url+.
#
# @param type_url [Object] key identifying the resource type
# @return [Array] the stored names; empty when the type has no resources
def resource_names(type_url)
	@mutex.synchronize do
		# fetch keeps this a pure read: indexing @resources directly would run
		# the hash's default block and store an empty bucket for every type URL
		# that is merely queried.
		@resources.fetch(type_url){{}}.keys
	end
end
#resources(type_url, names = nil) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/async/grpc/xds/control_plane.rb', line 86
#
# Returns the resources stored for +type_url+.
#
# @param type_url [Object] key identifying the resource type
# @param names [Array, nil] when it contains at least one truthy element,
#   only resources stored under these names are returned (in the order of
#   +names+, skipping names with no stored resource); otherwise every
#   resource of the type is returned
# @return [Array] the selected resources
def resources(type_url, names = nil)
	@mutex.synchronize do
		# fetch keeps this a pure read: indexing @resources directly would run
		# the hash's default block and store an empty bucket for every type URL
		# that is merely queried.
		known = @resources.fetch(type_url){{}}
		
		if names && names.any?
			names.filter_map{|name| known[name]}
		else
			known.values
		end
	end
end
#response(type_url, names = nil) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/async/grpc/xds/control_plane.rb', line 104
#
# Builds a DiscoveryResponse for +type_url+ from the current snapshot.
#
# The resources and the version are read inside a single critical section.
# Reading them through two separate lock acquisitions would let a concurrent
# update slip in between, producing a response whose version_info does not
# match the resources it carries. The mutex is a plain Mutex (not
# re-entrant), so the selection logic is inlined here instead of calling
# #resources and #version while the lock is held.
#
# @param type_url [Object] key identifying the resource type
# @param names [Array, nil] when it contains at least one truthy element,
#   only resources stored under these names are included; otherwise all
#   resources of the type are included
# @return [Envoy::Service::Discovery::V3::DiscoveryResponse]
def response(type_url, names = nil)
	selected, version = @mutex.synchronize do
		# fetch avoids running the hash's default block, which would store an
		# empty bucket for a type URL that is only being queried.
		known = @resources.fetch(type_url){{}}
		picked = names && names.any? ? names.filter_map{|name| known[name]} : known.values
		
		[picked, @versions[type_url].to_s]
	end
	
	Envoy::Service::Discovery::V3::DiscoveryResponse.new(
		version_info: version,
		resources: selected.map{|resource| ResourceBuilder.pack(resource)},
		type_url: type_url,
		# Random suffix makes each nonce distinct even for the same version.
		nonce: "#{type_url}:#{version}:#{SecureRandom.hex(8)}",
		control_plane: Envoy::Config::Core::V3::ControlPlane.new(identifier: @identifier)
	)
end
#update_cluster(name, resource = nil, **options) ⇒ Object
34 35 36 37 |
# File 'lib/async/grpc/xds/control_plane.rb', line 34
#
# Stores a cluster resource under +name+.
#
# @param name [#to_s] cluster name; converted with #to_s for storage
# @param resource [Object, nil] the cluster resource; when nil (or false) one
#   is built with ResourceBuilder.cluster(name, **options)
# @param options keyword options forwarded to ResourceBuilder.cluster; only
#   used when +resource+ is not given
# @return [Object] whatever #update_resource returns
def update_cluster(name, resource = nil, **)
	cluster = resource || ResourceBuilder.cluster(name, **)
	update_resource(CLUSTER_TYPE, name.to_s, cluster)
end
#update_endpoints(cluster_name, endpoints) ⇒ Object
39 40 41 42 43 44 45 |
# File 'lib/async/grpc/xds/control_plane.rb', line 39
#
# Stores the endpoint resource for +cluster_name+, built with
# ResourceBuilder.cluster_load_assignment.
#
# @param cluster_name [#to_s] cluster name; passed as-is to the builder and
#   converted with #to_s for storage
# @param endpoints [Object] endpoints handed to the builder unchanged
# @return [Object] whatever #update_resource returns
def update_endpoints(cluster_name, endpoints)
	assignment = ResourceBuilder.cluster_load_assignment(cluster_name, endpoints)
	update_resource(ENDPOINT_TYPE, cluster_name.to_s, assignment)
end
#update_resource(type_url, name, resource) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/async/grpc/xds/control_plane.rb', line 55
#
# Stores +resource+ under +name+ for +type_url+, increments that type's
# version counter, and then calls #notify_streams for the type.
#
# Every call bumps the version and notifies, so no "did anything change"
# flag is needed here (unlike #remove_resource, where a removal can be a
# no-op).
#
# @param type_url [Object] key identifying the resource type
# @param name [Object] key of the resource within that type
# @param resource [Object] the resource to store
# @return [Object] whatever #notify_streams returns
def update_resource(type_url, name, resource)
	@mutex.synchronize do
		@resources[type_url][name] = resource
		@versions[type_url] += 1
	end
	
	# Notification happens after the lock has been released.
	notify_streams(type_url)
end
#version(type_url) ⇒ Object
98 99 100 101 102 |
# File 'lib/async/grpc/xds/control_plane.rb', line 98
#
# Reads the version counter for +type_url+ under the mutex and returns it as
# a string.
#
# @param type_url [Object] key identifying the resource type
# @return [String] the counter rendered with #to_s
def version(type_url)
	counter = @mutex.synchronize{@versions[type_url]}
	counter.to_s
end