GRPC advanced load balancing
GRPC has won as an inter-service API protocol, and most of the time, scaling a GRPC service is simply a matter of running several instances behind a load balancer with default settings. In this post, we'll review a couple of more advanced balancing strategies, including one that takes service load into account. We'll also touch on backpressure strategies.
The example: ML workloads on diverse GPUs
Suppose that you have a fairly standard ML workload, such as CLIP image embedding. You can easily start a service such as Triton to serve embedding requests over GRPC. However, if you need to compute millions of embeddings per day, cost becomes an issue, and you'd want to use the most cost-effective hardware and, if possible, spot/interruptible instances.
However, ML workloads are very sensitive to hardware. For CPU inference, you can see a 2x-3x difference between same-generation instance types. For GPU inference, the most cost-effective AWS instances are g4 and g5, with Nvidia T4 and A10G GPUs respectively and about a 30% performance difference between them; and if you have a spare H100 somewhere, the difference is huge.
If you simultaneously use hardware instances with such performance differences, GRPC round-robin balancing works rather badly. The slower instance receives the same number of requests as the faster one, and eventually overflows its internal request queue or runs out of memory. We need a way to distribute requests proportionally to instance performance, and some way to implement backpressure when the instances can no longer handle the load.
GRPC load balancing overview
In modern Kubernetes apps, GRPC servers are usually implemented this way:
- There’s a Kubernetes deployment for the actual server
- There’s a Kubernetes service that points to the deployment
- The service is defined as headless
- The GRPC client uses a URL of the form dns:///service.namespace (see the client sketch right after this list)
- Because the service is headless, DNS lookup returns a list of IP addresses of all individual server instances
- The GRPC client, for each request, decides which IP address should serve it, opens a connection if necessary, and sends the request.
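For example, a Go client pointed at such a headless service can be created like this; a minimal sketch, where the port number and the plaintext credentials are assumptions:
// Minimal sketch: the dns:/// scheme makes the client resolve every pod IP
// behind the headless service. The port and insecure credentials are assumptions.
conn, err := grpc.NewClient("dns:///service.namespace:50051",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
    log.Fatalf("failed to create client: %v", err)
}
defer conn.Close()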
The default load balancing algorithm is called pick_first, and as the name suggests, it does not actually balance anything. The GRPC spec assumes that the service knows best what balancing it needs, and it can declare that using a service config, which is a JSON document. For example, it can look like this:
{
    "loadBalancingConfig": [ { "round_robin": {} } ]
}
When the GRPC client resolves a DNS name such as service.name, it also tries to look up another DNS name of the form _grpc_config.service.name and expects its TXT record to contain the JSON service config. When this works, the client does not have to do anything. Sadly, it does not work in Kubernetes: there is no way to attach such TXT records to a K8S service object.
Instead, we can provide a default service config in the client. For example, here's how to do that in Go:
conn, err := grpc.NewClient(*target,
    grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [ { "round_robin": {} } ]}`),
)
This is obviously not perfect, as every client must use the correct config and update it when necessary, but there’s no better standard solution right now.
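One way to soften that (a sketch of a local convention, not a standard GRPC feature) is to wrap connection creation in a small shared helper, so the service config lives in one place that every client imports:
// newBalancedConn is a hypothetical shared helper: the service config is kept
// here once instead of being repeated in every client.
func newBalancedConn(target string) (*grpc.ClientConn, error) {
    return grpc.NewClient(target,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [ { "round_robin": {} } ]}`),
    )
}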
Weighted load balancing
Round-robin balancing gives every instance the same share of requests, regardless of how that instance is doing. To balance according to target load, we need another balancer, called weighted round robin.
The first key change is that each target is assigned a weight, and the probability of choosing a target is proportional to its weight. If one target has a weight of 10 and another has a weight of 1, the first target will receive, on average, 10 times more requests.
The second key change is that the target weights can be computed automatically. There's a protocol called ORCA (Open Request Cost Aggregation) that the target can use to communicate its load. The load can be reported per request, or periodically. In our ML example, where requests are batched in various ways, we're more interested in average performance, so we'll use periodic reports.
The actual code is surprisingly simple.
On the server side, we first set up ORCA with our GRPC server
smr := orca.NewServerMetricsRecorder()
opts := orca.ServiceOptions{
    ServerMetricsProvider: smr,
}
and then we start a reporting loop
go func() {
    lastRequests := 0.0
    lastTime := time.Time{}
    for {
        // Obtain the current CPU load and the total request count served so far.
        // getLoadAndRequestCount is a placeholder for your own metrics source.
        load, requests, err := getLoadAndRequestCount()
        if err == nil {
            t := time.Now()
            if !lastTime.IsZero() {
                deltaRequests := requests - lastRequests
                deltaTime := float64(t.UnixMilli()-lastTime.UnixMilli()) / 1000
                smr.SetQPS(deltaRequests / deltaTime)
                smr.SetApplicationUtilization(load)
            }
            lastRequests = requests
            lastTime = t
        }
        time.Sleep(30 * time.Second)
    }
}()
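The recorder and options then have to be attached to the GRPC server so that clients can subscribe to out-of-band reports. Here's a minimal wiring sketch; it assumes grpc-go's orca.Register and borrows the Greeter service names from the backpressure example later in this post:
// Register both the application service and the ORCA out-of-band metrics
// service on the same GRPC server. The listener address and the Greeter
// service are assumptions for illustration.
lis, err := net.Listen("tcp", ":50051")
if err != nil {
    log.Fatalf("failed to listen: %v", err)
}
grpcServer := grpc.NewServer()
pb.RegisterGreeterServer(grpcServer, &server{})
if err := orca.Register(grpcServer, opts); err != nil {
    log.Fatalf("failed to register ORCA service: %v", err)
}
if err := grpcServer.Serve(lis); err != nil {
    log.Fatalf("failed to serve: %v", err)
}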
On the client side, we only need to provide a simple service config
conn, err := grpc.NewClient(*target,
    grpc.WithDisableServiceConfig(),
    grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"weighted_round_robin":{"enableOobLoadReport": true}}]}`),
)
For further details, you can refer to the source.
In our motivating example, this works like magic. Faster instances receive more requests, while slower instances are not overwhelmed.
Least requests balancing
Weighted round robin balancing is very powerful, but it is not always applicable. It requires the target to expose metrics, and weight updates can take a while. Suppose we have, say, 10 instances, each receiving over 1000 requests per second, and we want a 10ms P50 response time. Even a transient slowdown of one instance will make hundreds of requests late while new requests keep arriving. In one practical case, merely connecting to an instance with JMX to collect a CPU profile was enough to cause a noticeable overall degradation.
In this case, we can use least request balancing, where the client sends each request to the target with the smallest number of outstanding requests. Because this count is trivial to keep up to date, the balancer reacts very quickly to any issues.
The example code to use least request balancing is entirely client-side; the choiceCount parameter controls how many randomly sampled targets are compared for each request:
conn, err := grpc.NewClient(*target,
    grpc.WithDisableServiceConfig(),
    grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"least_request_experimental":{"choiceCount": 4}}]}`),
)
If you’re interested, you can review the Go implementation.
Implementing backpressure
Even if the request rate is perfectly matched to target capacity on average, the clients can still send more requests than we can handle. We need some sort of backpressure.
Before building that, let’s recall how GRPC works at the network level. For each server instance, the client opens a single HTTP/2 connection; GRPC calls this per-instance connection a subchannel, and the set of subchannels forms the channel. Inside the connection, HTTP/2 multiplexes multiple independent streams, identified by an integer, and each GRPC call runs on its own stream. For a normal unary call, a new stream is created, the request is sent, and the server eventually sends back the response.
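To make the stream multiplexing concrete, here's a sketch of a client firing many concurrent calls over a single channel; it borrows the Greeter names from the server example below, and the stub constructor and conn variable are assumptions:
// All 100 calls share one connection; each in-flight call occupies its own
// HTTP/2 stream. conn is assumed to be an already created client connection.
client := pb.NewGreeterClient(conn)
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
    wg.Add(1)
    go func(i int) {
        defer wg.Done()
        _, err := client.SayHello(context.Background(),
            &pb.HelloRequest{Name: fmt.Sprintf("request-%d", i)})
        if err != nil {
            log.Printf("call %d failed: %v", i, err)
        }
    }(i)
}
wg.Wait()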
It seems, then, that backpressure is naturally implemented by processing requests at our own pace and letting the remaining streams sit idle. For example, a Go server implementation might look like this:
var sem = semaphore.NewWeighted(10)

func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
    log.Printf("Received request for: %v", in.GetName())
    // Allow up to 10 concurrent requests. The others will block here until a
    // slot frees up or their context is cancelled.
    if err := sem.Acquire(ctx, 1); err != nil {
        return nil, status.Error(codes.ResourceExhausted, "failed to obtain semaphore")
    }
    defer sem.Release(1)
    return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}
However, there are a few settings to make this work well in practice.
First, there’s a server setting that controls the maximum number of concurrent streams (in-flight requests) per client connection, and it is 100 by default. Requests beyond that limit will queue up on the client and may eventually time out. We can raise the limit explicitly:
server := grpc.NewServer(
    grpc.MaxConcurrentStreams(1000),
)
Second, such a backpressure implementation will often result in client timeouts, so you need to raise client deadlines as appropriate.
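For instance, a client that expects its requests to sit in the server's queue could use a generous per-call deadline; a sketch, where the 5-minute value and the Greeter stub are assumptions:
// The deadline must be long enough to cover waiting in the server-side queue;
// 5 minutes here is an arbitrary placeholder.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
reply, err := client.SayHello(ctx, &pb.HelloRequest{Name: "world"})
if err != nil {
    log.Fatalf("request failed: %v", err)
}
log.Printf("got reply: %q", reply.GetMessage())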
Third, the combination of HTTP/2 flow control and memory management can result in unexpectedly high memory consumption. This is the most surprising effect, so it deserves a bit of detail.
GRPC flow control and memory management
Like most network protocols, HTTP/2 has flow control: one side may only send as much data as the receiving side has confirmed it can accept. The maximum number of bytes that one side can send without confirmation is called the window size. In GRPC, the window size can be between 64KB and 2MB and is auto-negotiated depending on network speed.
Consider a 10Gbit network, which translates to roughly 1000 MBytes per second. If a GRPC request is less than 2MB and we already have a GRPC connection, then a new GRPC call can be made by allocating a new stream id and sending the entire request right away. That takes around 2ms, which is better in terms of latency than first exchanging packets to make sure the server is ready. In practice, on a fast network, 2MB will be the auto-negotiated window size.
Those requests must be stored somewhere on the server until they are processed. Most GRPC implementations don’t leave that to chance and take over memory management. For example, the Go implementation uses buffer pools of 5 possible sizes: 256 bytes, 4KB, 16KB, 32KB, and 1MB.
In our case, it means that with 1000 concurrent requests of 40KB each, all of them will be sent immediately, and each will occupy a 1MB pooled buffer, for an extra 1GB of buffer memory in total. With a higher number of concurrent requests or larger request sizes (e.g. full-size images), the memory consumption can become a problem.
We can just accept it, customize the pool allocator (relatively complex), or force a smaller flow control window size.
The latter can be done rather easily:
server := grpc.NewServer(
    // Note that while it says 'Initial', the implementation will actually disable the dynamic window.
    // See https://github.com/grpc/grpc-go/blob/f49c747db7540bf5edba06ce99f013e187966fc9/internal/transport/http2_server.go#L182
    grpc.InitialWindowSize(4*64*1024),
    // This is the per-connection window; set it to the same value.
    grpc.InitialConnWindowSize(4*64*1024),
)
The final solution
Returning to our motivating example, here’s one possible architecture we can use for cost-effective GPU inference.
- The actual inference is done by the Triton inference server, running on AWS spot instances with different GPUs or CPUs. The queue length is configured to maximize performance.
- In front of each Triton instance, we run a custom GRPC proxy. It obtains load metrics from Triton, computes the observed RPS, and exports them over the GRPC ORCA protocol to enable weighted load balancing. The proxy also maintains its own queue, large enough for at least 30 seconds of processing, so that the load and RPS metrics stay stable within the 30-second reporting interval. It can also apply backpressure.
- Finally, the clients use weighted load balancing to send requests proportionally to target performance. The clients might be Spark jobs, or some scheduled-task framework suitable for queueing millions of inference requests.
This architecture proved rather effective in my experience, and maybe it can be useful for you as well.