Scaling Kafka

2016-10-17 kafka

In my previous article about Kafka, I introduced some basic concepts and showed how to use this message broker with the Java client API.

In this article I will tackle an operational need: adding and removing nodes in a Kafka 0.10.0 cluster.

Creating a topic

We will start with a cluster made of 3 nodes, identified as 0, 1 and 2. We first create a topic using the kafka-topics.sh tool:

$ bin/kafka-topics.sh --zookeeper zkhost:2181 --create --topic test_topic --partitions 5 --replication-factor 2
Created topic "test_topic".

$ bin/kafka-topics.sh --zookeeper zkhost:2181 --describe --topic test_topic
Topic:test_topic        PartitionCount:5        ReplicationFactor:2     Configs:
        Topic: test_topic       Partition: 0    Leader: 2       Replicas: 2,0   Isr: 2,0
        Topic: test_topic       Partition: 1    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: test_topic       Partition: 2    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: test_topic       Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: test_topic       Partition: 4    Leader: 0       Replicas: 0,2   Isr: 0,2

Our first topic has 5 partitions and a replication factor of 2, which means 10 partition replicas will be distributed over our 3-node cluster, that is 3 or 4 per broker. As you can see, leaders and replicas are spread homogeneously across the nodes.

[Figure: Creating a topic]

All the topic configuration is stored in Zookeeper, which makes it available to all Kafka brokers. This is why every command we will use to manage topics and their partitions takes a --zookeeper argument. You can use the zookeeper-shell.sh tool to dig into the Zookeeper tree:

$ bin/zookeeper-shell.sh zkhost:2181
Connecting to zkhost:2181
Welcome to ZooKeeper!

ls /brokers/topics
[__consumer_offsets,test_topic]

get /brokers/topics/test_topic
{"version":1,"partitions":{"4":[0,2],"1":[0,1],"0":[2,0],"2":[1,2],"3":[2,1]}}

ls /brokers/topics/test_topic/partitions
[0, 1, 2, 3, 4]

get /brokers/topics/test_topic/partitions/0/state
{"controller_epoch":1,"leader":2,"version":1,"leader_epoch":0,"isr":[2,0]}

Adding a broker node

Now we will add a fourth node with Id 3 to our cluster. Once the node is started and has successfully joined the cluster, it doesn’t automatically receive partitions:

$ bin/kafka-topics.sh --zookeeper zkhost:2181 --describe --topic test_topic
Topic:test_topic        PartitionCount:5        ReplicationFactor:2     Configs:
        Topic: test_topic       Partition: 0    Leader: 2       Replicas: 2,0   Isr: 0,2
        Topic: test_topic       Partition: 1    Leader: 0       Replicas: 0,1   Isr: 1,0
        Topic: test_topic       Partition: 2    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: test_topic       Partition: 3    Leader: 2       Replicas: 2,1   Isr: 1,2
        Topic: test_topic       Partition: 4    Leader: 0       Replicas: 0,2   Isr: 0,2
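
For reference, joining the cluster only requires the new broker to have a unique id and to point to the same Zookeeper ensemble as the other nodes. A minimal server.properties sketch for broker 3 (the listener address and log directory below are placeholders to adapt):

broker.id=3
zookeeper.connect=zkhost:2181
# placeholders: adapt to your own host and disks
listeners=PLAINTEXT://kafka3:9092
log.dirs=/var/lib/kafka

$ bin/kafka-server-start.sh -daemon config/server.properties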

So we will have to redistribute the partitions over the 4 nodes. In fact, our cluster holds 2 topics:

$ bin/kafka-topics.sh --zookeeper zkhost:2181 --list
__consumer_offsets
test_topic
  • test_topic is the topic we created above

  • __consumer_offsets is an internal topic used to track consumer offsets. It has 5 partitions and a replication factor of 3:

$ bin/kafka-topics.sh --zookeeper zkhost:2181 --describe --topic __consumer_offsets
Topic:__consumer_offsets        PartitionCount:5        ReplicationFactor:3     Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
        Topic: __consumer_offsets       Partition: 0    Leader: 0       Replicas: 0,3,1 Isr: 0,3,1
        Topic: __consumer_offsets       Partition: 1    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
        Topic: __consumer_offsets       Partition: 2    Leader: 2       Replicas: 2,1,3 Isr: 2,1,3
        Topic: __consumer_offsets       Partition: 3    Leader: 3       Replicas: 3,2,0 Isr: 3,2,0
        Topic: __consumer_offsets       Partition: 4    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2

We need to write a JSON file to list the topics we want to reorganize: test_topic and __consumer_offsets in our case:

topics.json
{ "version": 1,
  "topics": [
     {"topic": "test_topic"},
     {"topic": "__consumer_offsets"}
  ]
}

Now we can use the kafka-reassign-partitions.sh tool to generate partition assignments. It takes the topic list and the broker list as input, and produces the assignment plan in JSON format:

$ bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181 --generate --topics-to-move-json-file topics.json --broker-list 0,1,2,3
Current partition replica assignment

{"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[2,0]},{"topic":"test_topic","partition":4,"replicas":[0,2]},{"topic":"__consumer_offsets","partition":0,"replicas":[0,3,1]},{"topic":"__consumer_offsets","partition":3,"replicas":[3,2,0]},{"topic":"test_topic","partition":3,"replicas":[2,1]},{"topic":"test_topic","partition":2,"replicas":[1,2]},{"topic":"__consumer_offsets","partition":4,"replicas":[0,1,2]},{"topic":"test_topic","partition":1,"replicas":[0,1]},{"topic":"__consumer_offsets","partition":2,"replicas":[2,1,3]},{"topic":"__consumer_offsets","partition":1,"replicas":[1,0,2]}]}
Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[3,0]},{"topic":"test_topic","partition":4,"replicas":[3,1]},{"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]},{"topic":"test_topic","partition":2,"replicas":[1,2]},{"topic":"test_topic","partition":3,"replicas":[2,3]},{"topic":"__consumer_offsets","partition":3,"replicas":[3,0,1]},{"topic":"__consumer_offsets","partition":4,"replicas":[0,2,3]},{"topic":"test_topic","partition":1,"replicas":[0,1]},{"topic":"__consumer_offsets","partition":1,"replicas":[1,2,3]},{"topic":"__consumer_offsets","partition":2,"replicas":[2,3,0]}]}

Let’s use the above proposed reassignment plan, format it a bit to make it more readable, and save it in a reassignment.json file:

reassignment.json
{ "version":1,
  "partitions":[
    {"topic":"test_topic",        "partition":0,"replicas":[3,0]},
    {"topic":"test_topic",        "partition":1,"replicas":[0,1]},
    {"topic":"test_topic",        "partition":2,"replicas":[1,2]},
    {"topic":"test_topic",        "partition":3,"replicas":[2,3]},
    {"topic":"test_topic",        "partition":4,"replicas":[3,1]},
    {"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]},
    {"topic":"__consumer_offsets","partition":1,"replicas":[1,2,3]},
    {"topic":"__consumer_offsets","partition":2,"replicas":[2,3,0]},
    {"topic":"__consumer_offsets","partition":3,"replicas":[3,0,1]},
    {"topic":"__consumer_offsets","partition":4,"replicas":[0,2,3]}
  ]
}

This file tells Kafka on which nodes each partition (leader and replicas) must be located. In this assignment plan, you can check that:

  • All 4 nodes 0, 1, 2 and 3 are used,

  • Each node ends up with roughly the same number of partition replicas: 6 or 7 (= (2×5 + 3×5) ÷ 4 ≈ 6.25), as the quick check below confirms.
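
A quick way to double-check this balance is to count how many partition replicas each broker receives in reassignment.json. A small sketch using jq (an external tool, not part of the Kafka distribution):

$ jq -r '.partitions[].replicas[]' reassignment.json | sort | uniq -c
      6 0
      6 1
      6 2
      7 3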

To run this plan, we will use the kafka-reassign-partitions.sh tool with the --execute command. It takes the generated reassignment.json file as input.

$ bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181 --execute --reassignment-json-file reassignment.json
Current partition replica assignment

{"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[2,0]},{"topic":"test_topic","partition":4,"replicas":[0,2]},{"topic":"__consumer_offsets","partition":0,"replicas":[0,3,1]},{"topic":"__consumer_offsets","partition":3,"replicas":[3,2,0]},{"topic":"test_topic","partition":3,"replicas":[2,1]},{"topic":"test_topic","partition":2,"replicas":[1,2]},{"topic":"__consumer_offsets","partition":4,"replicas":[0,1,2]},{"topic":"test_topic","partition":1,"replicas":[0,1]},{"topic":"__consumer_offsets","partition":2,"replicas":[2,1,3]},{"topic":"__consumer_offsets","partition":1,"replicas":[1,0,2]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"__consumer_offsets","partition":4,"replicas":[0,2,3]},{"topic":"__consumer_offsets","partition":3,"replicas":[3,0,1]},{"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]},{"topic":"test_topic","partition":4,"replicas":[3,1]},{"topic":"test_topic","partition":3,"replicas":[2,3]},{"topic":"test_topic","partition":2,"replicas":[1,2]},{"topic":"test_topic","partition":0,"replicas":[3,0]},{"topic":"__consumer_offsets","partition":2,"replicas":[2,3,0]},{"topic":"test_topic","partition":1,"replicas":[0,1]},{"topic":"__consumer_offsets","partition":1,"replicas":[1,2,3]}]}

Be aware that you cannot execute an assignment plan that references a dead or stopped node: the reassignment only starts if all the brokers it mentions are alive.

Once the reassignment is finished, your partitions have been redistributed over the cluster:

[Figure: Adding a node]

Moving partitions from one node to another may take a long time when the partitions are large. To check the progress of the reassignment, you can use either:

  • The kafka-reassign-partitions.sh tool with the --verify command.

  • The kafka-topics.sh tool with the --describe command.

$ bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181 --verify --reassignment-json-file reassignment.json
Status of partition reassignment:
Reassignment of partition [__consumer_offsets,4] completed successfully
Reassignment of partition [__consumer_offsets,3] completed successfully
Reassignment of partition [__consumer_offsets,0] completed successfully
Reassignment of partition [test_topic,4] completed successfully
Reassignment of partition [test_topic,3] completed successfully
Reassignment of partition [test_topic,2] is still in progress
Reassignment of partition [test_topic,0] completed successfully
Reassignment of partition [__consumer_offsets,2] completed successfully
Reassignment of partition [test_topic,1] is still in progress
Reassignment of partition [__consumer_offsets,1] completed successfully

$ bin/kafka-topics.sh --zookeeper zkhost:2181 --describe --topic test_topic
Topic:test_topic        PartitionCount:5        ReplicationFactor:2     Configs:
        Topic: test_topic       Partition: 0    Leader: 3       Replicas: 3,0   Isr: 0,3
        Topic: test_topic       Partition: 1    Leader: 0       Replicas: 0,1   Isr: 1,0
        Topic: test_topic       Partition: 2    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: test_topic       Partition: 3    Leader: 2       Replicas: 2,3   Isr: 2,3
        Topic: test_topic       Partition: 4    Leader: 3       Replicas: 3,1   Isr: 3,1

Unfortunately, the tools available to monitor this reassignment are scarce, and you have no clue how long it will take to complete.
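
One low-level way to see whether a reassignment is still running: while it is in flight, the plan is stored in the /admin/reassign_partitions znode, and this znode is deleted once the reassignment completes. You can check it with the same zookeeper-shell.sh tool; if the node no longer exists, the reassignment is over:

$ bin/zookeeper-shell.sh zkhost:2181
get /admin/reassign_partitions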

Removing a broker node

The recipe to remove a node is very similar to the previous one:

  1. kafka-topics.sh --list to get the topic list and write a topics.json

  2. kafka-reassign-partitions.sh --generate to generate a reassignment plan (reassignment.json) excluding the node to remove

  3. kafka-reassign-partitions.sh --execute to run the assignment plan

  4. kafka-reassign-partitions.sh --verify to check whether the assignment plan has been applied

  5. Stop the broker and remove it

As an example, we will remove the broker with Id 1.

$ bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181 --generate --topics-to-move-json-file topics.json --broker-list 0,2,3

The tool proposes the following reassignment:

{ "version":1,
  "partitions":[
    {"topic":"test_topic",        "partition":0,"replicas":[0,2]},
    {"topic":"test_topic",        "partition":1,"replicas":[2,3]},
    {"topic":"test_topic",        "partition":2,"replicas":[3,0]},
    {"topic":"test_topic",        "partition":3,"replicas":[0,3]},
    {"topic":"test_topic",        "partition":4,"replicas":[2,0]},
    {"topic":"__consumer_offsets","partition":0,"replicas":[2,3,0]},
    {"topic":"__consumer_offsets","partition":1,"replicas":[3,0,2]},
    {"topic":"__consumer_offsets","partition":2,"replicas":[0,2,3]},
    {"topic":"__consumer_offsets","partition":3,"replicas":[2,0,3]},
    {"topic":"__consumer_offsets","partition":4,"replicas":[3,2,0]}
  ]
}

Once executed, the topic is reorganized like this:

$ bin/kafka-topics.sh --zookeeper zkhost:2181 --describe --topic test_topic
Topic:test_topic        PartitionCount:5        ReplicationFactor:2     Configs:
        Topic: test_topic       Partition: 0    Leader: 0       Replicas: 0,2   Isr: 0,2
        Topic: test_topic       Partition: 1    Leader: 2       Replicas: 2,3   Isr: 2,3
        Topic: test_topic       Partition: 2    Leader: 3       Replicas: 3,0   Isr: 0,3
        Topic: test_topic       Partition: 3    Leader: 0       Replicas: 0,3   Isr: 3,0
        Topic: test_topic       Partition: 4    Leader: 2       Replicas: 2,0   Isr: 0,2

[Figure: Removing a node]
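
Once --verify reports that everything has completed and no partition references broker 1 any more, the last step of the recipe is to stop it. A minimal sketch, assuming the broker was started with the standard scripts on that host:

$ bin/kafka-server-stop.sh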

As you may observe in this example, the data movement between nodes for the partitions of test_topic is not optimal. As a result, a hand-written assignment may sometimes be preferable to the generated one.

To replace a node with another one, you don't need the above scenarios because you can keep the same partition assignment. All you have to do is the following (a sketch is shown after the list):

  1. Stop the old node

  2. Give the new node the same Id as the old one

  3. Start the new node
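
A minimal sketch of these three steps with the standard scripts, assuming broker 1 is the node being replaced and that the new machine reuses its configuration (hostnames and paths are placeholders):

# on the old machine
$ bin/kafka-server-stop.sh

# on the new machine: reuse the same broker id
$ grep broker.id config/server.properties
broker.id=1
$ bin/kafka-server-start.sh -daemon config/server.properties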

Rack awareness

Starting with version 0.10.0, Kafka supports rack-aware replica placement. This means Kafka will try to spread the replicas of each partition across different racks (or availability zones).

The only change is the broker.rack property in the broker configuration file:

broker.id=0
broker.rack=A
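
The brokers in the other rack simply get another value for the same property, for instance for broker 2 (the rack names here are just an illustration):

broker.id=2
broker.rack=B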

For instance, imagine brokers 0 and 1 are in rack A, while brokers 2 and 3 are in rack B. Now, when we create a topic with a replication factor of two, each partition gets a replica in each rack.

$ bin/kafka-topics.sh --zookeeper zkhost:2181 --create --topic test_topic --partitions 5 --replication-factor 2
Created topic "test_topic".

$ bin/kafka-topics.sh --zookeeper zkhost:2181 --describe --topic test_topic
Topic:test_topic        PartitionCount:5        ReplicationFactor:2     Configs:
        Topic: test_topic       Partition: 0    Leader: 1       Replicas: 1,3   Isr: 1,3
        Topic: test_topic       Partition: 1    Leader: 3       Replicas: 3,0   Isr: 3,0
        Topic: test_topic       Partition: 2    Leader: 0       Replicas: 0,2   Isr: 0,2
        Topic: test_topic       Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: test_topic       Partition: 4    Leader: 1       Replicas: 1,2   Isr: 1,2

[Figure: Rack awareness]

In the output above, every partition indeed has one replica on a rack A broker (0 or 1) and one on a rack B broker (2 or 3). This feature is really interesting for improving failure tolerance, but it makes a manual assignment harder to build.

Simple scaling

As you have seen, horizontally scaling a Kafka cluster is not that hard, but it is tedious.

Kafka Manager lets you reassign partitions to nodes visually through its web UI.

Running Kafka in a highly elastic environment, such as a Docker cluster scheduler, looks tricky. Some solutions exist though: