Class: Async::GRPC::XDS::ADSStream

Inherits:
Object
Defined in:
lib/async/grpc/xds/ads_stream.rb

Overview

Encapsulates a single ADS (Aggregated Discovery Service) bidirectional stream. Owns the stream lifecycle and forwards stream events (opened, response received, closed, error) to a delegate object.
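As an illustration of the delegate contract, the sketch below shows one possible delegate. It is based only on the calls visible in #run: discovery_response is invoked unconditionally, while stream_opened, stream_closed and stream_error are invoked only if the delegate responds to them. The class name RecordingDelegate and the simulated call sequence are assumptions made for the example, not part of the library.

```ruby
# Hypothetical delegate: RecordingDelegate is an illustrative name, not a library class.
class RecordingDelegate
	attr_reader :events
	
	def initialize
		@events = []
	end
	
	# Optional callback: invoked once the stream body is available.
	def stream_opened(stream)
		@events << :opened
	end
	
	# Required callback: invoked for every response read from the stream.
	def discovery_response(response, stream)
		@events << [:response, response]
	end
	
	# Optional callback: invoked when the read loop exits, normally or via an error.
	def stream_closed(stream)
		@events << :closed
	end
	
	# Optional callback: invoked with the error before #run re-raises it.
	def stream_error(stream, error)
		@events << [:error, error.message]
	end
end

# Simulate the callback order of a clean run, without a real gRPC connection.
delegate = RecordingDelegate.new
stream = Object.new # stand-in for an ADSStream instance
delegate.stream_opened(stream)
delegate.discovery_response("response-1", stream)
delegate.stream_closed(stream)
```

In real use the delegate would be passed as ADSStream.new(client, node, delegate: delegate) and the callbacks would be driven by #run rather than called by hand.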

Defined Under Namespace

Modules: Delegate


Constructor Details

#initialize(client, node, delegate:) ⇒ ADSStream

Returns a new instance of ADSStream.



# File 'lib/async/grpc/xds/ads_stream.rb', line 27

def initialize(client, node, delegate:)
	@client = client
	@node = node
	@delegate = delegate
	@body = nil
end

Instance Method Details

#run(initial: nil) ⇒ Object

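Runs the ADS stream. Blocks until the stream completes or raises. When initial is nil or empty, a single DiscoveryRequest carrying the node is sent as the opening message. Any StandardError is passed to the delegate's stream_error callback (if defined), logged, and re-raised.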



# File 'lib/async/grpc/xds/ads_stream.rb', line 42

def run(initial: nil)
	service = Envoy::Service::Discovery::V3::AggregatedDiscoveryService.new(
		"envoy.service.discovery.v3.AggregatedDiscoveryService"
	)
	
	initial = Array(initial).any? ? initial : [Envoy::Service::Discovery::V3::DiscoveryRequest.new(node: @node)]
	
	@client.invoke(service, :StreamAggregatedResources, nil, initial: initial) do |body, readable_body|
		@body = body
		@delegate.stream_opened(self) if @delegate.respond_to?(:stream_opened)
		
		begin
			readable_body.each do |response|
				@delegate.discovery_response(response, self)
			end
		ensure
			@delegate.stream_closed(self) if @delegate.respond_to?(:stream_closed)
			@body = nil
		end
	end
rescue => error
	@delegate.stream_error(self, error) if @delegate.respond_to?(:stream_error)
	Console.error(self, "Failed while streaming updates!", exception: error)
	raise
end
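Because #run re-raises after notifying the delegate, reconnection is left to the caller. The following is a minimal sketch of one way to wrap it in a bounded retry loop with exponential backoff. The helper run_with_retry, its parameters, and the FlakyStream stand-in are all hypothetical names introduced for this example; nothing here is provided by the library.

```ruby
# Hypothetical helper: retries stream.run a bounded number of times.
def run_with_retry(stream, attempts: 3, base_delay: 0.01)
	tries = 0
	begin
		tries += 1
		stream.run
	rescue StandardError
		# Give up once the attempt budget is exhausted.
		raise if tries >= attempts
		# Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
		sleep(base_delay * (2**(tries - 1)))
		retry
	end
end

# Stand-in for ADSStream whose run fails a fixed number of times, then returns.
class FlakyStream
	attr_reader :calls
	
	def initialize(failures)
		@failures = failures
		@calls = 0
	end
	
	def run(initial: nil)
		@calls += 1
		raise IOError, "connection reset" if @calls <= @failures
		:completed
	end
end

flaky = FlakyStream.new(2)
result = run_with_retry(flaky)
```

A production loop would likely also cap the delay and add jitter; those choices depend on the deployment and are omitted here.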

#send(request) ⇒ Object

Sends a DiscoveryRequest on the open stream. Call it from within discovery_response to send ACKs. Does nothing if the stream is not currently open.



# File 'lib/async/grpc/xds/ads_stream.rb', line 36

def send(request)
	@body&.write(request)
end
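To illustrate sending an ACK from discovery_response, here is a minimal sketch. In the xDS protocol an ACK is a DiscoveryRequest that echoes the response's version_info and type_url and carries the response's nonce in response_nonce. So that the example runs without the generated protobuf classes, FakeResponse, FakeRequest and FakeStream are stand-ins invented for this sketch, and AckingDelegate is a hypothetical delegate; real code would build an Envoy::Service::Discovery::V3::DiscoveryRequest and call #send on the ADSStream it receives.

```ruby
# Stand-ins for the protobuf messages (assumed shapes; field names follow the xDS messages).
FakeResponse = Struct.new(:version_info, :type_url, :nonce, keyword_init: true)
FakeRequest = Struct.new(:version_info, :type_url, :response_nonce, :resource_names, keyword_init: true)

# Stand-in for ADSStream that records what would be written to the stream.
class FakeStream
	attr_reader :sent
	
	def initialize
		@sent = []
	end
	
	def send(request)
		@sent << request
	end
end

# Hypothetical delegate that acknowledges every response it receives.
class AckingDelegate
	def initialize(resource_names)
		@resource_names = resource_names
	end
	
	def discovery_response(response, stream)
		# An ACK echoes the accepted version and the nonce of the response.
		ack = FakeRequest.new(
			version_info: response.version_info,
			type_url: response.type_url,
			response_nonce: response.nonce,
			resource_names: @resource_names
		)
		stream.send(ack)
	end
end

fake_stream = FakeStream.new
acker = AckingDelegate.new(["cluster-a"])
response = FakeResponse.new(
	version_info: "1",
	type_url: "type.googleapis.com/envoy.config.cluster.v3.Cluster",
	nonce: "nonce-1"
)
acker.discovery_response(response, fake_stream)
```

A NACK would instead keep the previously accepted version_info and populate error_detail; that case is not shown here.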