Retrieving Kafka Lag
2020-01-16 java kafka
This article shows how to get the Kafka lag of a given consumer group using the Java API.
It's about implementing part of the kafka-consumer-groups command-line tool in pure Java.
To get the consumer lag, we will go through several steps:
- Get the consumer group's current offset: 4 in the example above
- Get the topic end offset, i.e. the producers' offset: 8 in the example above
- Compute the lag: the difference between the two
Getting consumer group offset
Since Kafka 2.0, the AdminClient class contains a very useful listConsumerGroupOffsets method.
For a given consumer group, this method returns a map (topic name, partition) → current offset:
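Map<TopicPartition, OffsetAndMetadata> getConsumerGroupOffsets(String groupId) throws InterruptedException, ExecutionException {
    // Blocks until the admin client returns the group's committed offsets
    return adminClient
        .listConsumerGroupOffsets(groupId)
        .partitionsToOffsetAndMetadata().get();
}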
Obviously, this solution expects consumer offsets to be stored in Kafka's __consumer_offsets topic.
It does not apply, for example, to some Kafka Connect sink implementations which store their offsets in the target data store.
The listConsumerGroupOffsets method is asynchronous: it returns a KafkaFuture (a kind of promise), which implements Java's Future interface.
My code blocks on get(), so there is room for improvement.
To get the consumer group IDs, there is a listConsumerGroups method in the same AdminClient class:
return adminClient
.listConsumerGroups()
.valid()
.thenApply(r -> r.stream()
.map(ConsumerGroupListing::groupId)
.collect(toList())
).get();
By sampling the current offset periodically and computing its derivative over time, we could compute the consumer message rate.
There is another way to get consumer offsets: the committed method of the consumer client.
Unlike listConsumerGroupOffsets, it requires knowing the consumed topic partitions in advance.
So it's useless in our case.
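A minimal sketch of that derivative, assuming two snapshots of offsets taken a known number of milliseconds apart and already converted to plain Long values (for consumer offsets, for example with OffsetAndMetadata::offset). The names OffsetRate and ratePerSecond are hypothetical; the key type is generic so it would accept Kafka's TopicPartition as well as plain strings. The same helper would apply to two endOffsets snapshots to estimate a producer rate.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: estimates a per-partition message rate from two offset snapshots.
final class OffsetRate {

    private OffsetRate() {}

    /**
     * Returns messages per second for each key present in both snapshots.
     *
     * @param before        offsets sampled first
     * @param after         offsets sampled later
     * @param elapsedMillis time between the two samples, in milliseconds
     */
    static <K> Map<K, Double> ratePerSecond(Map<K, Long> before, Map<K, Long> after, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("elapsedMillis must be positive");
        }
        Map<K, Double> rates = new HashMap<>();
        for (Map.Entry<K, Long> entry : after.entrySet()) {
            Long previous = before.get(entry.getKey());
            if (previous == null || entry.getValue() == null) {
                continue; // partition missing from one snapshot: no rate can be derived yet
            }
            long delta = entry.getValue() - previous;
            rates.put(entry.getKey(), delta * 1000.0 / elapsedMillis);
        }
        return rates;
    }
}
```

Partitions that only appear in the second snapshot are skipped rather than guessed, since a single sample gives no rate.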
Getting topic end offset
The KafkaConsumer class contains an endOffsets method to get the end offset of each topic partition.
It returns a map (topic name, partition) → end offset:
return consumer.endOffsets(partitions);
By computing the end offset derivative over time, we could compute the producer message rate.
By also getting the topic start offsets with the beginningOffsets method, we could compute the topic size per partition.
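A minimal sketch of that size computation, assuming the beginningOffsets and endOffsets results have already been fetched into two maps. The class name PartitionSize is hypothetical, and the key type is generic so TopicPartition would work as a key. Note that the difference counts offsets, not records: with log compaction or transaction markers, the actual number of records is at most this value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: per-partition topic size computed as end offset minus beginning offset.
final class PartitionSize {

    private PartitionSize() {}

    // Only keys present in both maps are reported.
    // The result is an upper bound on the number of records retained in each partition.
    static <K> Map<K, Long> sizes(Map<K, Long> beginningOffsets, Map<K, Long> endOffsets) {
        Map<K, Long> result = new HashMap<>();
        for (Map.Entry<K, Long> entry : endOffsets.entrySet()) {
            Long start = beginningOffsets.get(entry.getKey());
            if (start != null && entry.getValue() != null) {
                result.put(entry.getKey(), entry.getValue() - start);
            }
        }
        return result;
    }
}
```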
Joining offsets and computing lag
Both consumer offsets and topic end offsets are given per partition. To compute the lag we have to do a join using the topic partition as key.
Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets = getConsumerGroupOffsets(groupId);
Map<TopicPartition, Long> topicEndOffsets = getTopicEndOffsets(groupId, consumerGroupOffsets.keySet());
// Join on the TopicPartition key: OffsetAndLag holds the topic end offset and the consumer's current offset
Map<TopicPartition, OffsetAndLag> consumerGroupLag = consumerGroupOffsets.entrySet().stream()
    .collect(toMap(
        Map.Entry::getKey,
        entry -> new OffsetAndLag(topicEndOffsets.get(entry.getKey()), entry.getValue().offset())));
Since the consumer lag equals the topic end offset minus the consumer's current offset, computing it is straightforward:
long computeLag(long endOffset, long currentOffset) {
    long lag = endOffset - currentOffset;
    // Never report a negative lag
    if (lag < 0) {
        lag = 0;
    }
    return lag;
}
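The join and the lag formula can be combined in a small self-contained sketch, assuming both offset maps hold plain Long values (with the Kafka types, the consumer group map would first be converted with OffsetAndMetadata::offset). LagJoin, lagPerPartition and totalLag are hypothetical names, and the key type is generic so TopicPartition would work as a key.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: joins current and end offsets on the partition key and computes the lag.
final class LagJoin {

    private LagJoin() {}

    // Same formula as in the text: end offset minus current offset, clamped at 0.
    static long lag(long endOffset, long currentOffset) {
        return Math.max(0L, endOffset - currentOffset);
    }

    // Inner join on the partition key: partitions missing from either map are skipped.
    static <K> Map<K, Long> lagPerPartition(Map<K, Long> currentOffsets, Map<K, Long> endOffsets) {
        Map<K, Long> lags = new HashMap<>();
        currentOffsets.forEach((partition, current) -> {
            Long end = endOffsets.get(partition);
            if (end != null && current != null) {
                lags.put(partition, lag(end, current));
            }
        });
        return lags;
    }

    // Sums the per-partition lags into a single figure for the whole group.
    static long totalLag(Map<?, Long> lags) {
        return lags.values().stream().mapToLong(Long::longValue).sum();
    }
}
```

Skipping partitions without an end offset is a design choice of this sketch; the toMap version above would instead keep them with a null end offset.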
Conclusion
We managed to get consumer lag using the Java Kafka client API and a few lines of code.
However, I regret several things about this API:
- The endOffsets method is not in the AdminClient class. If it were, instantiating a consumer would be unnecessary.
- We have to open two connections and repeat connection settings like bootstrap.servers twice, once for the admin client and once for the consumer client. It would be interesting if they could share options and maybe even the TCP connection.
- The AdminClient class often returns KafkaFuture<Something>, so its API design is very different from the Consumer and Producer clients. I wonder why they created a KafkaFuture class instead of reusing CompletableFuture.