Kafka integration tests
2019-07-03 java kafka
You’re developing a Java application plugged into Kafka, or maybe you’re programming a data processing pipeline based on Kafka Streams. How do you automate tests involving both Java code and Kafka brokers?
Such an integration test should be able to do the following (a sketch of the common send/consume steps follows the list):
- Start Zookeeper and then Kafka
- Send messages into Kafka so as to trigger business code
- Consume messages from Kafka and check their content
- Stop Kafka and then Zookeeper
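All the test snippets in this post delegate the middle two steps to a sendAndConsume(bootstrapServers, topic) helper whose body is not shown. Here is a minimal sketch of what such a helper could look like with the plain Kafka clients; the configuration values, the "hello" payload and the assertion are illustrative assumptions, not the post’s actual code:
static void sendAndConsume(String bootstrapServers, String topic) throws Exception {
    // Send a message into Kafka so as to trigger business code
    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
        producer.send(new ProducerRecord<>(topic, "hello")).get(); // wait for the broker ack
    }
    // Consume the message back and check its content
    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
        consumer.subscribe(Collections.singleton(topic));
        // A single long poll keeps the sketch simple; it also covers the consumer group join
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
        assertThat(records.iterator().next().value()).isEqualTo("hello");
    }
}
The starting and stopping of the servers, on the other hand, is where the options differ.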
Kafka embedded in the test
As both Kafka and Zookeeper run on the JVM, it is possible to start and stop them from Java code (have a look at camel-kafka or logback-kafka-appender for examples), but it is not easy, as the sketch below suggests.
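For illustration only, here is roughly what doing it by hand could look like, assuming Curator’s TestingServer for Zookeeper and Kafka’s KafkaServerStartable (the Kafka 2.x API at the time of writing); the ports, directories and settings are arbitrary:
// Embedded Zookeeper from the curator-test artifact
TestingServer zookeeper = new TestingServer(2181);

// A single-node Kafka broker pointing at it
Properties props = new Properties();
props.put("broker.id", "0");
props.put("zookeeper.connect", zookeeper.getConnectString());
props.put("listeners", "PLAINTEXT://localhost:9092");
props.put("log.dirs", Files.createTempDirectory("kafka-logs").toString());
props.put("offsets.topic.replication.factor", "1"); // single broker
KafkaServerStartable broker = new KafkaServerStartable(KafkaConfig.fromProps(props));

broker.startup();
// ... run the test ...
broker.shutdown();
zookeeper.stop();
Even this sketch glosses over port conflicts, temporary directory cleanup and waiting for the broker to be ready.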
There are many libraries to run an embedded Kafka from JUnit without sweating:
- Kafka JUnit by Charith Ellawala
- Another Kafka JUnit by Markus Günther
- Spring Kafka Test by the Spring team
The drawback of this approach is that the Kafka and Zookeeper servers are started in the same JVM as your tests, so unexpected interferences are possible.
Kafka JUnit
Charith’s Kafka JUnit library is one of the simplest and most efficient.
This library supports both JUnit 4 & 5.
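It comes as a single Maven dependency (the test scope is assumed here):
<dependency>
    <groupId>com.github.charithe</groupId>
    <artifactId>kafka-junit</artifactId>
    <version>4.1.5</version>
    <scope>test</scope>
</dependency>
A JUnit 5 test looks like this: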
@ExtendWith(KafkaJunitExtension.class) (1)
@KafkaJunitExtensionConfig(startupMode = StartupMode.WAIT_FOR_STARTUP)
public class CharitheMessageServiceIT {

    private static final String TOPIC = "kafka_junit";

    @Test
    void testSendAndConsume(KafkaHelper kafkaHelper) throws Exception { (2)
        String bootstrapServers = kafkaHelper.producerConfig()
            .get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).toString();
        sendAndConsume(bootstrapServers, TOPIC);
    }
}
1 | Load the JUnit 5 extension that will start an embedded Kafka |
2 | A KafkaHelper is injected to get the embedded Kafka address |
This KafkaHelper contains several methods to easily produce and consume messages:
ListenableFuture<List<String>> futureMessages = kafkaHelper.consumeStrings(TOPIC, 3); (1)
kafkaHelper.produceStrings(TOPIC, "one", "two", "three"); (2)
List<String> messages = futureMessages.get(5, TimeUnit.SECONDS);
assertThat(messages).contains("one", "two", "three");
1 | Start a non-blocking consumer |
2 | Produce some messages into a topic |
Spring Kafka Test
Spring Kafka Test is an addition to the Spring Kafka library.
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>${spring-kafka.version}</version>
    <scope>test</scope>
</dependency>
This library only supports JUnit 4 at the moment; as a result, it contains a JUnit rule to handle the embedded Kafka lifecycle.
public class SpringMessageServiceIT {

    private static final String TOPIC = "spring";

    @ClassRule (1)
    public static EmbeddedKafkaRule kafka = new EmbeddedKafkaRule(1, false, TOPIC);

    @Test
    public void testSendAndConsume() throws Exception {
        sendAndConsume(kafka.getEmbeddedKafka().getBrokersAsString(), TOPIC); (2)
    }
}
1 | JUnit 4 Rule that will start an embedded Kafka and create a topic. |
2 | The kafka rule is used to get the embedded Kafka address. |
Spring Kafka Test contains a KafkaTestUtils class, which is a Swiss Army knife for writing Kafka-related tests:
try (Consumer<Integer, String> consumer = new KafkaConsumer<>( (1)
        KafkaTestUtils.consumerProps("spring_group", "true", kafka.getEmbeddedKafka()))) {
    KafkaTemplate<Integer, String> template = new KafkaTemplate<>( (2)
        new DefaultKafkaProducerFactory<>(
            KafkaTestUtils.producerProps(kafka.getEmbeddedKafka())));
    consumer.subscribe(Collections.singleton(TOPIC));
    template.send(TOPIC, "one");
    template.send(TOPIC, "two");
    ConsumerRecords<Integer, String> records = KafkaTestUtils.getRecords(consumer); (3)
    assertThat(records).haveExactly(1, value("one")).haveExactly(1, value("two")); (4)
}
1 | Use KafkaTestUtils to create a consumer. |
2 | Use KafkaTestUtils along with the usual KafkaTemplate to quickly send messages. |
3 | Use KafkaTestUtils to quickly consume messages. |
4 | KafkaConditions integrates with AssertJ to make assertions on received messages simpler. |
Spring Kafka Test is probably the way to go when you’re developing a Spring application. However, this library lacks some syntactic sugar to make tests more readable.
Kafka in Docker
Testcontainers’ purpose is to start Docker containers from JUnit to run integration tests against any product: MySQL, Elasticsearch, Kafka… There is a base module, a Kafka module and a JUnit 5 extension.
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>testcontainers</artifactId>
    <version>${testcontainers.version}</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>${testcontainers.version}</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>${testcontainers.version}</version>
    <scope>test</scope>
</dependency>
The Testcontainers library is strongly integrated with JUnit 5: a single annotation and you’re done. A JUnit 4 rule is also available (a sketch follows the JUnit 5 example below).
@Testcontainers (1)
public class ContainersMessageServiceIT {

    private static final String TOPIC = "containers";

    @Container (2)
    public KafkaContainer kafka = new KafkaContainer("5.2.1");

    @Test
    public void testSendAndConsume() throws Exception {
        sendAndConsume(kafka.getBootstrapServers(), TOPIC);
    }
}
1 | Trigger Testcontainers start |
2 | Create a Kafka container. By default the cp-kafka Docker image created by Confluent is used. As a consequence the version number matches the Confluent Platform version, not Apache Kafka. |
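With JUnit 4, the container itself can be used as a rule, since Testcontainers containers implement TestRule. A minimal sketch, with illustrative class and topic names:
public class ContainersJunit4MessageServiceIT {

    private static final String TOPIC = "containers_junit4";

    // A KafkaContainer can be declared as a plain JUnit 4 rule
    @ClassRule
    public static KafkaContainer kafka = new KafkaContainer("5.2.1");

    @Test
    public void testSendAndConsume() throws Exception {
        sendAndConsume(kafka.getBootstrapServers(), TOPIC);
    }
}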
As Testcontainers is a generic library to run containers, there is no helper class to read/write messages: you use the plain Kafka clients, as in the sendAndConsume() sketch above. Starting a Docker container is slower than starting an embedded Kafka, but the process isolation is stronger. You are starting the real thing, not a hacked Kafka broker, so you are closer to production. Note that Testcontainers Kafka actually uses 3 Docker containers.
Kafka Consumer subscriptions
Dealing with asynchronous code in tests is often painful, and Kafka consumers don’t help. It can take a long time for the consumer group to stabilize and partitions to be assigned: between the consumer bootstrap and the first messages being received, a second or more can pass. Using a ConsumerRebalanceListener to wait for partitions to be assigned, and to check which ones are assigned, can be useful, as sketched below.
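A minimal sketch, assuming a plain KafkaConsumer already created by the test; the latch-based synchronization is an illustration, not code from the libraries above:
// Block until the first rebalance assigns partitions to this consumer
CountDownLatch partitionsAssigned = new CountDownLatch(1);
consumer.subscribe(Collections.singleton(TOPIC), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // nothing to do before the first assignment
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        partitionsAssigned.countDown();
    }
});
// poll() drives the group join; the listener is invoked from within poll()
while (partitionsAssigned.getCount() > 0) {
    consumer.poll(Duration.ofMillis(100));
}
// From here on, messages produced to the topic will be received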
The Awaitility library can alleviate the burden of asynchronous testing.