NUTS AND BOLTS Apache Kafka Tuning Lead image: Photo by Patrick Schneider on Unsplash
 

Kafka: Scaling producers and consumers

Smooth

A guide to 10x scaling in Kafka with real-world metrics for high throughput, low latency, and cross-geographic data movement. By Jesse Yates

According to the Kafka home page [1], "Apache Kafka is an open-source distributed event streaming platform … for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications." A basic Kafka ecosystem has three components – producers, brokers, and consumers. Although much has been written about tuning brokers, reliably configuring producers and consumers is something of a dark art. All systems are dependent on your local setup, but some standard metrics that you can look at and knobs that can be tuned can increase performance 10 times or more. In this article, I walk through a real-life(-ish) example of how to diagnose and then fix a bottlenecked stream-processing system.

Assume you are mirroring data from an edge Kafka cluster into a central Kafka cluster that will feed your analytics data warehouse (Figure 1). You've set up the edge with 100+ partitions for many of the topics you are consuming (because you had the forethought to expect scale and knew partitions are generally pretty cheap – go you!). That means you could easily be mirroring 1000+ partitions into your central Kafka for each edge cluster.

Figure 1: The data generated in the edge clusters travels by mirrors, on which consumers and producers convert the data and feed it into a central Kafka cluster.

Even in this small data flow, you need to consider a number of possible issues. Not only do you need to ensure your Kafka clusters are configured to scale (i.e., the number of partitions, the right tuning parameters) but also that the mirrors are scaled and tuned correctly. Many articles and posts have been written on tuning Kafka clusters, so I will focus on the client side, getting the producers and consumers (conveniently encapsulated in the data-mirroring process) tuned for high throughput, low latency, and cross-geographic data movement.

Edge Kafka clusters have some notable advantages – in particular, geographic fault isolation and reduced latencies to end users: User data can be sent faster to a secure replicated store (i.e., to the central Kafka cluster), which reduces the risk of critical data being lost. At the same time, you get a fallback for any local catastrophes (e.g., an earthquake in California), so your system remains available. However, you still need to get that data back to a central location for global analysis, and that means each of your remote mirrors has an extra 100ms of latency, roughly, for every mirror request you make. Chances are, this isn't going to work out of the box. Too bad, so sad. Time to get engineering!

As you scroll through the logs of your misbehaving consumer, you might see something like the output in Listing 1. If you turn on DEBUG logging, you might also see log lines, as in Listing 2. Your consumer is trying to tell you: I didn't get a response in the time I expected, so I'm giving up and trying again soon.

Listing 1: Misbehaving Consumer

2019-06-28 20:24:43 INFO  [KafkaMirror-7] o.a.k.c.FetchSessionHandler:438 - [Consumer clientId=consumer-1, groupId=jesseyates.kafka.mirror] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 3: org.apache.kafka.common.errors.DisconnectException

Listing 2: DEBUG Logging

2019-06-27 20:43:06 DEBUG [KafkaMirror-11] o.a.k.c.c.i.Fetcher:244 - [Consumer clientId=consumer-1, groupId=jesseyates.kafka.mirror] Fetch READ_UNCOMMITTED at offset 26974 for partition source_topic-7 returned fetch data (error=NONE, highWaterMark=26974, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=0)

Here are some quick configurations to check:

Unless otherwise noted, all the metrics above are assumed to be client-side (consumer) MBean metrics and have the prefix

kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)

or additionally have topic=([-.\w]+) for topic-scoped metrics.
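As a rough illustration of how those MBean names can be picked apart, the following Python sketch matches the prefix with the character class [-.\w]+ for the client ID and the optional topic. The function name parse_fetch_mbean is hypothetical, and the sketch assumes unquoted attribute values in exactly this order; real JMX object names can differ.

```python
import re

# Pattern for consumer fetch-manager MBean names; the optional topic
# attribute covers the topic-scoped metrics.
# Assumption: values are unquoted and client-id comes before topic.
MBEAN_PATTERN = re.compile(
    r"^kafka\.consumer:type=consumer-fetch-manager-metrics,"
    r"client-id=(?P<client_id>[-.\w]+)"
    r"(?:,topic=(?P<topic>[-.\w]+))?$"
)


def parse_fetch_mbean(name):
    """Return (client_id, topic) for a matching MBean name, else None.

    topic is None for client-scoped (not topic-scoped) metrics.
    """
    match = MBEAN_PATTERN.match(name)
    if match is None:
        return None
    return match.group("client_id"), match.group("topic")
```

Filtering a JMX dump through a helper like this makes it easier to separate client-wide fetch metrics from the per-topic ones.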

These configurations can interact in interesting ways. For instance, if you tell the server to wait to fill the request (fetch.max.wait.ms) and have the timeout set short, you will have more retries but potentially better throughput and will have likely saved bandwidth for those high-volume topics/partitions. If you are running clusters in cloud providers, this can save you bandwidth exit costs and, thus, non-trivial amounts of money over time.

Don't forget that whatever you were using when connecting to a geographically "more local" source cluster (i.e., not across the country) will probably stop working if you use the same settings to connect to a more distant cluster, because now you have an extra 50-100ms of roundtrip latency to bear. The default settings with 50ms timeouts for responses mean you will start to disconnect early all the time.
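The interaction between the server-side wait and the client timeout can be sketched as simple arithmetic. The following Python snippet is only a back-of-the-envelope model: The function names and the example numbers are hypothetical, and it ignores broker processing and transfer time, so treat a positive result as a necessary condition rather than a guarantee.

```python
def fetch_headroom_ms(request_timeout_ms, fetch_max_wait_ms, round_trip_ms):
    """Time left after the broker's maximum wait plus one network round trip."""
    return request_timeout_ms - (fetch_max_wait_ms + round_trip_ms)


def will_time_out(request_timeout_ms, fetch_max_wait_ms, round_trip_ms):
    """True if a fetch that waits the full fetch.max.wait.ms cannot return in time."""
    return fetch_headroom_ms(request_timeout_ms, fetch_max_wait_ms, round_trip_ms) <= 0


# Hypothetical settings that work against a nearby cluster (5ms round trip) ...
local_ok = not will_time_out(request_timeout_ms=500, fetch_max_wait_ms=450, round_trip_ms=5)
# ... but not once 100ms of cross-country latency is added.
remote_fails = will_time_out(request_timeout_ms=500, fetch_max_wait_ms=450, round_trip_ms=100)
```

Running the same check with your own timeout, wait, and measured round-trip values shows quickly whether a setting that worked locally has any chance of surviving the longer path.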

Sadly, I cannot offer any exact advice that will always work for these settings. Instead, along with the associated metrics, these configurations are a good place to start fiddling and can give you a good understanding when reading the standard Kafka documentation.

Now say that you have tuned your timeouts up, made sure you are fetching at least 1 byte, and you are still getting these errors in the logs. It can be a challenge to pinpoint the exact problem; you might have 100+ consumer instances, and because they work as a team, just one bad apple could tip you into perpetual rebalance storms.

To simplify the problem, turn down the number of instances and select only some of the topics you need to mirror. Eventually, you probably will get to a set of topics that suddenly starts to work. Hooray! Things are working magically. Maybe it was those tweaks you made to get the topics working? Time to scale it back up … and it's broken again. Crapola. (This is exactly what happened to me recently.)

Did you remember to check your garbage collection (GC)? I bet you are going to find that your consumers are doing stop-the-world (STW) GC pauses for near or over your timeouts. Your one (or two or three) little mirrors are GCing themselves to death; every time they disconnect, they generate a bunch more objects, which then add GC pressure. Even if your mirror starts working, it can quickly churn garbage and spiral into a GC hole from which it never recovers. This case is even more frustrating because it can look like the mirror is running fine for 10 or 20 minutes, and then suddenly – BOOM! – it stops working.

I've found that the Java command-line options

-server
-XX:+UseParallelGC
-XX:ParallelGCThreads=4

are more than sufficient to keep up. It doesn't use the fancy garbage first GC (G1GC), but for a simple mirror application, you don't need complex garbage collection – most objects are highly transient and the rest are small and very long lived. This setup is quite a nice fit for the "old" Java GC.

Unbalanced Consumers

Consumers can become "unbalanced" and have some instances in the group with many partitions and others with none. But isn't Kafka supposed to distribute partitions across consumers? A quick reading of the documentation would have you think that it should just evenly assign partitions across all the consumers, and it does – as long as you have the same number of partitions for all topics. As of Kafka 2.1 (the latest stable release I've tested), as soon as you stop having the same number of partitions, the topic with the lowest number of partitions is used to determine the buckets, and then those buckets are distributed across nodes.

For example, say you have two topics, one with 10 and another with 100 partitions, and 10 consumer instances. You start getting a lot of data coming into the 100-partition topic, so you turn up the number of consumers to 100, expecting to get 90 consumers with one partition and 10 consumers with two partitions (about as even as you can distribute 110 partitions). That is, one partition on 10 instances for each of topic 1 and then an even distribution of topic 2.

Unfortunately, this distribution is not what you see. Instead, you will end up with 10 consumers, each with 11 partitions and 90 consumers sitting idle. That's the same distribution you had before, but now with extra overhead to manage the idle consumers.

The client configuration you need is:

partition.assignment.strategy = org.apache.kafka.clients.consumer.RoundRobinAssignor

Now the consumer group will assign the partitions by round robin across the entire consumer group, which will get you back to the distribution you expected, allowing you to balance load nicely and increase your overall throughput.
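To see why round robin produces the expected spread, the following Python sketch deals out the 110 partitions from the example across 100 consumers. It is a simplified model rather than Kafka's own code: It assumes every consumer subscribes to both topics, and the topic and consumer names are hypothetical.

```python
from collections import Counter
from itertools import cycle


def round_robin_assign(partitions_per_topic, consumers):
    """Deal every topic-partition out to the sorted consumers in turn."""
    all_partitions = [
        (topic, partition)
        for topic in sorted(partitions_per_topic)
        for partition in range(partitions_per_topic[topic])
    ]
    assignment = {consumer: [] for consumer in consumers}
    # zip stops when the partitions run out; cycle wraps around the consumers.
    for topic_partition, consumer in zip(all_partitions, cycle(sorted(consumers))):
        assignment[consumer].append(topic_partition)
    return assignment


topics = {"topic1": 10, "topic2": 100}  # hypothetical topic names
consumers = [f"consumer-{i:03d}" for i in range(100)]
assignment = round_robin_assign(topics, consumers)

# Map "number of partitions held" to "number of consumers holding that many".
load = Counter(len(parts) for parts in assignment.values())
```

Here, load ends up with 10 consumers holding two partitions and 90 holding one, which is the distribution you were expecting when you scaled up the group.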

Tuning Producers

Now that you have the consumer side of your mirror pushing data quickly, you need to tune up the producing side, as well, to push data to the central Kafka cluster as quickly as possible. Already you are in the 95th percentile of users, because generally the default client configurations are more than enough to work for most producers.

To learn more about the internals of the producers, I recommend you look at a talk by Jiangjie Qin [2]. Not only does his presentation walk you through how the producer works, it can give you some first-pass tuning recommendations. Still, I personally prefer a more empirical approach that is based on what clients say in terms of their metrics, which can then be used to optimize your particular use case.

Back to Basics

First, you need to understand why your producer is going slow, so the first question you need to ask is: Is it Kafka or is it me? Maybe it's Kafka. Some metrics to check to ensure that the cluster is happy are:

Unfortunately, in this convenient tuning investigation story, Kafka seems to be idling happily along, so you are left with tuning your client.

The obvious starting place is ensuring that you have compression.type set on your producer. Compression on the producer side is seriously worth considering, especially if you have even a little bit of extra CPU available. Producer-side compression will help Kafka store more data quickly because the broker just writes the data to disk directly out of the socket (and vice versa for the consumer path), making it much more efficient for the whole pipeline (writing, storing, and reading) if the producer can just handle the compression up front.

If you are running Kafka 2.1 or later, you should have access to zstd compression. Some tests I've seen show a marked improvement over the alternatives: It got close to gzip compression ratios, but with the CPU overhead of lz4. Other tests I've run on my data show it can be significantly worse than the other codecs when the format is already quite compressed. Your mileage may vary; be sure to test on your data.

The next thing you should check is the state of your batches. The easiest configuration to tweak is linger.ms. You can think of this as batching by time. By increasing latency, you can then increase your throughput by eliminating the overhead of extra network calls.

Therefore, you should check out the record-queue-time-avg metric, which is the average time a batch waits in the send buffer (aka, how long it takes to fill a batch). If you are consistently below your linger.ms, you are filling your batches before the linger time expires. The first tweak is to increase linger.ms, trading a little latency for (no surprise) more throughput. Note that Kafka defaults to not waiting for batches, leaning toward lower latency at the risk of more remote procedure calls. I find 5ms is a nice sweet spot.

The configurations used so far are then

compression.type = zstd
linger.ms = 5
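The record-queue-time-avg check against linger.ms can be written down as a small helper. This Python sketch assumes you have already collected a handful of metric samples (e.g., from JMX); the function name and the sample values are hypothetical.

```python
def consistently_below_linger(queue_time_samples_ms, linger_ms):
    """True if every record-queue-time-avg sample is below linger.ms.

    That pattern suggests batches are closed because they fill up,
    not because the linger timer expires.
    """
    samples = list(queue_time_samples_ms)
    # An empty sample set tells you nothing, so it does not count as "below".
    return bool(samples) and all(sample < linger_ms for sample in samples)


# Hypothetical samples in milliseconds, checked against linger.ms = 5.
filling_batches = consistently_below_linger([1.2, 2.8, 3.1], linger_ms=5)
waiting_on_timer = not consistently_below_linger([4.9, 5.0, 5.3], linger_ms=5)
```

Looking at several samples rather than a single reading matches the "consistently below" wording and avoids reacting to one quiet interval.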

Unfortunately in this convenient example, even after setting compression and tuning linger.ms, you are still not getting the throughput you need.

Going Deeper

Once you get further into producer tuning, the configurations start to get more interrelated, with some important non-linear and sometimes unexpected effects on performance. It pays to be extra patient – and scientific – about combinations of different parameters. Remember, you should be continually going back to understanding the root bottleneck while keeping an eye on optimizing the rate of records flowing through the producer.

The next questions to ask are: How big are your records (as Kafka sees them, not as you think they are)? Are you making "good" batches?

The size of the batch is determined by the batch.size configuration, which is the number of bytes after which the producer will send the request to the brokers, regardless of the linger.ms value. Requests sent to brokers will contain multiple batches – one for each partition.

A few other things you need to check include the number of records per batch and their size. Here is where you can start really digging into the kafka.producer MBean. The batch-size-[avg|max] can give you a good idea of the distribution of the number of bytes per batch, and record-size-[avg|max] can give you a sense of the size of each record. Divide the two and you have a rough number of records per batch. Match this to the batch.size configuration and determine approximately how many records should be flowing through your producer. You should also sanity check this against the record-send-rate – the number of records per second – reported by your producer.
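The division described above is easy to script once you have the metric values in hand. The following Python sketch uses hypothetical function names and example numbers; because compression and record overhead affect the byte counts, the results are estimates only.

```python
def records_per_batch(batch_size_avg, record_size_avg):
    """Rough number of records per batch: batch-size-avg / record-size-avg."""
    if record_size_avg <= 0:
        raise ValueError("record_size_avg must be positive")
    return batch_size_avg / record_size_avg


def batch_fill_ratio(batch_size_avg, batch_size_config):
    """Fraction of the configured batch.size that an average batch uses."""
    if batch_size_config <= 0:
        raise ValueError("batch_size_config must be positive")
    return batch_size_avg / batch_size_config


# Hypothetical readings: 8KB average batches made of 512-byte records,
# with batch.size at 16,384 bytes (the Kafka default).
per_batch = records_per_batch(batch_size_avg=8192, record_size_avg=512)
fill = batch_fill_ratio(batch_size_avg=8192, batch_size_config=16384)
```

With these numbers, an average batch carries about 16 records and uses half of the configured batch.size, which hints that batches are being sent before they are full.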

You might be a bit surprised if you occasionally have very large messages, for which you should check record-size-max, because the max.request.size configuration will limit the maximum size (in bytes) of a request and therefore inherently limit the number of record batches, as well.

What about the time you are waiting for I/O? Check out the io-wait-ratio metrics to see where you are spending time. Is the I/O thread waiting or are your producers processing?

Next, you need to make sure that the client buffer is not getting filled. Each producer has a fairly large buffer to collect data that then gets batched and sent to the brokers. In practice, I have never seen this to be a problem, but it often pays to be methodical. Here, the metric buffer-available-bytes is your friend, allowing you to ensure that your buffer.memory size is not being exhausted by your record sizes, batching configurations from earlier, or both.
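A quick way to watch for buffer exhaustion is to turn buffer-available-bytes into a utilization figure. This Python sketch assumes the 32MB buffer.memory default unless told otherwise; the function name and the sample reading are hypothetical.

```python
DEFAULT_BUFFER_MEMORY = 32 * 1024 * 1024  # buffer.memory default, in bytes


def buffer_used_fraction(buffer_available_bytes, buffer_memory=DEFAULT_BUFFER_MEMORY):
    """Share of the producer buffer currently in use (0.0 = empty, 1.0 = full)."""
    if buffer_memory <= 0:
        raise ValueError("buffer_memory must be positive")
    return 1.0 - buffer_available_bytes / buffer_memory


# Hypothetical reading: 8MB still available out of the 32MB default.
used = buffer_used_fraction(8 * 1024 * 1024)
```

A value that sits close to 1.0 for long stretches would suggest that buffer.memory, rather than batching, is the limit worth looking at.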

Producing to too many different topics can affect the quality of compression, because you can't compress well across topics. In that case, you might need some application changes so that you can batch more aggressively per destination topic, rather than relying on Kafka to just do the right thing. This is an advanced tactic, so check the bytes-per-topic metrics from the producer first, and only consider it after benchmarking and making sure other adjustments are not helping.

Wrap-Up

The configurations and metrics to tweak on the producer to get high throughput are summarized in Table 1. At this point, you should have all the tools you need to scale up your client instances. You know the most important optimization switches, some guidelines for adjusting garbage collection, and the nice round robin trick for balancing consumer groups when the consumers encounter differently partitioned topics. For slow producers, apply the standard optimizations for compression and linger time, or dive into the depths of the producer configuration to find out what really happens to records and batches.

Table 1: Producer Tuning Summary

| Config/Metric | Comment |
| --- | --- |
| compression.type | Test on your data. |
| linger.ms | Check the average time a batch waits in the send buffer (how long it takes to fill a batch) with record-queue-time-avg. |
| batch.size | Determine records per batch, bytes per batch (batch-size-avg, batch-size-max), and records per topic per second (record-send-rate) and check your bytes per topic. |
| max.request.size | Limit the number and size of batches (record-size-max). |
| Time spent waiting for I/O | Are you really waiting (io-wait-ratio)? |
| buffer.memory + queued requests | 32MB default (roughly total memory by producer) allocated to buffer records for sending (see buffer-available-bytes). |

The tips in this article should give you a bit more guidance beyond the raw documentation in the Kafka manual for how to go about removing bottlenecks and getting the performance out of all parts of your streaming data pipelines that you know you should be getting.