Module: Async::GRPC::XDS::ResourceBuilder

Defined in:
lib/async/grpc/xds/resource_builder.rb

Overview

Builds Envoy xDS resource protobufs.

Constant Summary collapse

TYPE_URL_PREFIX =
"type.googleapis.com"
CLUSTER_TYPE =
"#{TYPE_URL_PREFIX}/envoy.config.cluster.v3.Cluster"
ENDPOINT_TYPE =
"#{TYPE_URL_PREFIX}/envoy.config.endpoint.v3.ClusterLoadAssignment"

Class Method Summary collapse

Class Method Details

.cluster(name, service_name: name, load_balancer_policy: :round_robin, connect_timeout: 5) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/async/grpc/xds/resource_builder.rb', line 34

def self.cluster(name, service_name: name, load_balancer_policy: :round_robin, connect_timeout: 5)
	Envoy::Config::Cluster::V3::Cluster.new(
		name: name.to_s,
		type: Envoy::Config::Cluster::V3::Cluster::DiscoveryType::EDS,
		eds_cluster_config: Envoy::Config::Cluster::V3::Cluster::EdsClusterConfig.new(
			service_name: service_name.to_s,
			eds_config: Envoy::Config::Core::V3::ConfigSource.new(
				ads: Envoy::Config::Core::V3::AggregatedConfigSource.new
			)
		),
		connect_timeout: duration(connect_timeout),
		lb_policy: load_balancer_policy_value(load_balancer_policy),
		http2_protocol_options: Envoy::Config::Core::V3::Http2ProtocolOptions.new
	)
end

.cluster_load_assignment(cluster_name, endpoints) ⇒ Object



50
51
52
53
54
55
56
57
58
59
# File 'lib/async/grpc/xds/resource_builder.rb', line 50

def self.cluster_load_assignment(cluster_name, endpoints)
	Envoy::Config::Endpoint::V3::ClusterLoadAssignment.new(
		cluster_name: cluster_name.to_s,
		endpoints: [
			Envoy::Config::Endpoint::V3::LocalityLbEndpoints.new(
				lb_endpoints: endpoints.map{|endpoint| load_balancer_endpoint(endpoint)}
			)
		]
	)
end

.duration(seconds) ⇒ Object



102
103
104
105
106
107
# File 'lib/async/grpc/xds/resource_builder.rb', line 102

def self.duration(seconds)
	whole_seconds = seconds.to_i
	nanos = ((seconds.to_f - whole_seconds) * 1_000_000_000).to_i
		
	Google::Protobuf::Duration.new(seconds: whole_seconds, nanos: nanos)
end

.health_status_value(healthy) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/async/grpc/xds/resource_builder.rb', line 122

def self.health_status_value(healthy)
	case healthy
	when :healthy, :HEALTHY, "healthy", "HEALTHY", true
		Envoy::Config::Core::V3::HealthStatus::HEALTHY
	when :unhealthy, :UNHEALTHY, "unhealthy", "UNHEALTHY", false
		Envoy::Config::Core::V3::HealthStatus::UNHEALTHY
	when :degraded, :DEGRADED, "degraded", "DEGRADED"
		Envoy::Config::Core::V3::HealthStatus::DEGRADED
	else
		Envoy::Config::Core::V3::HealthStatus::UNKNOWN
	end
end

.load_balancer_endpoint(endpoint) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/async/grpc/xds/resource_builder.rb', line 61

def self.load_balancer_endpoint(endpoint)
	endpoint = normalize_endpoint(endpoint)
		
	Envoy::Config::Endpoint::V3::LbEndpoint.new(
		endpoint: Envoy::Config::Endpoint::V3::Endpoint.new(
			address: Envoy::Config::Core::V3::Address.new(
				socket_address: Envoy::Config::Core::V3::SocketAddress.new(
					protocol: Envoy::Config::Core::V3::SocketAddress::Protocol::TCP,
					address: endpoint[:address],
					port_value: endpoint[:port]
				)
			),
			hostname: endpoint[:hostname].to_s
		),
		health_status: health_status_value(endpoint.fetch(:healthy, true))
	)
end

.load_balancer_policy_value(load_balancer_policy) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/async/grpc/xds/resource_builder.rb', line 109

def self.load_balancer_policy_value(load_balancer_policy)
	case load_balancer_policy
	when :round_robin, :ROUND_ROBIN, "round_robin", "ROUND_ROBIN"
		Envoy::Config::Cluster::V3::Cluster::LbPolicy::ROUND_ROBIN
	when :least_request, :LEAST_REQUEST, "least_request", "LEAST_REQUEST"
		Envoy::Config::Cluster::V3::Cluster::LbPolicy::LEAST_REQUEST
	when :random, :RANDOM, "random", "RANDOM"
		Envoy::Config::Cluster::V3::Cluster::LbPolicy::RANDOM
	else
		load_balancer_policy
	end
end

.normalize_endpoint(endpoint) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/async/grpc/xds/resource_builder.rb', line 79

def self.normalize_endpoint(endpoint)
	case endpoint
	when Hash
		{
			address: endpoint.fetch(:address){endpoint.fetch("address")},
			port: endpoint.fetch(:port){endpoint.fetch("port")}.to_i,
			hostname: endpoint[:hostname] || endpoint["hostname"],
			healthy: endpoint.key?(:healthy) ? endpoint[:healthy] : endpoint.fetch("healthy", true)
		}
	else
		if endpoint.respond_to?(:address) && endpoint.respond_to?(:port)
			{
				address: endpoint.address,
				port: endpoint.port.to_i,
				hostname: endpoint.respond_to?(:hostname) ? endpoint.hostname : nil,
				healthy: endpoint.respond_to?(:healthy?) ? endpoint.healthy? : true
			}
		else
			raise ArgumentError, "Invalid endpoint: #{endpoint.inspect}"
		end
	end
end

.pack(resource) ⇒ Object



27
28
29
30
31
32
# File 'lib/async/grpc/xds/resource_builder.rb', line 27

def self.pack(resource)
	Google::Protobuf::Any.new(
		type_url: "#{TYPE_URL_PREFIX}/#{resource.class.descriptor.name}",
		value: resource.to_proto
	)
end