Retrieving Kafka Lag
2020-01-16 java kafka
This article shows how to get Kafka lag for a given consumer group using the Java API.
It’s about implementing part of the kafka-consumer-groups command-line tool in pure Java.
To get consumer lag we will go through several steps:
- Get the consumer group's current offset, 4 in the above example
- Get the topic end offset (the producers' offset), 8 in the above example
- Compute the lag: the difference between the two
Getting consumer group offset
Since Kafka 2.0, the AdminClient class contains a very useful listConsumerGroupOffsets method.
For a given consumer group, this method returns a dictionary (topic name, partition) → current offset:
return adminClient
.listConsumerGroupOffsets(groupId)
.partitionsToOffsetAndMetadata().get();
Obviously, this solution expects consumer offsets to be stored in Kafka’s __consumer_offsets topic.
It does not apply, for example, to some Kafka Connect sink implementations which store their offsets in the target data store.
The listConsumerGroupOffsets method is asynchronous: its result wraps a KafkaFuture (a kind of promise), which implements Java’s Future.
My code blocks on that future, so there is room for improvement.
To get the consumer group IDs, there is a listConsumerGroups method in the same AdminClient class:
return adminClient
.listConsumerGroups()
.valid()
.thenApply(r -> r.stream()
.map(ConsumerGroupListing::groupId)
.collect(toList())
).get();
By computing the current offset derivative, we could compute the consumer message rate.
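As a rough illustration of that derivative, here is a minimal sketch that turns two offset samples into an average rate. OffsetRate and perSecond are hypothetical names, not part of the Kafka API, and the sketch assumes the two samples were taken for the same partition.

```java
import java.time.Duration;

// Hypothetical helper, not part of the Kafka client API.
final class OffsetRate {
    private OffsetRate() {}

    /** Average number of offsets advanced per second between two samples. */
    static double perSecond(long earlierOffset, long laterOffset, Duration elapsed) {
        if (elapsed.isZero() || elapsed.isNegative()) {
            throw new IllegalArgumentException("elapsed must be positive");
        }
        // Clamp at zero: a committed offset can move backwards after an offset reset.
        long delta = Math.max(0L, laterOffset - earlierOffset);
        return delta * 1_000_000_000.0 / elapsed.toNanos();
    }
}
```

For example, a group that moved from offset 100 to offset 160 in 30 seconds consumed about 2 messages per second on that partition.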
There is another way to get consumer offsets: the committed method of the consumer client.
Unlike listConsumerGroupOffsets, it requires knowing the consumed topic partitions in advance, so it is of no use in our case.
Getting topic end offset
The KafkaConsumer class contains an endOffsets method to get the end offset of a topic partition.
It returns a dictionary (topic name, partition) → end offset:
return consumer.endOffsets(partitions);
By computing the end offset derivative, we could compute the producer message rate.
By also getting the topic start offset with the beginningOffsets method, we could compute the topic size per partition.
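The size computation could look like the sketch below. It assumes both maps were already fetched (for instance with beginningOffsets and endOffsets) and keeps the key type generic so that TopicPartition can be used directly. PartitionSizes is a hypothetical name. The result counts offsets, which only approximates the number of messages on compacted or transactional topics.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the Kafka client API.
final class PartitionSizes {
    private PartitionSizes() {}

    /** Returns end offset minus beginning offset for every partition present in both maps. */
    static <K> Map<K, Long> sizes(Map<K, Long> beginningOffsets, Map<K, Long> endOffsets) {
        Map<K, Long> sizes = new HashMap<>();
        for (Map.Entry<K, Long> end : endOffsets.entrySet()) {
            Long beginning = beginningOffsets.get(end.getKey());
            // Skip partitions for which no beginning offset is known.
            if (beginning != null) {
                sizes.put(end.getKey(), end.getValue() - beginning);
            }
        }
        return sizes;
    }
}
```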
Joining offsets and computing lag
Both consumer offsets and topic end offsets are given per partition. To compute the lag we have to do a join using the topic partition as key.
Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets = getConsumerGroupOffsets(groupId);
Map<TopicPartition, Long> topicEndOffsets = getTopicEndOffsets(groupId, consumerGroupOffsets.keySet());
// Join both maps on the topic partition key (toMap is statically imported from Collectors)
Map<TopicPartition, OffsetAndLag> consumerGroupLag = consumerGroupOffsets.entrySet().stream()
.collect(toMap(Map.Entry::getKey,
entry -> new OffsetAndLag(topicEndOffsets.get(entry.getKey()), entry.getValue().offset())));
As consumer lag is equal to topic end offset - consumer current offset, computing it is straightforward:
long lag = endOffset - currentOffset;
if (lag < 0) {
lag = 0;
}
return lag;
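To try the join and the clamping together without a broker, here is a self-contained sketch working on plain maps. LagJoin and lagByPartition are hypothetical names, and the committed offsets are assumed to be already unwrapped from OffsetAndMetadata into Long values. Partitions without a known end offset are skipped rather than reported.

```java
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the Kafka client API.
final class LagJoin {
    private LagJoin() {}

    /** Joins committed offsets with end offsets on the partition key and returns the lag, never negative. */
    static <K> Map<K, Long> lagByPartition(Map<K, Long> committedOffsets, Map<K, Long> endOffsets) {
        return committedOffsets.entrySet().stream()
            // Keep only partitions present on both sides of the join.
            .filter(e -> endOffsets.containsKey(e.getKey()))
            .collect(Collectors.toMap(
                Map.Entry::getKey,
                e -> Math.max(0L, endOffsets.get(e.getKey()) - e.getValue())));
    }
}
```

With a committed offset of 4 and an end offset of 8 on a partition, the computed lag for that partition is 4.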
Conclusion
We managed to get consumer lag using the Java Kafka client API and a few lines of code.
However, I regret several things about this API:
- The endOffsets method is not in the AdminClient class. If it were, instantiating a consumer would be unnecessary.
- We have to open two connections and repeat the connection settings, like bootstrap.servers, twice: once for the admin client, then for the consumer client. It would be interesting if they could share options and maybe even the TCP connection.
- The AdminClient class often returns KafkaFuture<Something>; this API design is very different from the Consumer and Producer clients. I wonder why they created a KafkaFuture class instead of reusing CompletableFuture.
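Until the clients can share options natively, the duplication of settings can at least be reduced on the caller side. The sketch below is one possible approach, not something the Kafka API provides: ClientConfigs is a hypothetical helper, and the choice of ByteArrayDeserializer is an assumption that fits a consumer only used to read offsets.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Kafka client API.
final class ClientConfigs {
    private ClientConfigs() {}

    /** Settings needed by both the admin client and the consumer. */
    static Properties common(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        return props;
    }

    /** Copies the common settings and adds what only the consumer requires. */
    static Properties consumer(Properties common) {
        Properties props = new Properties();
        props.putAll(common);
        // Assumption: the consumer never reads records, so byte arrays are enough.
        String deserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
        props.setProperty("key.deserializer", deserializer);
        props.setProperty("value.deserializer", deserializer);
        return props;
    }
}
```

The first object could then be passed to AdminClient.create and the second to the KafkaConsumer constructor, both of which accept Properties. This only shares the options; each client still opens its own connections.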