Class: Async::GRPC::XDS::LoadBalancer
- Inherits:
-
Object
- Object
- Async::GRPC::XDS::LoadBalancer
- Defined in:
- lib/async/grpc/xds/load_balancer.rb
Overview
Client-side load balancing with health checking. RING_HASH and MAGLEV fall back to round-robin (require request context to hash).
Constant Summary collapse
- ROUND_ROBIN =
Load balancing policies.
:round_robin- LEAST_REQUEST =
:least_request- RANDOM =
:random- RING_HASH =
:ring_hash- MAGLEV =
:maglev
Instance Method Summary collapse
-
#close ⇒ Object
Close load balancer.
-
#healthy_endpoints ⇒ Object
Get healthy endpoints.
-
#initialize(cluster, endpoints) ⇒ LoadBalancer
constructor
Initialize load balancer.
-
#mark_unhealthy(endpoint) ⇒ Object
Mark endpoint as unhealthy (e.g. after connection failure).
-
#pick ⇒ Object
Pick next endpoint using load balancing policy.
-
#record_request_end(endpoint) ⇒ Object
Record that a request has finished for the given endpoint.
-
#record_request_start(endpoint) ⇒ Object
Record that a request has started for the given endpoint.
-
#update_endpoints(endpoints) ⇒ Object
Update endpoints from EDS.
Constructor Details
#initialize(cluster, endpoints) ⇒ LoadBalancer
Initialize load balancer
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/async/grpc/xds/load_balancer.rb', line 27 def initialize(cluster, endpoints) @cluster = cluster @endpoints = endpoints @policy = parse_policy(cluster.load_balancer_policy) @health_status = {} # Track health per endpoint @health_checker = HealthChecker.new(cluster.health_checks) @current_index = 0 @in_flight_requests = {} # Track in-flight requests per endpoint @health_check_task = nil # Transient task for health check loop # Initialize health status @endpoints.each do |endpoint| @health_status[endpoint] = :unknown end # Start health checking if configured start_health_checks if cluster.health_checks.any? end |
Instance Method Details
#close ⇒ Object
Close load balancer
122 123 124 125 126 127 128 129 |
# File 'lib/async/grpc/xds/load_balancer.rb', line 122 def close if health_check_task = @health_check_task @health_check_task = nil health_check_task.stop end @health_checker.close end |
#healthy_endpoints ⇒ Object
Get healthy endpoints
48 49 50 |
# File 'lib/async/grpc/xds/load_balancer.rb', line 48 def healthy_endpoints @endpoints.select{|endpoint| healthy?(endpoint)} end |
#mark_unhealthy(endpoint) ⇒ Object
Mark endpoint as unhealthy (e.g. after connection failure). Health checker may restore it on next successful check.
117 118 119 |
# File 'lib/async/grpc/xds/load_balancer.rb', line 117 def mark_unhealthy(endpoint) @health_status[endpoint] = :unhealthy end |
#pick ⇒ Object
Pick next endpoint using load balancing policy
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/async/grpc/xds/load_balancer.rb', line 54 def pick healthy = healthy_endpoints return nil if healthy.empty? case @policy when ROUND_ROBIN pick_round_robin(healthy) when LEAST_REQUEST pick_least_request(healthy) when RANDOM pick_random(healthy) when RING_HASH pick_ring_hash(healthy) when MAGLEV pick_maglev(healthy) else healthy.first end end |
#record_request_end(endpoint) ⇒ Object
Record that a request has finished for the given endpoint. Must be called in ensure to decrement even on error/retry.
106 107 108 109 110 111 112 |
# File 'lib/async/grpc/xds/load_balancer.rb', line 106 def record_request_end(endpoint) return unless endpoint current = @in_flight_requests[endpoint] return unless current && current > 0 @in_flight_requests[endpoint] = current - 1 @in_flight_requests.delete(endpoint) if @in_flight_requests[endpoint] == 0 end |
#record_request_start(endpoint) ⇒ Object
Record that a request has started for the given endpoint. Used by LEAST_REQUEST policy. Call from Client when a call begins.
98 99 100 101 |
# File 'lib/async/grpc/xds/load_balancer.rb', line 98 def record_request_start(endpoint) @in_flight_requests[endpoint] ||= 0 @in_flight_requests[endpoint] += 1 end |
#update_endpoints(endpoints) ⇒ Object
Update endpoints from EDS
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/async/grpc/xds/load_balancer.rb', line 76 def update_endpoints(endpoints) old_endpoints = @endpoints @endpoints = endpoints # Update health checker @health_checker.update_endpoints(endpoints) # Initialize health status for new endpoints endpoints.each do |endpoint| @health_status[endpoint] ||= :unknown end # Remove state for old endpoints (old_endpoints - endpoints).each do |endpoint| @health_status.delete(endpoint) @in_flight_requests.delete(endpoint) end end |