Class: Async::GRPC::XDS::Service

Inherits:
Service
  • Object
show all
Defined in:
lib/async/grpc/xds/service.rb

Overview

Serves Envoy Aggregated Discovery Service requests from a ControlPlane.

Defined Under Namespace

Classes: Stream

Constant Summary collapse

SERVICE_NAME =
"envoy.service.discovery.v3.AggregatedDiscoveryService"

Instance Method Summary collapse

Constructor Details

#initialize(control_plane) ⇒ Service

Returns a new instance of Service.



24
25
26
27
# File 'lib/async/grpc/xds/service.rb', line 24

# Set up the discovery service around the control plane that will supply its data.
#
# The parent constructor receives the generated AggregatedDiscoveryService
# interface together with SERVICE_NAME.
#
# @param control_plane [Object] Kept as `@control_plane`; the streaming handler
#   registers and removes its streams on this object.
def initialize(control_plane)
	interface = Envoy::Service::Discovery::V3::AggregatedDiscoveryService
	super(interface, SERVICE_NAME)
	
	@control_plane = control_plane
end

Instance Method Details

#delta_aggregated_resources(input, output, call) ⇒ Object

Raises:

  • (Protocol::GRPC::Error)


51
52
53
54
55
56
# File 'lib/async/grpc/xds/service.rb', line 51

# Handler for the delta xDS variant. It is not supported, so every call is
# rejected immediately without touching any of the arguments.
#
# @param input [Object] Unused.
# @param output [Object] Unused.
# @param call [Object] Unused.
# @raise [Protocol::GRPC::Error] Always, with status UNIMPLEMENTED.
def delta_aggregated_resources(input, output, call)
	status = Protocol::GRPC::Status::UNIMPLEMENTED
	error = Protocol::GRPC::Error.new(status, "Delta xDS is not implemented.")
	
	raise error
end

#stream_aggregated_resources(input, output, call) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/async/grpc/xds/service.rb', line 29

# Serve one aggregated discovery stream.
#
# A Stream is created for the call and registered with the control plane. Two
# tasks are then started: one feeds each incoming request to the stream, the
# other runs `stream.run`. The method blocks until the reader task finishes,
# then tears everything down.
#
# @param input [#each] Source of incoming requests; each one is handed to `stream.request`.
# @param output [Object] Passed to `Stream.new` together with the control plane.
# @param call [Object] Unused by this handler.
def stream_aggregated_resources(input, output, call)
	stream = Stream.new(@control_plane, output)
	@control_plane.register_stream(stream)
	
	reader = Async do
		input.each do |request|
			stream.request(request)
		end
	end
	
	writer = Async do
		stream.run
	end
	
	# Only the reader is awaited: the call ends when the reader task finishes.
	reader.wait
ensure
	# Teardown steps are nested so that a failure in one of them cannot skip the
	# rest; otherwise a raising `close` would leave both tasks running and the
	# stream still registered. Any exception raised here still propagates.
	# The safe navigation covers failures before the locals were assigned.
	begin
		stream&.close
	ensure
		begin
			reader&.stop
		ensure
			begin
				writer&.stop
			ensure
				@control_plane.remove_stream(stream) if stream
			end
		end
	end
end