Kafka Java Client

2016-10-10 kafka java

Apache Kafka is trendy software which mixes a message broker and an event log. From the ground up, it’s a distributed solution designed for scalability and performance. It was created by LinkedIn in 2011, it is now open-source and supported by the Confluent company.

For Java developers, until Kafka 0.8, there was only an intricate Scala API with Java bindings. Since 0.9 there is a pure Java API which makes things simpler. In this blog post we will discuss this API.

Creating a Topic

The topic is the place where messages are sent and where they are stored. All messages send to Kafka are written to disk.

A topic is split into multiple partitions. Each partition is usually placed on a different Kafka node. Each partition can be replicated many times in order to tolerate node failures. Among replicas, a leader is elected, it has the privilege of receiving messages first and sending messages to consumers.

A topic is automatically created when the first message arrives. Alternatively, it may be manually created with the kafka-topic.sh command line tool:

$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic newtopic --partitions 5 --replication-factor 2
Created topic "newtopic".
$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic newtopic
Topic:newtopic  PartitionCount:5        ReplicationFactor:2     Configs:
        Topic: newtopic Partition: 0    Leader: 2       Replicas: 2,3   Isr: 2,3
        Topic: newtopic Partition: 1    Leader: 3       Replicas: 3,1   Isr: 3,1
        Topic: newtopic Partition: 2    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: newtopic Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: newtopic Partition: 4    Leader: 3       Replicas: 3,2   Isr: 3,2

In the above example, the topic has five partitions. Partition 0 has two replicas: one on node 2, the other node 3. The replica on node 2 is the leader. You will notice the leader partitions are evenly distributed among this 3 node cluster.

Each partition is an independent log of events. In this log, messages are sorted by their arrival order. Each message is identified by its position in the log, this position is called offset.

Topic

At the time of writing (Kafka 0.10.0), it is not possible to create or delete a Topic with the Kafka Client library. By the way, this should change in the upcoming release (0.10.1). Right now, you’ll have to stick with the forementioned command line tool, or use the Scala library which contains an AdminUtils class.

Message / Record

A message is also called a record in the Kafka vocabulary. It consists of:

  • An optional key: it doesn’t guarantee message uniqueness, multiple messages may have the same key.

  • A value: the body, the main part of the message

  • A timestamp (since Kafka 0.10): when the message was created by the producer or recorded in the broker

  • An offset: a big number describing the position of the message in the log

When the key is provided, it is hashed and this hash is used to determine in which partition it should go. Two messages with the same key will end in the same partition. Messages having the same key can be merged together by an optional background process called compaction.

The key and the value can be of any type: String, long, byte[]…​ This is made possible by serializers and deserializers which are strategies to read and write anything from byte stream. Some (de)serializers are provided for basic types, and there are some third party (de)serializers to handle complex types. For example, it’s possible to exchange plain objects written in JSON or Avro format.

Sending messages

Messages are sent to the Kafka broker using a producer. The producer knows the distribution of topic partitions on nodes, it will hash the record key and send the record directly to the appropriate partition/node.

Producer

When no key is provided, the producer uses a random partition. In short, the producer is able to load balance writes to all partitions, and associated nodes.

This producer is initialized and configured with some properties.

Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092"); (1)
producerConfig.put(ProducerConfig.ACKS_CONFIG, "1"); (5)
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); (4)
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
try (Producer<String, String> producer = new KafkaProducer<>(producerConfig)) {
    ProducerRecord<String, String> record = new ProducerRecord<>("the_topic", "the_key","The Message"); (2)
    Future<RecordMetadata> futureMetadata = producer.send(record);(3)
    RecordMetadata metadata = producer.get(); (5)
}
1 Connect to these Kafka connect nodes. The first Kafka node to answer will give the full list of nodes.
2 Create the record/message
3 Send the message. By default, this operation is asynchronous and non blocking, it immediately returns a Future
4 The message and the key are written on the wire using the string serializer.
5 Wait for message acknowledgement. The Acks config indicates how many replicas should write the message to disk before returning an acknowledgement to the producer.

In a real world application, the producer should be instanciated only once and reused for the application lifespan.

Receiving messages

Messages are received from the Kafka broker by a consumer. A consumer is a process in charge for reading messages from a topic and dealing with them. As an acknowledgement, the consumer writes the message offset back to the broker, it’s called offset commit.

A consumer group is a set of consumers distributed on multiple machines. For a given topic and group, each partition gets read by a single consumer. This prevents messages from being consumed twice in the consumer group. On the contrary, a consumer can be in charge of several partitions.

Consumer and Consumer Group

The Kafka cluster tells each consumer which partition it should read from. Each consumer takes care of its portion of topic. As a result, consumers can work independently and in parallel, and messages stored in a topic can be load balanced to consumers on many machines. In case of consumer failure, Kafka reassigns the partitions to other consumers of the same group.

Like the producer, the consumer is initialized and configured with some properties.

Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092"); (1)
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfig)) {
    consumer.subscribe(Arrays.asList("the_topic")); (2)
    ConsumerRecords<String, String> records = consumer.poll(1000L); (3)
    for (ConsumerRecord<String, String> record : records) {
        LOGGER.info("Found message {} {}", record.key(), record.value());
    } (4)
}
1 Connect to these Kafka connect nodes. Like the producer, it doesn’t have to be the whole Kafka cluster.
2 Register this application (consumer group) as a consumer for this list of topics. In return, Kafka will assign some partitions to this consumer.
3 Try to pull messages from the broker.
4 Pulled messages are automatically acknowledged.

In the above example, connecting to the broker, subscribing to one or more topics, and being assigned partitions takes time and is usually done once during application start-up. On the contrary, the poll method should be run in loop. It returns a batch of records whose size is controlled by the max.poll.records and max.partition.fetch.bytes settings.

Unlike the producer, the consumer is not thread-safe. In order to consume records in parallel, each thread should have it’s own consumer.

Acknowledging received messages

The message acknowledgement is called offset commit, because Kafka keeps track of the offset of the last consumed message for each topic + partition + consumer group.

In the previous example, the offsets were automatically and periodically committed to the broker. This auto commit is configurable through properties:

consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000L);
try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfig)) {
    consumer.subscribe(Arrays.asList("the_topic"));
    ConsumerRecords<String, String> records = consumer.poll(1000L);
    for (ConsumerRecord<String, String> record : records) {
        LOGGER.info("Found message {} {}", record.key(), record.value());
    }
}

This offset commit can also be manual in order to ensure messages are acknowledged once they have been processed.

consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfig)) {
    consumer.subscribe(Arrays.asList("the_topic"));
    ConsumerRecords<String, String> records = consumer.poll(1000L);
    for (ConsumerRecord<String, String> record : records) {
        LOGGER.info("Found message {} {}", record.key(), record.value());
    }
    consumer.commitSync();
}

This offset can even be moved forward (to skip records) and backward (to replay records):

    consumer.seekToBeginning(consumer.assignment());

Using a framework

You may have noticed that the consumer API is a pull API. In a real application you’ll have to create a consuming loop in separate thread, and build a push API.

The Spring Kafka does all the heavy lifting for you and smoothly integrates Kafka with Spring and Spring Integration:

  • The KafkaTemplate can send messages

  • The KafkaListener can receive message in a push manner

This library makes Kafka usage very similar to ActiveMQ or RabbitMQ.