Class: Async::GRPC::XDS::Service::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/async/grpc/xds/service.rb

Overview

Represents one ADS stream and its subscribed resources.

Instance Method Summary collapse

Constructor Details

#initialize(control_plane, output) ⇒ Stream

Returns a new instance of Stream.



60
61
62
63
64
65
66
67
# File 'lib/async/grpc/xds/service.rb', line 60

# Builds the state for one stream.
#
# @param control_plane [Object] kept in @control_plane; queried by #flush
# @param output [Object] kept in @output; #flush writes responses to it
def initialize(control_plane, output)
	@output = output
	@control_plane = control_plane
	
	# Last version written to the output, keyed by type URL.
	@versions = {}
	
	# Subscribed resource names, keyed by type URL. Looking up an unknown
	# type URL stores and returns a fresh empty Set for it.
	@subscriptions = Hash.new do |subscriptions, key|
		subscriptions[key] = Set.new
	end
	
	# Type URLs waiting to be flushed by #run.
	@queue = Async::Queue.new
	@closed = false
end

Instance Method Details

#changed(type_url) ⇒ Object



86
87
88
# File 'lib/async/grpc/xds/service.rb', line 86

# Queues a type URL so that #run flushes it; does nothing once the
# stream has been closed.
#
# @param type_url [String] resource type whose data changed
def changed(type_url)
	return if @closed
	
	@queue << type_url
end

#close ⇒ Object



109
110
111
112
# File 'lib/async/grpc/xds/service.rb', line 109

# Marks the stream as closed and closes its queue.
#
# The flag is set before the queue is closed, so #changed stops enqueuing
# and the #run loop condition is already false by the time the queue is
# closed.
# NOTE(review): presumably closing the queue wakes a #run blocked in
# dequeue — confirm against Async::Queue#close.
def close
	@closed = true
	@queue.close
end

#flush(type_url) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
# File 'lib/async/grpc/xds/service.rb', line 97

# Writes the current resources of one type to the output if the client
# does not have them yet.
#
# Nothing is written when this stream has no subscription for the type,
# or when the control plane's version for the type equals the version
# that was last written.
#
# @param type_url [String] resource type to flush
# @return [Object, nil] the version just written, or nil if nothing was sent
def flush(type_url)
	# Test with key? instead of []: @subscriptions creates a Set on lookup,
	# so [] is never nil and would subscribe the stream as a side effect,
	# sending responses for types the client never asked for.
	return unless @subscriptions.key?(type_url)
	
	version = @control_plane.version(type_url)
	return if @versions[type_url] == version
	
	response = @control_plane.response(type_url, @subscriptions[type_url])
	@output.write(response)
	
	# Remember what was sent only after the write went through.
	@versions[type_url] = version
end

#request(request) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/async/grpc/xds/service.rb', line 69

# Handles one discovery request received on this stream.
#
# Requests without a type URL are ignored. Requests carrying error_detail
# (NACKs) are logged and otherwise ignored. Any other request adds its
# resource names to the subscription for the type and queues the type so
# that #run flushes it.
#
# @param request [Object] responds to type_url, resource_names and error_detail
# @return [Object, nil] the queue's return value, or nil if nothing was queued
def request(request)
	type_url = request.type_url
	return if type_url.nil? || type_url.empty?
	
	if request.error_detail
		Console.warn(self, "Received xDS NACK.", type_url: type_url, error_detail: request.error_detail)
		return
	end
	
	# The lookup registers the type even when no names are given, because
	# @subscriptions stores a fresh Set for unknown keys.
	names = @subscriptions[type_url]
	known = names.size
	names.merge(request.resource_names)
	
	# #flush skips a type whose current version was already written. Forget
	# that version when names were added, otherwise the new resources would
	# not be sent until the control plane's version changes.
	@versions.delete(type_url) if names.size > known
	
	# Same guard as #changed: never push onto the queue after #close.
	@queue << type_url unless @closed
end

#run ⇒ Object



90
91
92
93
94
95
# File 'lib/async/grpc/xds/service.rb', line 90

# Flushes queued type URLs one at a time until the stream is closed.
#
# @return [nil]
def run
	until @closed
		type_url = @queue.dequeue
		
		# A nil item is not a type URL. Async::Queue#dequeue is expected to
		# return nil once #close has closed the queue (see the Async::Queue
		# documentation), so stop instead of calling flush(nil).
		break if type_url.nil?
		
		flush(type_url)
	end
end