Class: Async::GRPC::XDS::Service::Stream
- Inherits:
-
Object
- Object
- Async::GRPC::XDS::Service::Stream
- Defined in:
- lib/async/grpc/xds/service.rb
Overview
Represents one ADS stream and its subscribed resources.
Instance Method Summary collapse
- #changed(type_url) ⇒ Object
- #close ⇒ Object
- #flush(type_url) ⇒ Object
-
#initialize(control_plane, output) ⇒ Stream
constructor
A new instance of Stream.
- #request(request) ⇒ Object
- #run ⇒ Object
Constructor Details
#initialize(control_plane, output) ⇒ Stream
Returns a new instance of Stream.
60 61 62 63 64 65 66 67 |
# File 'lib/async/grpc/xds/service.rb', line 60 def initialize(control_plane, output) @control_plane = control_plane @output = output @subscriptions = Hash.new{|hash, type_url| hash[type_url] = Set.new} @versions = {} @queue = Async::Queue.new @closed = false end |
Instance Method Details
#changed(type_url) ⇒ Object
86 87 88 |
# File 'lib/async/grpc/xds/service.rb', line 86 def changed(type_url) @queue << type_url unless @closed end |
#close ⇒ Object
109 110 111 112 |
# File 'lib/async/grpc/xds/service.rb', line 109 def close @closed = true @queue.close end |
#flush(type_url) ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/async/grpc/xds/service.rb', line 97 def flush(type_url) names = @subscriptions[type_url] return unless names version = @control_plane.version(type_url) return if @versions[type_url] == version response = @control_plane.response(type_url, names) @output.write(response) @versions[type_url] = version end |
#request(request) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/async/grpc/xds/service.rb', line 69 def request(request) return if request.type_url.nil? || request.type_url.empty? if request.error_detail Console.warn(self, "Received xDS NACK.", type_url: request.type_url, error_detail: request.error_detail) return end if request.resource_names.any? @subscriptions[request.type_url].merge(request.resource_names) else @subscriptions[request.type_url] end @queue << request.type_url end |
#run ⇒ Object
90 91 92 93 94 95 |
# File 'lib/async/grpc/xds/service.rb', line 90 def run until @closed type_url = @queue.dequeue flush(type_url) end end |