Scaling Kafka
2016-10-17 kafka
In my previous article about Kafka, I introduced some basic concepts and showed how to use this message broker with the Java client API.
In this article I will tackle an operational need: adding and removing nodes in a Kafka 0.10.0 cluster.
Creating a topic
We will start with a cluster made of 3 nodes identified 0, 1 and 2.
We first create a topic using the kafka-topics.sh tool:
$ bin/kafka-topics.sh --zookeeper zkhost:2181 --create --topic test_topic --partitions 5 --replication-factor 2
Created topic "test_topic".
$ bin/kafka-topics.sh --zookeeper zkhost:2181 --describe --topic test_topic
Topic:test_topic PartitionCount:5 ReplicationFactor:2 Configs:
Topic: test_topic Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: test_topic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: test_topic Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test_topic Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: test_topic Partition: 4 Leader: 0 Replicas: 0,2 Isr: 0,2
Our first topic has 5 partitions and a replication factor of 2, which means 10 partition replicas will be distributed over our 3-node cluster. As you can see, leaders and replicas are distributed evenly across the nodes.
All the topic configuration is stored in Zookeeper, which makes it available to all Kafka brokers.
This is why all the commands we will use to manage topics and their partitions take a --zookeeper argument.
You can use the zookeeper-shell.sh tool to dig into the Zookeeper tree:
$ bin/zookeeper-shell.sh zkhost:2181
Connecting to zkhost:2181
Welcome to ZooKeeper!
ls /brokers/topics
[__consumer_offsets,test_topic]
get /brokers/topics/test_topic
{"version":1,"partitions":{"4":[0,2],"1":[0,1],"0":[2,0],"2":[1,2],"3":[2,1]}}
ls /brokers/topics/test_topic/partitions
[0, 1, 2, 3, 4]
get /brokers/topics/test_topic/partitions/0/state
{"controller_epoch":1,"leader":2,"version":1,"leader_epoch":0,"isr":[2,0]}
Adding a broker node
Now we will add a fourth node with Id 3 to our cluster. Once the node is started and has successfully joined the cluster, it doesn’t automatically receive partitions:
$ bin/kafka-topics.sh --zookeeper zkhost:2181 --describe --topic test_topic
Topic:test_topic PartitionCount:5 ReplicationFactor:2 Configs:
Topic: test_topic Partition: 0 Leader: 2 Replicas: 2,0 Isr: 0,2
Topic: test_topic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 1,0
Topic: test_topic Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test_topic Partition: 3 Leader: 2 Replicas: 2,1 Isr: 1,2
Topic: test_topic Partition: 4 Leader: 0 Replicas: 0,2 Isr: 0,2
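For reference, the only configuration the new broker needed in order to join the cluster was a unique id and the address of the same Zookeeper ensemble. Here is a minimal server.properties sketch; the listener address and log directory below are placeholders, adapt them to your environment:
# placeholders: adapt the listener address and log directory to your setup
broker.id=3
listeners=PLAINTEXT://kafka4:9092
log.dirs=/var/lib/kafka
zookeeper.connect=zkhost:2181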
So we will have to redistribute partitions across the 4 nodes. Note that our cluster actually contains 2 topics:
$ bin/kafka-topics.sh --zookeeper zkhost:2181 --list
__consumer_offsets
test_topic
- test_topic is the topic we created above
- __consumer_offsets is an internal topic used to track consumer offsets. It has 5 partitions and a replication factor of 3:
$ bin/kafka-topics.sh --zookeeper zkhost:2181 --describe --topic __consumer_offsets
Topic:__consumer_offsets PartitionCount:5 ReplicationFactor:3 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
Topic: __consumer_offsets Partition: 0 Leader: 0 Replicas: 0,3,1 Isr: 0,3,1
Topic: __consumer_offsets Partition: 1 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: __consumer_offsets Partition: 2 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: __consumer_offsets Partition: 3 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0
Topic: __consumer_offsets Partition: 4 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
We need to write a JSON file listing the topics we want to reorganize, test_topic and __consumer_offsets in our case:
{ "version": 1,
"topics": [
{"topic": "test_topic"},
{"topic": "__consumer_offsets"}
]
}
Now we can use the kafka-reassign-partitions.sh tool to generate partition assignments.
It takes the topic list and the broker list as input, and produces the assignment plan in JSON format:
$ bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181 --generate --topics-to-move-json-file topics.json --broker-list 0,1,2,3
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[2,0]},{"topic":"test_topic","partition":4,"replicas":[0,2]},{"topic":"__consumer_offsets","partition":0,"replicas":[0,3,1]},{"topic":"__consumer_offsets","partition":3,"replicas":[3,2,0]},{"topic":"test_topic","partition":3,"replicas":[2,1]},{"topic":"test_topic","partition":2,"replicas":[1,2]},{"topic":"__consumer_offsets","partition":4,"replicas":[0,1,2]},{"topic":"test_topic","partition":1,"replicas":[0,1]},{"topic":"__consumer_offsets","partition":2,"replicas":[2,1,3]},{"topic":"__consumer_offsets","partition":1,"replicas":[1,0,2]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[3,0]},{"topic":"test_topic","partition":4,"replicas":[3,1]},{"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]},{"topic":"test_topic","partition":2,"replicas":[1,2]},{"topic":"test_topic","partition":3,"replicas":[2,3]},{"topic":"__consumer_offsets","partition":3,"replicas":[3,0,1]},{"topic":"__consumer_offsets","partition":4,"replicas":[0,2,3]},{"topic":"test_topic","partition":1,"replicas":[0,1]},{"topic":"__consumer_offsets","partition":1,"replicas":[1,2,3]},{"topic":"__consumer_offsets","partition":2,"replicas":[2,3,0]}]}
Let’s use the above proposed reassignment plan, format it a bit to make it more readable, and save it in a reassignment.json file:
{ "version":1,
"partitions":[
{"topic":"test_topic", "partition":0,"replicas":[3,0]},
{"topic":"test_topic", "partition":1,"replicas":[0,1]},
{"topic":"test_topic", "partition":2,"replicas":[1,2]},
{"topic":"test_topic", "partition":3,"replicas":[2,3]},
{"topic":"test_topic", "partition":4,"replicas":[3,1]},
{"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]},
{"topic":"__consumer_offsets","partition":1,"replicas":[1,2,3]},
{"topic":"__consumer_offsets","partition":2,"replicas":[2,3,0]},
{"topic":"__consumer_offsets","partition":3,"replicas":[3,0,1]},
{"topic":"__consumer_offsets","partition":4,"replicas":[0,2,3]}
]
}
This file tells Kafka on which nodes each partition (leader and replicas) must be located. You can check in this assignment plan that:
- All 4 nodes 0, 1, 2 and 3 are used,
- Each node holds roughly the same number of partition replicas: 6 or 7 (= (2×5 + 3×5) ÷ 4).
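You can verify these counts directly from the plan, for instance with a quick jq one-liner (a sketch, assuming jq is installed; the counts below are derived from the plan above):
$ jq -r '.partitions[].replicas[]' reassignment.json | sort | uniq -c
      6 0
      6 1
      6 2
      7 3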
To run this plan, we will use the kafka-reassign-partitions.sh tool with the --execute command.
It takes the generated reassignment.json file as input.
$ bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181 --execute --reassignment-json-file reassignment.json
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[2,0]},{"topic":"test_topic","partition":4,"replicas":[0,2]},{"topic":"__consumer_offsets","partition":0,"replicas":[0,3,1]},{"topic":"__consumer_offsets","partition":3,"replicas":[3,2,0]},{"topic":"test_topic","partition":3,"replicas":[2,1]},{"topic":"test_topic","partition":2,"replicas":[1,2]},{"topic":"__consumer_offsets","partition":4,"replicas":[0,1,2]},{"topic":"test_topic","partition":1,"replicas":[0,1]},{"topic":"__consumer_offsets","partition":2,"replicas":[2,1,3]},{"topic":"__consumer_offsets","partition":1,"replicas":[1,0,2]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,"partitions":[{"topic":"__consumer_offsets","partition":4,"replicas":[0,2,3]},{"topic":"__consumer_offsets","partition":3,"replicas":[3,0,1]},{"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]},{"topic":"test_topic","partition":4,"replicas":[3,1]},{"topic":"test_topic","partition":3,"replicas":[2,3]},{"topic":"test_topic","partition":2,"replicas":[1,2]},{"topic":"test_topic","partition":0,"replicas":[3,0]},{"topic":"__consumer_offsets","partition":2,"replicas":[2,3,0]},{"topic":"test_topic","partition":1,"replicas":[0,1]},{"topic":"__consumer_offsets","partition":1,"replicas":[1,2,3]}]}
Be aware that you cannot execute an assignment plan that mentions a dead or stopped node: the reassignment only runs if all the brokers it references are alive.
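To check which brokers are currently registered, and therefore alive, you can list the /brokers/ids znode with the zookeeper-shell.sh tool (the output below assumes all four brokers are up):
$ bin/zookeeper-shell.sh zkhost:2181
Connecting to zkhost:2181
Welcome to ZooKeeper!
ls /brokers/ids
[0, 1, 2, 3]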
Once the reassignment is finished, your partitions have been redistributed over the cluster.
Moving partitions from one node to another may take a long time when the partitions are large. To check the progress of the reassignment, you can use either:
- The kafka-reassign-partitions.sh tool with the --verify command.
- The kafka-topics.sh tool with the --describe command.
$ bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181 --verify --reassignment-json-file reassignment.json
Status of partition reassignment:
Reassignment of partition [__consumer_offsets,4] completed successfully
Reassignment of partition [__consumer_offsets,3] completed successfully
Reassignment of partition [__consumer_offsets,0] completed successfully
Reassignment of partition [test_topic,4] completed successfully
Reassignment of partition [test_topic,3] completed successfully
Reassignment of partition [test_topic,2] is still in progress
Reassignment of partition [test_topic,0] completed successfully
Reassignment of partition [__consumer_offsets,2] completed successfully
Reassignment of partition [test_topic,1] is still in progress
Reassignment of partition [__consumer_offsets,1] completed successfully
$ bin/kafka-topics.sh --zookeeper zkhost:2181 --describe --topic test_topic
Topic:test_topic PartitionCount:5 ReplicationFactor:2 Configs:
Topic: test_topic Partition: 0 Leader: 3 Replicas: 3,0 Isr: 0,3
Topic: test_topic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 1,0
Topic: test_topic Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test_topic Partition: 3 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: test_topic Partition: 4 Leader: 3 Replicas: 3,1 Isr: 3,1
Unfortunately, the tools available to monitor this reassignment are scarce, and you have no clue about how long it will take to finish.
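As a workaround, you can poll the --verify command until no partition is reported as still in progress. A minimal sketch (the 30-second polling interval is an arbitrary choice):
# loop until --verify no longer reports any partition as "still in progress"
while bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181 \
      --verify --reassignment-json-file reassignment.json \
      | grep -q "still in progress"; do
  echo "Reassignment still in progress..."
  sleep 30
done
echo "Reassignment finished."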
Removing a broker node
The recipe to remove a node is very similar to the previous one:
- kafka-topics.sh --list to get the topic list and write a topics.json file
- kafka-reassign-partitions.sh --generate to generate an assignment plan (reassignment.json) excluding the node to remove
- kafka-reassign-partitions.sh --execute to run the assignment plan
- kafka-reassign-partitions.sh --verify to check whether the assignment plan has been applied
- Stop the broker and remove it
As an example, we will remove the broker with Id 1.
$ bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181 --generate --topics-to-move-json-file topics.json --broker-list 0,2,3
The tool proposes the following reassignment:
{ "version":1,
"partitions":[
{"topic":"test_topic", "partition":0,"replicas":[0,2]},
{"topic":"test_topic", "partition":1,"replicas":[2,3]},
{"topic":"test_topic", "partition":2,"replicas":[3,0]},
{"topic":"test_topic", "partition":3,"replicas":[0,3]},
{"topic":"test_topic", "partition":4,"replicas":[2,0]},
{"topic":"__consumer_offsets","partition":0,"replicas":[2,3,0]},
{"topic":"__consumer_offsets","partition":1,"replicas":[3,0,2]},
{"topic":"__consumer_offsets","partition":2,"replicas":[0,2,3]},
{"topic":"__consumer_offsets","partition":3,"replicas":[2,0,3]},
{"topic":"__consumer_offsets","partition":4,"replicas":[3,2,0]}
]
}
Once executed, the topic is reorganized like this:
$ bin/kafka-topics.sh --zookeeper zkhost:2181 --describe --topic test_topic
Topic:test_topic PartitionCount:5 ReplicationFactor:2 Configs:
Topic: test_topic Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0,2
Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,0 Isr: 0,3
Topic: test_topic Partition: 3 Leader: 0 Replicas: 0,3 Isr: 3,0
Topic: test_topic Partition: 4 Leader: 2 Replicas: 2,0 Isr: 0,2
As you may observe in this example, the data movement between nodes for the partitions of test_topic is not optimal.
As a result, a hand-written assignment may sometimes be preferable to the generated one.
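For example, a hand-written plan for test_topic could keep every replica that is not hosted on broker 1 in place and only relocate the replicas of the removed node. This is a sketch built by hand from the previous assignment, not output from the tool, and __consumer_offsets would be handled the same way:
{ "version":1,
  "partitions":[
    {"topic":"test_topic", "partition":0,"replicas":[3,0]},
    {"topic":"test_topic", "partition":1,"replicas":[0,2]},
    {"topic":"test_topic", "partition":2,"replicas":[2,0]},
    {"topic":"test_topic", "partition":3,"replicas":[2,3]},
    {"topic":"test_topic", "partition":4,"replicas":[3,2]}
  ]
}
With this plan only partitions 1, 2 and 4 move a single replica each, against 8 replica moves in the generated plan, and each remaining broker still ends up with 3 or 4 replicas of the topic.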
To replace a node with another one, you don’t need the above scenarios because you can keep the same partition assignment. All you have to do is:
- Stop the old node
- Give the new node the same Id as the old one
- Start the new node
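For instance, replacing broker 1 with a fresh machine could look like this; the host prompts are placeholders and the commands assume the default script locations:
# stop the broker on the machine being decommissioned
old-host$ bin/kafka-server-stop.sh
# the new machine reuses the same broker id in its server.properties
new-host$ grep broker.id config/server.properties
broker.id=1
# start the new broker; it then catches up by replicating its partitions from the other replicas
new-host$ bin/kafka-server-start.sh -daemon config/server.properties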
Rack awareness
Starting with version 0.10.0, Kafka supports rack-aware replica placement: Kafka will try to place the replicas of a partition in different racks (or availability zones).
The only change required is the broker.rack property in the broker configuration file:
broker.id=0
broker.rack=A
For instance, imagine brokers 0 and 1 are in rack A, while brokers 2 and 3 are in rack B. Now, if we create a topic with a replication factor of 2, each partition gets a replica in each rack:
$ bin/kafka-topics.sh --zookeeper zkhost:2181 --create --topic test_topic --partitions 5 --replication-factor 2
Created topic "test_topic".
$ bin/kafka-topics.sh --zookeeper zkhost:2181 --describe --topic test_topic
Topic:test_topic PartitionCount:5 ReplicationFactor:2 Configs:
Topic: test_topic Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: test_topic Partition: 1 Leader: 3 Replicas: 3,0 Isr: 3,0
Topic: test_topic Partition: 2 Leader: 0 Replicas: 0,2 Isr: 0,2
Topic: test_topic Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: test_topic Partition: 4 Leader: 1 Replicas: 1,2 Isr: 1,2
This feature is really useful for improving failure tolerance, but it makes assignments harder to build manually.
Simple scaling
As you have seen, horizontally scaling a Kafka cluster is not that hard, but it is tedious.
Kafka Manager lets you visually reassign partitions to nodes through its web UI.
Running Kafka in a highly elastic environment, like a Docker cluster scheduler, seems trickier. Some solutions exist though:
- Confluent Enterprise 3.1 contains a feature called Auto Data Balancing whose purpose is to ease these operations. Unfortunately, it is not open source.
- Mesos has an integration which seems able to make Kafka scaling smoother.