Retrieving Kafka Lag

2020-01-16 java kafka

This article shows how to get Kafka lag for a given consumer group using the Java API. It amounts to implementing part of the kafka-consumer-groups command-line tool in pure Java.

Consumer Lag

To get consumer lag we will go through several steps:

  1. Get the consumer group's current offset, 4 in the above example

  2. Get the topic end offset, that is, the producer's offset, 8 in the above example

  3. Compute the lag: the difference between both

Getting consumer group offset

Kafka 2.0 added a very useful listConsumerGroupOffsets method to the AdminClient class (the class itself dates back to Kafka 0.11). For a given consumer group, this method returns a map (topic name, partition) → current offset:

        return adminClient
                .listConsumerGroupOffsets(groupId)
                .partitionsToOffsetAndMetadata().get();

Obviously, this solution expects consumer offsets to be stored in Kafka’s __consumer_offsets topic. It does not apply, for example, to some Kafka Connect sink implementations which store their offsets in the target data store.

The listConsumerGroupOffsets method is asynchronous: its result exposes a KafkaFuture (a kind of promise) which implements Java’s Future. My code calls get() and blocks until the result arrives, so there is room for improvement.
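One modest improvement over a bare get() is to bound the wait. Because KafkaFuture implements java.util.concurrent.Future, a helper written against Future should accept it as well. The sketch below is only an illustration: FutureTimeouts and getOrFail are hypothetical names, not part of the Kafka API, and a CompletableFuture stands in for the KafkaFuture so the example runs without a broker.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the Kafka client library.
class FutureTimeouts {

    // Waits at most `timeout` for the result instead of blocking indefinitely.
    static <T> T getOrFail(Future<T> future, Duration timeout) {
        try {
            return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new IllegalStateException("No answer within " + timeout, e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Request failed", e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the future returned by partitionsToOffsetAndMetadata().
        Future<Long> alreadyDone = CompletableFuture.completedFuture(4L);
        System.out.println(getOrFail(alreadyDone, Duration.ofSeconds(5)));
    }
}
```

This still blocks the calling thread; it only guarantees that the call returns or fails within the given duration.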

To get the consumer group IDs, there is a listConsumerGroups method in the same AdminClient class:

        return adminClient
                .listConsumerGroups()
                .valid()
                .thenApply(r -> r.stream()
                        .map(ConsumerGroupListing::groupId)
                        .collect(toList())
                ).get();

By sampling the current offset over time and taking its derivative, we could also compute the consumer message rate.
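As a rough illustration of that derivative, the rate is the offset difference between two samples divided by the time elapsed between them. The sketch below uses plain numbers rather than the Kafka client; OffsetRate and messagesPerSecond are hypothetical names, and the sample values are made up.

```java
import java.time.Duration;

// Hypothetical helper: average rate between two samples of one partition's offset.
class OffsetRate {

    static double messagesPerSecond(long earlierOffset, long laterOffset, Duration elapsed) {
        long millis = elapsed.toMillis();
        if (millis <= 0) {
            throw new IllegalArgumentException("elapsed must be at least one millisecond");
        }
        return (laterOffset - earlierOffset) * 1000.0 / millis;
    }

    public static void main(String[] args) {
        // The committed offset moved from 4 to 304 over 10 seconds.
        System.out.println(messagesPerSecond(4, 304, Duration.ofSeconds(10))); // 30.0
    }
}
```

The same arithmetic applies to any offset sampled twice, provided both samples come from the same partition.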

There is another way to get consumer offsets: the committed method of the consumer client. Unlike listConsumerGroupOffsets, it requires knowing the consumed topic partitions in advance, so it is of no use in our case.

Getting topic end offset

The KafkaConsumer class contains an endOffsets method to get the end offset of each given topic partition. It returns a map (topic name, partition) → end offset:

            return consumer.endOffsets(partitions);

By sampling the end offset over time and taking its derivative, we could compute the producer message rate.

By also getting the topic start offset with the beginningOffsets method, we could compute the topic size per partition.
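A minimal sketch of that size computation, assuming both offset maps have already been fetched. To stay runnable without the Kafka client, partitions are keyed by a plain string such as "orders-0" instead of TopicPartition; PartitionSizes, sizes and the sample values are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: number of offsets spanned by each partition.
class PartitionSizes {

    static Map<String, Long> sizes(Map<String, Long> beginningOffsets, Map<String, Long> endOffsets) {
        Map<String, Long> result = new TreeMap<>();
        for (Map.Entry<String, Long> end : endOffsets.entrySet()) {
            Long beginning = beginningOffsets.get(end.getKey());
            // Skip partitions missing from one of the two maps.
            if (beginning != null) {
                result.put(end.getKey(), end.getValue() - beginning);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> beginning = Map.of("orders-0", 2L, "orders-1", 0L);
        Map<String, Long> end = Map.of("orders-0", 8L, "orders-1", 5L);
        System.out.println(sizes(beginning, end)); // {orders-0=6, orders-1=5}
    }
}
```

Note that this counts offsets, not records: on compacted or transactional topics the actual number of readable records can be lower.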

Joining offsets and computing lag

Both consumer offsets and topic end offsets are given per partition. To compute the lag we have to do a join using the topic partition as key.

            // Current (committed) offset of the group, per partition
            Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets = getConsumerGroupOffsets(groupId);
            // End offset of the same partitions
            Map<TopicPartition, Long> topicEndOffsets = getTopicEndOffsets(groupId, consumerGroupOffsets.keySet());
            // Join both maps on the topic partition
            Map<TopicPartition, OffsetAndLag> consumerGroupLag = consumerGroupOffsets.entrySet().stream()
                    .collect(toMap(Map.Entry::getKey,
                            entry -> new OffsetAndLag(topicEndOffsets.get(entry.getKey()), entry.getValue().offset())));

As consumer lag is equal to topic end offset - consumer current offset, computing it is straightforward:

            long lag = endOffset - currentOffset;
            if (lag < 0) {
                lag = 0;
            }
            return lag;
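To see the join and the clamping together, here is a small self-contained sketch using plain maps. It is not the article's code: LagJoin and its methods are hypothetical, partitions are keyed by a string such as "orders-0" instead of TopicPartition, and the offsets are sample values.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone version of the join and lag computation.
class LagJoin {

    // Lag is end offset minus current offset, never reported below zero.
    static long lag(long endOffset, long currentOffset) {
        return Math.max(0L, endOffset - currentOffset);
    }

    // Joins both maps on the partition key; partitions without an end offset are skipped.
    static Map<String, Long> lagByPartition(Map<String, Long> currentOffsets, Map<String, Long> endOffsets) {
        Map<String, Long> result = new TreeMap<>();
        currentOffsets.forEach((partition, current) -> {
            Long end = endOffsets.get(partition);
            if (end != null) {
                result.put(partition, lag(end, current));
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> current = Map.of("orders-0", 4L, "orders-1", 9L);
        Map<String, Long> end = Map.of("orders-0", 8L, "orders-1", 7L);
        System.out.println(lagByPartition(current, end)); // {orders-0=4, orders-1=0}
    }
}
```

The second partition shows the clamp at work: the two offsets are read at different moments, so the raw difference can come out negative.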

Conclusion

We managed to get consumer lag using the Java Kafka client API and a few lines of code.

However, I regret several things about this API:

  1. The endOffsets method is not in the AdminClient class. If it were, we would not need to instantiate a consumer at all.

  2. We have to open two connections and repeat connection settings such as bootstrap.servers twice: once for the admin client, then again for the consumer client. It would be nice if they could share options and maybe even the TCP connection.

  3. The AdminClient class often returns KafkaFuture<Something>, so its API design is very different from that of the Consumer and Producer clients. I wonder why they created a KafkaFuture class instead of reusing CompletableFuture.
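Regarding the second point, the duplication of settings can at least be reduced in application code by building both configurations from a common base. The sketch below only assembles java.util.Properties objects and does not create any client; ClientConfigs and its methods are hypothetical names, while the keys are standard Kafka client settings. Passing the group id to the consumer configuration is an assumption here.

```java
import java.util.Properties;

// Hypothetical helper: builds admin and consumer settings from one shared base.
class ClientConfigs {

    // Settings needed by both clients.
    static Properties common(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        return props;
    }

    static Properties forAdmin(Properties common) {
        Properties props = new Properties();
        props.putAll(common);
        return props;
    }

    static Properties forConsumer(Properties common, String groupId) {
        Properties props = new Properties();
        props.putAll(common);
        props.setProperty("group.id", groupId);
        // This consumer only reads offsets, so it should never commit anything.
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties common = common("localhost:9092");
        System.out.println(forAdmin(common).getProperty("bootstrap.servers"));
        System.out.println(forConsumer(common, "my-group").getProperty("group.id"));
    }
}
```

Each client would still open its own connection; this only removes the repeated settings.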