Class: Async::GRPC::XDS::LoadBalancer

Inherits:
Object
  • Object
show all
Defined in:
lib/async/grpc/xds/load_balancer.rb

Overview

Client-side load balancing with health checking. RING_HASH and MAGLEV fall back to round-robin (require request context to hash).

Constant Summary collapse

ROUND_ROBIN =

Load balancing policies.

:round_robin
LEAST_REQUEST =
:least_request
RANDOM =
:random
RING_HASH =
:ring_hash
MAGLEV =
:maglev

Instance Method Summary collapse

Constructor Details

#initialize(cluster, endpoints) ⇒ LoadBalancer

Initialize load balancer



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/async/grpc/xds/load_balancer.rb', line 27

def initialize(cluster, endpoints)
	@cluster = cluster
	@endpoints = endpoints
	@policy = parse_policy(cluster.load_balancer_policy)
	@health_status = {}  # Track health per endpoint
	@health_checker = HealthChecker.new(cluster.health_checks)
	@current_index = 0
	@in_flight_requests = {}  # Track in-flight requests per endpoint
	@health_check_task = nil  # Transient task for health check loop
	
	# Initialize health status
	@endpoints.each do |endpoint|
		@health_status[endpoint] = :unknown
	end
	
	# Start health checking if configured
	start_health_checks if cluster.health_checks.any?
end

Instance Method Details

#closeObject

Close load balancer



122
123
124
125
126
127
128
129
# File 'lib/async/grpc/xds/load_balancer.rb', line 122

def close
	if health_check_task = @health_check_task
		@health_check_task = nil
		health_check_task.stop
	end
	
	@health_checker.close
end

#healthy_endpointsObject

Get healthy endpoints



48
49
50
# File 'lib/async/grpc/xds/load_balancer.rb', line 48

def healthy_endpoints
	@endpoints.select{|endpoint| healthy?(endpoint)}
end

#mark_unhealthy(endpoint) ⇒ Object

Mark endpoint as unhealthy (e.g. after connection failure). Health checker may restore it on next successful check.



117
118
119
# File 'lib/async/grpc/xds/load_balancer.rb', line 117

def mark_unhealthy(endpoint)
	@health_status[endpoint] = :unhealthy
end

#pickObject

Pick next endpoint using load balancing policy



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/async/grpc/xds/load_balancer.rb', line 54

def pick
	healthy = healthy_endpoints
	return nil if healthy.empty?
	
	case @policy
	when ROUND_ROBIN
		pick_round_robin(healthy)
	when LEAST_REQUEST
		pick_least_request(healthy)
	when RANDOM
		pick_random(healthy)
	when RING_HASH
		pick_ring_hash(healthy)
	when MAGLEV
		pick_maglev(healthy)
	else
		healthy.first
	end
end

#record_request_end(endpoint) ⇒ Object

Record that a request has finished for the given endpoint. Must be called in ensure to decrement even on error/retry.



106
107
108
109
110
111
112
# File 'lib/async/grpc/xds/load_balancer.rb', line 106

def record_request_end(endpoint)
	return unless endpoint
	current = @in_flight_requests[endpoint]
	return unless current && current > 0
	@in_flight_requests[endpoint] = current - 1
	@in_flight_requests.delete(endpoint) if @in_flight_requests[endpoint] == 0
end

#record_request_start(endpoint) ⇒ Object

Record that a request has started for the given endpoint. Used by LEAST_REQUEST policy. Call from Client when a call begins.



98
99
100
101
# File 'lib/async/grpc/xds/load_balancer.rb', line 98

def record_request_start(endpoint)
	@in_flight_requests[endpoint] ||= 0
	@in_flight_requests[endpoint] += 1
end

#update_endpoints(endpoints) ⇒ Object

Update endpoints from EDS



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/async/grpc/xds/load_balancer.rb', line 76

def update_endpoints(endpoints)
	old_endpoints = @endpoints
	@endpoints = endpoints
	
	# Update health checker
	@health_checker.update_endpoints(endpoints)
	
	# Initialize health status for new endpoints
	endpoints.each do |endpoint|
		@health_status[endpoint] ||= :unknown
	end
	
	# Remove state for old endpoints
	(old_endpoints - endpoints).each do |endpoint|
		@health_status.delete(endpoint)
		@in_flight_requests.delete(endpoint)
	end
end