Kafka Consumer Assignments

I recently had the opportunity to work on a project upgrading one of Signal’s Redis-based message broker pipelines to Apache Kafka. At Signal, we use Kafka extensively to scale our real-time data platform. Kafka buffers writes to our Cassandra database, enqueues batched data, and provides real-time event streams. We continue to find new ways to use Kafka to solve our infrastructure needs.

The Kafka consumer, however, can be finicky to tune. There are a lot of performance knobs, and it is important to understand the semantics of the consumer and how Kafka is designed to scale.

 

Topic-partitions: the unit of parallelism

The unit of parallelism in Kafka is the topic-partition. Each topic represents a logical queue of similar data. To support scaling the throughput of a topic, Kafka supports partitioning of topics.

Partitions are essentially append-only log files on disk. A topic is therefore stored in Kafka as a set of log files that belong to the topic. Each consumer group stores an offset per topic-partition which represents where that consumer group has left off processing in a particular topic-partition. More partitions means better throughput, but at the cost of more files on disk and more offsets to manage.
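To make the partition count concrete, here is a minimal sketch of creating a topic with several partitions using the Java AdminClient available in newer Kafka clients. The broker address, topic name, partition count, and replication factor are placeholders, not values from our system:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreatePartitionedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker

        try (AdminClient admin = AdminClient.create(props)) {
            // A topic named "events" with 4 partitions and a replication factor of 1.
            // Four partitions allow up to four consumers in one group to read in parallel.
            NewTopic topic = new NewTopic("events", 4, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}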

(For brevity, I will henceforth refer to topic-partitions as merely “partitions.”)

 

Kafka consumer semantics

For the sake of this article I will use the term “consumer” to refer to a single consumer thread. (This is the terminology that Kafka’s documentation uses.) When you create a KafkaConsumer in Java, you are creating a consumer in this sense. Each consumer is a member of a logical consumer group. Conversely, each consumer group is composed of a number of consumers.
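As a quick illustration, a consumer becomes a member of a group simply by sharing a group.id with the other members. A minimal sketch, with a placeholder broker address and group name:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerGroupMember {
    public static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        // All consumers that share this group.id form one logical consumer group.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }
}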

Each consumer in the group is assigned a set of partitions. An assignment essentially functions as an exclusive lock on a given set of partitions. Each partition is assigned to exactly one consumer per group, and only the consumer that owns that partition will be able to read its data while the assignment persists. Each consumer can be assigned many partitions, however.

You can think of the consumer assignments as a map-like data structure with consumers as the keys and sets of partitions as the values. As an example, let’s say we have two topics (t0 and t1), each with two partitions (p0 and p1). When we connect a consumer group consisting of two consumers, C1 and C2, Kafka will assign the partitions to the consumers in a manner similar to the example below:

C1 = t0p0, t1p1

C2 = t1p0, t0p1

That is to say, C1 is assigned the partitions t0p0 and t1p1, and C2 is assigned the partitions t1p0 and t0p1. When C1 polls for data, it receives records only from t0p0 and t1p1, and likewise C2 only receives records from its partitions.
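A rough sketch of what this looks like in code: after the first poll, a consumer can inspect its current assignment, and every record it receives comes from one of those partitions. The topic names match the example above; the poll timeout is arbitrary, and the Duration-based poll is the newer client API:

import java.time.Duration;
import java.util.Arrays;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AssignmentDemo {
    public static void run(KafkaConsumer<String, String> consumer) {
        // Subscribe to both topics; the group decides which partitions
        // of t0 and t1 this particular consumer owns.
        consumer.subscribe(Arrays.asList("t0", "t1"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            // After the first poll, assignment() reflects the partitions this consumer owns.
            System.out.println("Current assignment: " + consumer.assignment());
            for (ConsumerRecord<String, String> record : records) {
                // Every record seen here comes from one of this consumer's assigned partitions.
                System.out.printf("%s-%d offset=%d%n", record.topic(), record.partition(), record.offset());
            }
        }
    }
}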

 

What is a rebalance?

While running a live Kafka system, we often see consumer rebalances occurring during normal operation. A consumer rebalance is not an erroneous condition, but rather represents a change in the mapping outlined above. There are two events that can cause a consumer rebalance:

  1. A topic-partition is added or removed.
  2. A consumer is added or removed from the group.

We should expect rebalances to occur regularly due to deploys and maintenance, as consumers will leave or re-join the group whenever applications are restarted. It is therefore wise to design a consumer system to handle a rebalance event gracefully.
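One way to handle this gracefully is to register a ConsumerRebalanceListener when subscribing, so the application can commit offsets or flush in-flight work before its partitions are taken away. A minimal sketch, with placeholder topic names:

import java.util.Arrays;
import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceAwareSubscriber {
    public static void subscribe(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Arrays.asList("t0", "t1"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Called before a rebalance takes our partitions away: a good place
                // to commit offsets or flush any in-flight work for these partitions.
                System.out.println("Revoked: " + partitions);
                consumer.commitSync();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Called after the rebalance completes with our new assignment.
                System.out.println("Assigned: " + partitions);
            }
        });
    }
}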

Topic-partitions added or removed

Scenario 1 may occur if your broker has the topic auto-creation setting enabled or if you manually increase the number of partitions for an existing topic. In this case, the broker creates the new partitions on disk and broadcasts them out during the next metadata refresh so that consumers can become aware of their existence.

When the metadata indicates that new partitions are available, the consumer group must decide which consumers in the group will receive the new partitions. The partition assignor is then responsible for making this decision and returning the new assignment mapping to the broker. (Note that the partition assignor actually runs on a randomly elected consumer application, not on the broker.)

For example, if a new topic, t3, were suddenly created on our system with one partition, we may see a new assignment that looks like the following:

C1 = t0p0, t1p1, t3p0

C2 = t1p0, t0p1

The group has now rebalanced to accommodate the addition of the new topic.

Consumers added or removed

Scenario 2 indicates that the number of consumers has changed and therefore our mapping is no longer valid. Let’s say, for the sake of demonstration, that we have the following assignment:

C1 = t0p0, t1p1

C2 = t1p0, t0p1

What happens if the C1 thread suddenly dies? Now we have two partitions (t0p0 and t1p1) which have to be re-assigned to another consumer in the group. In this case, C2 is the only remaining consumer, so it receives all of the partitions, and our new consumer group assignment looks like this:

C2 = t0p0, t0p1, t1p0, t1p1

The consumer group has been rebalanced to accommodate the loss of C1. If C1 were to later come back online, it could rejoin the group and the rebalance process would take place again.
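How quickly a dead consumer is detected, and therefore how quickly this rebalance happens, is governed by a handful of consumer configs. The values below are illustrative only; the right settings depend on your workload:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class FailureDetectionConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        // How long the broker waits without heartbeats before declaring a consumer
        // dead and rebalancing its partitions to the remaining group members.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
        // How often the consumer sends heartbeats to the group coordinator.
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
        // Maximum time allowed between poll() calls before the consumer is
        // considered stuck and removed from the group.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
        return props;
    }
}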

 

Tuning the assignment algorithm

In this particular use case, we need to handle a large number of topics – 80 or so. Each of these topics represents a queue of data that is piped to a different external downstream service with varying latencies. Each topic also varies quite a bit in volume. Our consumer group subscribes to all of them with a regular expression to support dynamic creation of new topics, allowing the system to scale without a configuration change. It was therefore in our best interest to ensure that the load of partitions was reasonably well distributed across the application nodes in our system.
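For illustration, a regex subscription looks roughly like this; the pattern is a placeholder rather than our actual topic naming scheme, and the no-op rebalance listener would normally do real work:

import java.util.Collection;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PatternSubscription {
    public static void subscribe(KafkaConsumer<String, String> consumer) {
        // Subscribe to every topic matching the pattern; topics created later that
        // match are picked up automatically on a subsequent metadata refresh.
        consumer.subscribe(Pattern.compile("downstream-.*"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
        });
    }
}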

We thought it was good enough to use Kafka’s default “range” partition assignment strategy to achieve this design. After deploying the Kafka consumer, however, we noticed that some application nodes received the lion’s share of partition assignments. Upon inspection, we realized that we had missed a critical note in the documentation:

Range partitioning works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumer threads in lexicographic order. We then divide the number of partitions by the total number of consumer streams (threads) to determine the number of partitions to assign to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition.

D’oh! As it turns out, Kafka’s default “range” assignment algorithm distributes each topic’s partitions independently, rather than spreading the combined set of all topics’ partitions across the full set of consumers. With this realization, we moved forward with the “round robin” assignor option, which explicitly lumps all partitions together during the assignment.
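Switching assignors is a one-line consumer configuration change, roughly as follows:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;

public class AssignorConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        // The default is the range assignor; switch to round robin so that all
        // subscribed topics' partitions are pooled together before being dealt out.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  RoundRobinAssignor.class.getName());
        return props;
    }
}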

Unfortunately, this didn’t do what we expected either. We still saw a clustering of partitions around the first few instances in our set of application nodes. Flummoxing, indeed.

Inspecting the partition assignments directly revealed the source of the problem, which was foreshadowed in the documentation quoted above:

[…] we lay out the available partitions in numeric order and the consumer threads in lexicographic order […]

The problem lay in the way we named our consumer threads. We typically set the name of a consumer to a string consisting of the hostname followed by a randomly generated UUID. For example, a consumer on host “application01” might be named “application01-3321631a-2476-11e7-93ae-92361f002671.” Kafka’s default assignors use the consumer name to sort the consumers prior to performing the assignments. The net effect was that alphabetically “lower” consumers received the remainder of partitions whenever the total number of partitions did not divide evenly into the number of consumers.

Worse still, the partitions themselves are sorted alphabetically by Kafka during assignment. This meant that, in our system, the partitions for a topic named “Aardvark” would always be assigned to the consumers on the first application node (e.g., “application01”). All partitions for a particular topic would tend to cluster around one or two application nodes – the opposite of what we want!

In this use case, we had to write our own partition assignment algorithm which randomly orders the consumers before making assignments. This provides approximately the outcome that we want for this system – the partitions for a given topic are assigned across all available application nodes, with no affinity for any particular node.
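The sketch below captures the core idea: shuffle the consumer IDs before dealing partitions out round-robin. It shows only the assignment logic; wiring it into Kafka’s partition-assignor interface is omitted, since the exact interface and method signatures vary across client versions:

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

public class RandomizedRoundRobinSketch {

    public static Map<String, List<TopicPartition>> assign(
            List<String> consumerIds, List<TopicPartition> allPartitions) {

        // Randomize consumer order instead of sorting lexicographically,
        // so hostname prefixes like "application01" carry no advantage.
        List<String> shuffled = new ArrayList<>(consumerIds);
        Collections.shuffle(shuffled);

        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String consumer : shuffled) {
            assignment.put(consumer, new ArrayList<>());
        }

        // Deal all partitions (across every subscribed topic) out round-robin.
        int i = 0;
        for (TopicPartition partition : allPartitions) {
            String consumer = shuffled.get(i % shuffled.size());
            assignment.get(consumer).add(partition);
            i++;
        }
        return assignment;
    }
}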

 

Lessons learned

Read the docs! Everything we needed to know was readily available in Kafka’s documentation – though we could perhaps be forgiven for failing to notice the relevant passage on first glance.

Additionally, it’s important to understand Kafka’s data model when designing a solution. Kafka is often thought of as a distributed queue system. While a topic can be treated semantically as a queue, the underlying data structure is a set of append-only logs, each of which is locked to a single consuming thread per consumer group. Make sure the assignment of those partitions is balanced according to the requirements of your system.

Originally published May 09, 2017

Robert Quinlivan

Robert Quinlivan was formerly a software engineer at Signal, where he worked on backend services. He is interested in functional languages and backend architecture.
