Class: Async::GRPC::XDS::ADSStream
- Inherits:
- Object
- Defined in:
- lib/async/grpc/xds/ads_stream.rb
Overview
Encapsulates a single ADS (Aggregated Discovery Service) bidirectional stream. Owns the stream lifecycle and forwards stream events (opened, response received, closed, error) to a delegate object.
Defined Under Namespace
Modules: Delegate
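The callbacks that #run invokes on the delegate can be sketched as a plain Ruby class. This is an illustrative sketch, not the library's own Delegate module: `LoggingDelegate` and `FakeResponse` are hypothetical names, and the stream is driven by hand instead of through a real gRPC client. Judging from the #run source, only discovery_response is called unconditionally; the other three callbacks are guarded by respond_to? and so appear to be optional.

```ruby
# Stand-in for a DiscoveryResponse; the real type comes from the Envoy
# protobuf definitions and is not loaded here.
FakeResponse = Struct.new(:type_url, :version_info, :nonce, keyword_init: true)

# Hypothetical delegate that records every event it receives.
class LoggingDelegate
  attr_reader :events

  def initialize
    @events = []
  end

  # Optional: called when the client yields the open stream.
  def stream_opened(stream)
    @events << :opened
  end

  # Called for every response read from the stream.
  def discovery_response(response, stream)
    @events << [:response, response.type_url, response.version_info]
  end

  # Optional: called when the read loop exits, whether cleanly or not.
  def stream_closed(stream)
    @events << :closed
  end

  # Optional: called with the error before #run re-raises it.
  def stream_error(stream, error)
    @events << [:error, error.message]
  end
end

# Drive the delegate by hand, in the order #run would use for a stream
# that delivers one response and then ends cleanly.
delegate = LoggingDelegate.new
stream = Object.new # placeholder for an ADSStream instance
delegate.stream_opened(stream)
delegate.discovery_response(
  FakeResponse.new(
    type_url: "type.googleapis.com/envoy.config.cluster.v3.Cluster",
    version_info: "1",
    nonce: "a"
  ),
  stream
)
delegate.stream_closed(stream)
```

In real use the delegate would be passed as `delegate:` to the constructor, and the callbacks would be triggered by #run.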
Instance Method Summary
- #initialize(client, node, delegate:) ⇒ ADSStream (constructor)
  A new instance of ADSStream.
- #run(initial: nil) ⇒ Object
  Run the ADS stream.
- #send(request) ⇒ Object
  Send a DiscoveryRequest on the stream.
Constructor Details
#initialize(client, node, delegate:) ⇒ ADSStream
Returns a new instance of ADSStream.
# File 'lib/async/grpc/xds/ads_stream.rb', line 27
def initialize(client, node, delegate:)
  @client = client
  @node = node
  @delegate = delegate
  @body = nil
end
Instance Method Details
#run(initial: nil) ⇒ Object
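Run the ADS stream. Blocks until the stream completes or errors. When initial is nil or empty, a single DiscoveryRequest carrying only the node is sent as the first message. Any error is passed to the delegate's stream_error (if it defines one), logged, and re-raised.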
# File 'lib/async/grpc/xds/ads_stream.rb', line 42
def run(initial: nil)
  service = Envoy::Service::Discovery::V3::AggregatedDiscoveryService.new(
    "envoy.service.discovery.v3.AggregatedDiscoveryService"
  )
  initial = Array(initial).any? ? initial : [Envoy::Service::Discovery::V3::DiscoveryRequest.new(node: @node)]

  @client.invoke(service, :StreamAggregatedResources, nil, initial: initial) do |body, readable_body|
    @body = body
    @delegate.stream_opened(self) if @delegate.respond_to?(:stream_opened)

    begin
      readable_body.each do |response|
        @delegate.discovery_response(response, self)
      end
    ensure
      @delegate.stream_closed(self) if @delegate.respond_to?(:stream_closed)
      @body = nil
    end
  end
rescue => error
  @delegate.stream_error(self, error) if @delegate.respond_to?(:stream_error)
  Console.error(self, "Failed while streaming updates!", exception: error)
  raise
end
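The handling of the initial: argument in #run can be examined in isolation. The sketch below copies that one expression into a hypothetical helper, `resolve_initial`, and uses symbols in place of real DiscoveryRequest objects, so it shows only how the fallback behaves, not how the client consumes the result.

```ruby
# Mirrors the expression used in #run. `default` stands in for the
# node-only DiscoveryRequest that #run builds.
def resolve_initial(initial, default)
  Array(initial).any? ? initial : [default]
end

from_nil   = resolve_initial(nil, :default)      # nil falls back to the default
from_empty = resolve_initial([], :default)       # an empty array falls back too
from_list  = resolve_initial([:a, :b], :default) # a non-empty array passes through
from_bare  = resolve_initial(:a, :default)       # a bare value is NOT wrapped in an array
```

A bare request passes through without being wrapped. Whether the client accepts a bare request is not shown in this page, so passing an array to initial: is probably the safer assumption.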
#send(request) ⇒ Object
Send a DiscoveryRequest on the stream. Call from within discovery_response to send ACKs. Does nothing if the stream is not currently open.
# File 'lib/async/grpc/xds/ads_stream.rb', line 36
def send(request)
  @body&.write(request)
end
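Sending an ACK from within discovery_response might look like the sketch below. It assumes the usual xDS convention that an ACK is a DiscoveryRequest echoing the accepted version_info and the response's nonce as response_nonce. `AckRequest`, `AckResponse`, `RecordingStream` and `AckingDelegate` are hypothetical stand-ins, used so the example runs without the Envoy protobuf classes or a live stream.

```ruby
# Stand-ins for the Envoy protobuf messages; only the fields used here
# are modelled.
AckResponse = Struct.new(:version_info, :type_url, :nonce, keyword_init: true)
AckRequest = Struct.new(:version_info, :type_url, :response_nonce, :resource_names, keyword_init: true)

# Stand-in for ADSStream that collects requests instead of writing them.
class RecordingStream
  attr_reader :sent

  def initialize
    @sent = []
  end

  # Same name and arity as ADSStream#send.
  def send(request)
    @sent << request
  end
end

# Hypothetical delegate that ACKs every response it receives.
class AckingDelegate
  def initialize(resource_names)
    @resource_names = resource_names
  end

  def discovery_response(response, stream)
    # Echo the accepted version and the nonce back to the server.
    stream.send(
      AckRequest.new(
        version_info: response.version_info,
        type_url: response.type_url,
        response_nonce: response.nonce,
        resource_names: @resource_names
      )
    )
  end
end

stream = RecordingStream.new
delegate = AckingDelegate.new(["cluster-a"])
delegate.discovery_response(
  AckResponse.new(
    version_info: "7",
    type_url: "type.googleapis.com/envoy.config.cluster.v3.Cluster",
    nonce: "n1"
  ),
  stream
)
ack = stream.sent.first
```

With the real classes, the request would presumably be built with Envoy::Service::Discovery::V3::DiscoveryRequest.new and would also carry the node where the server requires it.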