
GRPC advanced load balancing

GRPC has won as an inter-service API protocol, and most of the time, scaling a GRPC service is simply a matter of running several instances behind a load balancer with default settings. In this post, we’ll review using a couple of more advanced balancing strategies - including one that considers service load. We’ll also touch on backpressure strategies.

The example: ML workloads on diverse GPUs

Suppose that you have a fairly standard ML workload, such as CLIP image embedding. You can easily start a service such as Triton to serve embedding requests over GRPC. However, if you need to compute millions of embeddings per day, cost becomes an issue, and you’d want to use the most cost-effective hardware, and if possible, spot/interruptible instances.

However, ML workloads are very sensitive to hardware. For CPU inference, you can have a 2x-3x difference between same-generation instances. For GPU inference, the most cost-effective AWS instances are g4 and g5 instances, which have Nvidia T4 and A10G GPUs respectively, with a 30% performance difference between them. And if you have a spare H100 somewhere, the difference is huge.

If you simultaneously use hardware instances with such performance differences, GRPC round-robin balancing works rather badly. The slower instance will receive the same number of requests as the faster instance, and eventually it will overflow its internal request queue or run out of memory. We need a way to distribute requests proportionally to instance performance, and some way to implement backpressure if the instances can no longer handle the load.

GRPC load balancing overview

In modern Kubernetes apps, GRPC servers are usually implemented this way:

The default load balancing algorithm is called pick_first, and as the name suggests, it does not actually balance anything: the client sends all requests to the first address it manages to connect to. The GRPC spec assumes that the service knows what works better, and that it can declare this using a service config, which is a JSON document. For example, it can look like this:

{
  "loadBalancingConfig": [ { "round_robin": {} } ]
}

When the GRPC client resolves a DNS name, such as service.name, it also tries to look up another DNS name of the form _grpc_config.service.name, and expects it to have a TXT record containing the JSON service config. When this works, the client does not have to do anything. Sadly, this does not work in Kubernetes: it’s not possible to attach such TXT records to a K8S service object.

Instead, we can provide a default service config in the client. For example, here’s how to do that in Go:

conn, err := grpc.NewClient(*target,
    grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [ { "round_robin": {} } ]}`),
)

This is obviously not perfect, as every client must use the correct config and update it when necessary, but there’s no better standard solution right now.
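Since the config ends up as a hand-written string in every client, a stray comma is enough to break it. One way to reduce that risk is to generate the JSON instead of typing it. The following is a minimal sketch using only the standard library; the serviceConfig helper is a name I made up for illustration, not part of GRPC.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// serviceConfig is a hypothetical helper that builds a service config JSON
// string selecting a single load balancing policy with the given options.
func serviceConfig(policy string, options map[string]any) (string, error) {
	if options == nil {
		// A nil map would be encoded as null; the policy expects an object.
		options = map[string]any{}
	}
	cfg := map[string]any{
		"loadBalancingConfig": []map[string]any{{policy: options}},
	}
	b, err := json.Marshal(cfg)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, err := serviceConfig("round_robin", nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
}
```

The resulting string can be passed to grpc.WithDefaultServiceConfig in place of the literal.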

Weighted load balancing

Round robin balancing cycles through the instances in order, so each one receives the same number of requests regardless of how busy it is. To balance according to target load, we need another balancer, called weighted round robin.

The first key change is that each target is assigned a weight, and the share of requests a target receives is proportional to its weight. If one target has a weight of 10, and another has a weight of 1, the first target will receive, on average, 10 times more requests.
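To make the proportionality concrete, here is a small sketch of a weighted picker. It uses the "smooth weighted round robin" technique purely for illustration; the scheduler inside the GRPC balancer is different, and the target type and function names are my own.

```go
package main

import "fmt"

// target is a hypothetical backend with a static weight.
type target struct {
	name    string
	weight  int
	current int // running score used by the smooth weighted round robin
}

// pick selects the next target: every target's score grows by its weight,
// the highest score wins, and the winner is pushed back by the total weight.
func pick(targets []*target) *target {
	total := 0
	var best *target
	for _, t := range targets {
		t.current += t.weight
		total += t.weight
		if best == nil || t.current > best.current {
			best = t
		}
	}
	best.current -= total
	return best
}

// countPicks runs n picks and returns how many requests each target received.
func countPicks(targets []*target, n int) map[string]int {
	counts := make(map[string]int)
	for i := 0; i < n; i++ {
		counts[pick(targets).name]++
	}
	return counts
}

func main() {
	targets := []*target{{name: "fast", weight: 10}, {name: "slow", weight: 1}}
	fmt.Println(countPicks(targets, 1100))
}
```

Over 1100 picks, the target with weight 10 is chosen 1000 times and the target with weight 1 is chosen 100 times, with the slow target's turns spread evenly rather than bunched together.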

The second key change is that the target weights can be computed automatically. There’s a protocol called ORCA (Open Request Cost Aggregation) that the target can use to communicate its load. The load can be reported with each request, or it can be reported periodically. In our ML example, with various request batching, we’re more interested in average performance, so we’ll use periodic reports.

The actual code is surprisingly simple.

On the server side, we first set up ORCA with our GRPC server:

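    smr := orca.NewServerMetricsRecorder()
    opts := orca.ServiceOptions{
        ServerMetricsProvider: smr,
    }
    // server is the *grpc.Server created elsewhere.
    if err := orca.Register(server, opts); err != nil {
        log.Fatalf("failed to register ORCA service: %v", err)
    }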

and then we start a reporting loop:

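go func() {
	lastRequests := 0.0
	lastTime := time.Time{}
	for {
		// readLoadAndRequests is a placeholder for your own code that returns
		// the current load and the total request count so far.
		load, requests, err := readLoadAndRequests()
		if err != nil {
			log.Printf("failed to read load metrics: %v", err)
			time.Sleep(30 * time.Second)
			continue
		}
		t := time.Now()
		if !lastTime.IsZero() {
			// Report the request rate over the last interval and the current load.
			deltaRequests := requests - lastRequests
			deltaTime := t.Sub(lastTime).Seconds()
			smr.SetQPS(deltaRequests / deltaTime)
			smr.SetApplicationUtilization(load)
		}
		lastRequests = requests
		lastTime = t
		time.Sleep(30 * time.Second)
	}
}()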

On the client side, we only need to provide a simple service config:

conn, err := grpc.NewClient(*target,
	grpc.WithDisableServiceConfig(),
	grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"weighted_round_robin":{"enableOobLoadReport": true}}]}`),
)

For further details, you can refer to the source.

In our motivating example, this works like magic. Faster instances receive more requests, while slower instances are not overwhelmed.
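To see why this works, it helps to look at how a weight can be derived from the two reported numbers. As I understand the weighted round robin design, the weight is roughly QPS divided by utilization, which estimates how many requests per second the instance could handle when fully loaded. The sketch below shows only that core idea; the real balancer also accounts for errors and stale reports, and the numbers are made up.

```go
package main

import "fmt"

// report is a simplified load report from one instance.
type report struct {
	qps         float64 // requests per second over the last interval
	utilization float64 // fraction of capacity in use, in (0, 1]
}

// weight estimates the request rate the instance could sustain at full utilization.
func weight(r report) float64 {
	if r.qps <= 0 || r.utilization <= 0 {
		return 0 // no usable data yet
	}
	return r.qps / r.utilization
}

// shares converts the reports into the fraction of traffic each instance should get.
func shares(reports map[string]report) map[string]float64 {
	total := 0.0
	for _, r := range reports {
		total += weight(r)
	}
	out := make(map[string]float64, len(reports))
	for name, r := range reports {
		if total > 0 {
			out[name] = weight(r) / total
		}
	}
	return out
}

func main() {
	// Hypothetical numbers: both instances currently serve 100 requests per
	// second, but the slow one is at 50% utilization and the fast one at 25%.
	reports := map[string]report{
		"slow": {qps: 100, utilization: 0.5},
		"fast": {qps: 100, utilization: 0.25},
	}
	for name, s := range shares(reports) {
		fmt.Printf("%s: %.2f\n", name, s)
	}
}
```

With these inputs the fast instance gets a weight of 400 against 200 for the slow one, so it ends up with two thirds of the traffic even though both were serving the same rate when they reported.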

Least requests balancing

Weighted round robin balancing is very powerful, but it is not always applicable. It requires that the target expose the metrics, and weight updates can take a while. Suppose we have, say, 10 instances, each of which receives over 1000 requests per second, and we want a 10ms P50 response time. Even a transient slowdown of one instance will cause hundreds of requests to be late, while new requests continue to arrive. In one practical example, even connecting to one instance and using JMX to collect a CPU profile was enough to create notable overall degradation.

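In this case, we can use least requests balancing, where a client sends the next request to the target with the smallest number of uncompleted requests (in practice, the smallest among a few randomly sampled targets, controlled by choiceCount). Because this counter is trivial to update, the balancer responds very quickly to any issues.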

The example code to use least requests balancing is entirely client-side:

conn, err := grpc.NewClient(*target,
    grpc.WithDisableServiceConfig(),
    grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"least_request_experimental":{"choiceCount": 4}}]}`),
)

If you’re interested, you can review the Go implementation.

Implementing backpressure

Even if the request rate is perfectly aligned with target capacity, the clients can send more requests than we can handle. We need some sort of backpressure.

Before building that, let’s recall how GRPC works on the network level. For each server instance, the client opens a single HTTP/2 connection, which GRPC calls a subchannel (the channel is the client-side object that owns all of them). Inside the connection, HTTP/2 allows multiple independent streams, each identified by an integer. For a normal synchronous call, a new stream is created, a request is sent on it, and then the server eventually sends back the response on the same stream.

It seems, then, that backpressure is naturally implemented by processing requests at a natural rate, and letting additional streams sit idle. E.g. a Go server implementation might look like this:

var sem = semaphore.NewWeighted(10)

func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
    log.Printf("Received request for: %v", in.GetName())
    // Allow up to 10 concurrent requests. The others will be left hanging.
    if err := sem.Acquire(ctx, 1); err != nil {
        return nil, status.Error(codes.ResourceExhausted, "failed to obtain semaphore")
    }
    defer sem.Release(1)
    return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}

However, there are a few settings to make this work well in practice.

First, there’s a setting that controls the maximum number of concurrent streams (that is, in-flight calls) the server will accept on each connection, and it is 100 by default. If more client requests arrive, they will result in an error. We can adjust this limit explicitly:

server := grpc.NewServer(
	grpc.MaxConcurrentStreams(1000),
)

Second, this kind of backpressure will often result in client timeouts, so you need to raise them as appropriate.

Third, a combination of HTTP/2 flow control and memory management can result in unexpectedly high memory consumption. This is the most surprising effect, so it requires a bit more detail.

GRPC flow control and memory management

Most network protocols have flow control, where one side may keep sending data only as long as the receiving side confirms it. The maximum number of bytes that one side can send without confirmation is called the window size. In GRPC, the window size can be between 64K and 2M, and is auto-negotiated depending on network speed.

Consider a 10GBit network, which translates to roughly 1000 MBytes per second. If a GRPC request is less than 2MB, and we already have a GRPC connection, then a new GRPC call can be made by allocating a new stream ID and sending the entire request right away. It will take around 2ms, which is faster in terms of latency than exchanging packets to make sure the server is ready. In practice, on a fast network, 2MB will be the auto-negotiated window size.

Those requests must be stored somewhere on the server until they are processed. Most GRPC implementations don’t leave that to chance, and take over memory management. For example, the Go implementation uses buffer pools of 5 possible sizes: 256 bytes, 4KB, 16KB, 32KB, and 1MB.

In our case, it means that with 1000 concurrent requests of 40KB each, all of them will be sent immediately, and each one will take a buffer from the 1MB pool (40KB does not fit into the 32KB one), for a total of about 1GB of extra buffer memory. With a higher number of concurrent requests or a larger request size (e.g. full-size images), the memory consumption can become a problem.
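The arithmetic is simple enough to put into a few lines. This sketch follows the simplification used above, namely that every pending request occupies exactly one pooled buffer; the function names are mine, and requests larger than the biggest pool are assumed to be allocated at their exact size.

```go
package main

import "fmt"

// poolSizes are the buffer pool sizes mentioned above, in bytes.
var poolSizes = []int{256, 4 << 10, 16 << 10, 32 << 10, 1 << 20}

// bufferSize returns the smallest pool size that fits n bytes.
// Anything larger than the biggest pool is assumed to be allocated as is.
func bufferSize(n int) int {
	for _, s := range poolSizes {
		if n <= s {
			return s
		}
	}
	return n
}

// bufferedBytes estimates the memory held by `concurrent` pending requests
// of `size` bytes each, with one pooled buffer per request.
func bufferedBytes(concurrent, size int) int {
	return concurrent * bufferSize(size)
}

func main() {
	// 1000 requests of 40KB each, reported in MiB.
	fmt.Println(bufferedBytes(1000, 40<<10) >> 20)
	// The same number of requests at 30KB, which fit the 32KB pool.
	fmt.Println(bufferedBytes(1000, 30<<10) >> 20)
}
```

The first case holds about 1000 MiB and the second about 31 MiB, so under this model a request size just over a pool boundary costs roughly 30 times more buffer memory than one just under it.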

We can just accept it, customize the pool allocator (relatively complex), or force smaller flow control window size.

The latter can be done rather easily:

server := grpc.NewServer(
	// Note that while it says 'Initial', the implementation will actually disable dynamic window
	// See https://github.com/grpc/grpc-go/blob/f49c747db7540bf5edba06ce99f013e187966fc9/internal/transport/http2_server.go#L182
	grpc.InitialWindowSize(4*64*1024),
	// This is per-connection window, set it to the same value.
	grpc.InitialConnWindowSize(4*64*1024),
)

The final solution

Returning to our motivating example, here’s one possible architecture we can use for cost-effective GPU inference.

This architecture proved rather effective in my experience, and maybe it can be useful for you as well.