How to Unit Test Kafka Streams Application – PART 1 (DSL)

This is part 1 of a 2-part series on unit testing Kafka Streams applications. The first part covers testing DSL transformations, both stateless and stateful, including joining and windowing.


Overview

Unit testing. Test automation. How many times do you hear about them every day? They are so beneficial and have become so fundamental to every application we build that they belong in our definition of done. Of course, the same rules apply when we build Kafka Streams applications: we need to equip them with robust tests.

Unit Test Kafka Streams

Kafka Streams comes with a test-utils package, which provides TopologyTestDriver to assemble and simulate the topology, TestInputTopic to pipe input data in, and TestOutputTopic to read the outputs produced by traversing the topology. In my experience, those three are sufficient for test automation throughout the streams. But who knows what future complexity will bring? Or maybe my streams are simply not complex enough yet to run into the limits of the test-utils package?

Well, as always, it is good to build up basic knowledge and a strong foundation. We can widen our skills more easily later if we grasp the essential techniques for testing streams. So, let’s start.

I used the existing repository from the previous article as the starting point for implementing the testing part.

Adding Test Libraries

For the unit test, I will use JUnit 5, AssertJ (I love this lib), and of course, test-utils from Kafka.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>2.5.0</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.assertj</groupId>
    <artifactId>assertj-core</artifactId>
    <version>3.18.1</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter-api</artifactId>
    <version>5.6.2</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter-params</artifactId>
    <version>5.6.2</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter-engine</artifactId>
    <version>5.6.2</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.junit.platform</groupId>
    <artifactId>junit-platform-launcher</artifactId>
    <version>1.6.2</version>
    <scope>test</scope>
</dependency>

Word Count Streams

I will start with the most famous Kafka Streams example: the word count app. It is as well known as the Hello World application.

Source
// WordCountTopology.java
public void createTopology(StreamsBuilder builder) {
    final KStream<String, String> textLines = builder
            .stream("streams-plaintext-input",
                    Consumed.with(Serdes.String(), Serdes.String()));
    textLines
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .groupBy((key, value) -> value)
            .count(Materialized.as("WordCount"))
            .toStream()
            .to("streams-wordcount-output",
                    Produced.with(Serdes.String(), Serdes.Long()));
}
Test

For the testing part, I will start by declaring the test class structure (shown once here; for the rest, only the test methods are shown). First, we set up the init() method to be called before each test invocation. This init() method mainly builds the topology and configures the input and output topics with exactly the same topic names as in the source class. Of course, we have to set up the serializer and deserializer for each topic as well.

// WordCountTopologyTest.java
private TopologyTestDriver testDriver;
private TestInputTopic<String, String> plainTextInput;
private TestOutputTopic<String, Long> wordCountOutput;

private final Serde<String> stringSerde = new Serdes.StringSerde();
private final Serde<Long> longSerde = new Serdes.LongSerde();

@BeforeEach
public void init() {
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());

    WordCountTopology wordCountTopology = new WordCountTopology();
    final StreamsBuilder builder = new StreamsBuilder();
    wordCountTopology.createTopology(builder);
    final Topology topology = builder.build();

    testDriver = new TopologyTestDriver(topology, props);
    plainTextInput = testDriver.createInputTopic("streams-plaintext-input",
            stringSerde.serializer(), stringSerde.serializer());
    wordCountOutput = testDriver.createOutputTopic("streams-wordcount-output",
            stringSerde.deserializer(), longSerde.deserializer());
}

@AfterEach
public void tearDown() throws IOException {
    testDriver.close();
    FileUtils.deleteDirectory(new File("/tmp/kafka-streams/app-id"));
}

And for the test, we will pipe three lines of text, count the occurrences, and put the result into a Map. Storing the output in a Map makes sense because we only need to know the latest count of each word. (A shorter alternative using readKeyValuesToMap() is sketched after the test.)

// WordCountTopologyTest.java
@Test
@DisplayName("Test word count streams")
public void testWordCountStream() {
    String text1 = "Welcome to kafka streams";
    String text2 = "Kafka streams is great";
    String text3 = "Welcome back";
    // expected output
    Map<String, Long> expected = Map.of(
            "welcome", 2L,
            "to", 1L,
            "kafka", 2L,
            "streams", 2L,
            "is", 1L,
            "great", 1L,
            "back", 1L
    );
    plainTextInput.pipeInput(null, text1);
    plainTextInput.pipeInput(null, text2);
    plainTextInput.pipeInput(null, text3);
    assertThat(wordCountOutput.isEmpty()).isFalse();
    // result
    Map<String, Long> result = new HashMap<>();
    while (!wordCountOutput.isEmpty()) {
        final KeyValue<String, Long> kv = wordCountOutput.readKeyValue();
        result.put(kv.key, kv.value);
    }
    assertThat(result).containsExactlyInAnyOrderEntriesOf(expected);
    assertThat(wordCountOutput.isEmpty()).isTrue();
}
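
As a side note (my addition, not part of the original test class): TestOutputTopic also offers readKeyValuesToMap(), which drains the topic and keeps only the latest value per key, so the manual while-loop above can be replaced. A minimal sketch reusing the same fixture:

// Sketch: same scenario, draining the output topic in one call
@Test
@DisplayName("Test word count streams using readKeyValuesToMap()")
public void testWordCountStreamWithMap() {
    plainTextInput.pipeInput(null, "Welcome to kafka streams");
    plainTextInput.pipeInput(null, "Kafka streams is great");
    plainTextInput.pipeInput(null, "Welcome back");

    // readKeyValuesToMap() reads all records and keeps only the
    // latest value per key, exactly what the while-loop did above
    Map<String, Long> result = wordCountOutput.readKeyValuesToMap();

    assertThat(result)
            .containsEntry("welcome", 2L)
            .containsEntry("kafka", 2L)
            .containsEntry("streams", 2L)
            .containsEntry("back", 1L);
    assertThat(wordCountOutput.isEmpty()).isTrue();
}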

Alright, just a piece of cake, isn’t it? Let’s move on to another use case.

Word Count Streams with Windowing

Now let us spice up our word-count streams a little bit. We will still group the stream by word, but this time the operation will window the stream every 5 minutes. For this case, I chose tumbling time windows as the windowing mechanism, so the counter resets every 5 minutes.

There are many types of windowing in Kafka Streams. As usual, for more information, please visit https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#windowing
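
For orientation, here is a minimal sketch of how the common window types can be declared in the 2.5 API (the durations are placeholders, not values from the repository):

import java.time.Duration;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;

// Tumbling window: fixed size, non-overlapping (advance == size)
TimeWindows tumbling = TimeWindows.of(Duration.ofMinutes(5));

// Hopping window: fixed size, overlapping (advance < size)
TimeWindows hopping = TimeWindows.of(Duration.ofMinutes(5))
        .advanceBy(Duration.ofMinutes(1));

// Session window: dynamically sized, closed after a gap of inactivity
SessionWindows session = SessionWindows.with(Duration.ofMinutes(5));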

Source
// WordCountTimeWindowsTopology.java -- note the windowedBy(...) line
public void createTopology(StreamsBuilder builder) {
    final KStream<String, String> textLines = builder
            .stream("streams-plaintext-input",
                    Consumed.with(Serdes.String(), Serdes.String()));
    textLines
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .groupBy((key, value) -> value)
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(5)))
            .count(Materialized.as("WordCount"))
            .toStream()
            .map((key, value) -> new KeyValue<>(key.key(), value))
            .to("streams-wordcount-output",
                    Produced.with(Serdes.String(), Serdes.Long()));
}

Note: see the windowedBy(...) call above for how to configure the windows.

Test

For this part, the test will be very similar to the previous one, with a small modification: we will pipe the third text 5 minutes after the first two. Of course, it makes no sense for our test to actually wait 5 minutes. Luckily, TestInputTopic provides a mechanism to manipulate the record timestamp by passing a timestamp (an Instant) while piping the input data.

// WordCountTimeWindowsTopologyTest.java -- note how the input timestamps are advanced
@Test
@DisplayName("Test word count streams")
public void testWordCountStream() {
    String text1 = "Welcome to kafka streams";
    String text2 = "Kafka streams is great";
    String text3 = "Welcome back";
    // expected output
    Map<String, Long> expected = Map.of(
            // Please take note: "welcome" now counts only 1,
            // because the second "welcome" arrives 5 minutes later, in the next window.
            "welcome", 1L,
            "to", 1L,
            "kafka", 2L,
            "streams", 2L,
            "is", 1L,
            "great", 1L,
            "back", 1L
    );
    final Instant now = Instant.now();
    // insert two lines with the same timestamp
    plainTextInput.pipeInput(null, text1, now);
    plainTextInput.pipeInput(null, text2, now);
    // simulate 5 minutes after the first two
    plainTextInput.pipeInput(null, text3, now.plus(5, ChronoUnit.MINUTES));
    assertThat(wordCountOutput.isEmpty()).isFalse();
    // result
    Map<String, Long> result = new HashMap<>();
    while (!wordCountOutput.isEmpty()) {
        final KeyValue<String, Long> kv = wordCountOutput.readKeyValue();
        result.put(kv.key, kv.value);
    }
    assertThat(result).containsExactlyInAnyOrderEntriesOf(expected);
    assertThat(wordCountOutput.isEmpty()).isTrue();
}

Note: I expect welcome to have the value one instead of two because the window resets every 5 minutes. See the pipeInput(...) calls above for how the timestamps advance the time windows.

Another piece of cake, isn’t it? Thanks to TestInputTopic for enabling us to manipulate the timestamp.
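
As another side note (my addition): TestInputTopic also keeps an internally tracked event time that you can move forward with advanceTime(Duration), so you do not have to compute Instants yourself. A minimal sketch of the same scenario under that approach:

// Sketch: same scenario using the topic's internally tracked time
plainTextInput.pipeInput(null, "Welcome to kafka streams");
plainTextInput.pipeInput(null, "Kafka streams is great");

// move the topic's event time forward by 5 minutes,
// pushing the next record into the following tumbling window
plainTextInput.advanceTime(Duration.ofMinutes(5));
plainTextInput.pipeInput(null, "Welcome back");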

Join Streams

When you are working with streams, at some point there is a big chance you will need to join them. That is why I included this sub-topic on testing stream joins. For the use case, I will reuse the Employee streams from my previous article.

Source

I will not show the source snippet here, but if you want to see it, please visit that article. I did make a slight modification to the join with the Employment History stream: instead of an inner join, I used a left join, because I realized the employee could be a fresh graduate with no employment history records.
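
To illustrate the idea only (this is a sketch, not the actual code from the repository; employeeTable and historyTable are placeholder names), the employment-history join changes roughly from join to leftJoin like this:

// Sketch: employeeTable (KTable<Integer, EmployeeResultDto>) and
// historyTable (KTable<Integer, Set<String>>) are placeholders for the
// tables built in the real topology.
KTable<Integer, EmployeeResultDto> result =
        employeeTable.leftJoin(historyTable, (employee, history) -> {
            // with leftJoin, history is null when the employee has no
            // employment history records yet (e.g. a fresh graduate)
            if (history != null) {
                employee.setEmploymentHistory(history);
            }
            return employee;
        });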

Test

The testing strategy for joined streams is not much different from the previous ones; the same methods apply. We need to provide the inputs and read from the output. The only difference is the number of input topics. Since the Employee topology has three input topics (EMPLOYEE, DEPT, and EMPLOYMENT-HISTORY), we need three TestInputTopic variables, one for each of them, as sketched below.
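
For completeness, a minimal sketch of how the fixture could declare those topics. The topic names follow the text above; the serde fields and the driver setup are assumptions, since the actual @BeforeEach lives in the repository:

// EmployeeTopologyTest.java (sketch -- serde fields are assumptions)
private TopologyTestDriver testDriver;
private TestInputTopic<Integer, EmployeeDto> employeeInput;
private TestInputTopic<Integer, DepartmentDto> deptInput;
private TestInputTopic<Integer, EmploymentHistoryDto> employmentHistoryInput;
private TestOutputTopic<Integer, EmployeeResultDto> employeeOutput;

@BeforeEach
public void init() {
    // ... build the topology and TopologyTestDriver as in the word count test ...
    employeeInput = testDriver.createInputTopic("EMPLOYEE",
            intSerde.serializer(), employeeSerde.serializer());
    deptInput = testDriver.createInputTopic("DEPT",
            intSerde.serializer(), deptSerde.serializer());
    employmentHistoryInput = testDriver.createInputTopic("EMPLOYMENT-HISTORY",
            intSerde.serializer(), historySerde.serializer());
    employeeOutput = testDriver.createOutputTopic("EMP-RESULT",
            intSerde.deserializer(), employeeResultSerde.deserializer());
}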

I created two test cases. One tests the employee topology without Employment History; we still expect an output here because of the nature of the left join: if there is no data on the right side, the stream still proceeds. The other tests the employee topology with Employment History.

// EmployeeTopologyTest.java
@Test
@DisplayName("Test Employee Topology between department and employee, exclude employment history")
public void testEmployeeAggregationTopologyWithoutEmploymentHistory() {
    // Finance Department
    DepartmentDto financeDept = DepartmentDto.builder()
            .deptId(1)
            .deptName("Finance")
            .build();
    // Employee: Alice
    EmployeeDto alice = EmployeeDto.builder()
            .empId(1000)
            .empName("Alice")
            .deptId(1)
            .build();
    // expected output
    EmployeeResultDto expected = new EmployeeResultDto();
    expected.setDeptId(1);
    expected.setDeptName("Finance");
    expected.setEmpId(1000);
    expected.setEmpName("Alice");
    // 1.1 insert finance dept into the DEPT topic.
    // Remember: I put a null key because we do key repartitioning in deptTable.
    // But this depends on your use case.
    deptInput.pipeInput(null, financeDept);
    // 1.2 the output topic (EMP-RESULT) is still empty because of the inner-join behaviour between employee and dept
    assertThat(employeeOutput.isEmpty()).isTrue();
    // 2.1 insert employee into the EMPLOYEE topic.
    // Remember: I put a null key because we do key repartitioning in empTable.
    // But this depends on your use case.
    employeeInput.pipeInput(null, alice);
    // 2.2 the output topic (EMP-RESULT) is no longer empty because both streams now have data for the associated key (dept_id)
    assertThat(employeeOutput.isEmpty()).isFalse();
    assertThat(employeeOutput.readKeyValue()).isEqualTo(new KeyValue<>(1000, expected));
    // 2.3 make sure no record is left in the output topic
    assertThat(employeeOutput.isEmpty()).isTrue();
}

@Test
@DisplayName("Test Employee Topology between department and employee, include employment history")
public void testEmployeeAggregationTopologyWithEmploymentHistory() {
    // Finance Department
    DepartmentDto financeDept = DepartmentDto.builder()
            .deptId(1)
            .deptName("Finance")
            .build();
    // Employee: Alice
    EmployeeDto alice = EmployeeDto.builder()
            .empId(1000)
            .empName("Alice")
            .deptId(1)
            .build();
    // History: Company A
    EmploymentHistoryDto historyCompanyA = EmploymentHistoryDto.builder()
            .empHistId(1)
            .empId(1000)
            .employerName("Company A")
            .build();
    // History: Company B
    EmploymentHistoryDto historyCompanyB = EmploymentHistoryDto.builder()
            .empHistId(1)
            .empId(1000)
            .employerName("Company B")
            .build();
    // expected output
    EmployeeResultDto expected = new EmployeeResultDto();
    expected.setDeptId(1);
    expected.setDeptName("Finance");
    expected.setEmpId(1000);
    expected.setEmpName("Alice");
    expected.setEmploymentHistory(Set.of("Company A", "Company B"));
    // 1. insert finance dept into the DEPT topic.
    // Remember: I put a null key because we do key repartitioning in deptTable.
    // But this depends on your use case.
    deptInput.pipeInput(null, financeDept);
    // 2. insert employee into the EMPLOYEE topic.
    // Remember: I put a null key because we do key repartitioning in empTable.
    // But this depends on your use case.
    employeeInput.pipeInput(null, alice);
    // 3. insert employment history records into the EMPLOYMENT-HISTORY topic.
    // Remember: I put a null key because we do key repartitioning in empTable.
    // But this depends on your use case.
    employmentHistoryInput.pipeInput(null, historyCompanyA);
    employmentHistoryInput.pipeInput(null, historyCompanyB);
    // make sure the topic is not empty
    assertThat(employeeOutput.isEmpty()).isFalse();
    // loop until the last record, because we cannot predict the left-join behaviour.
    // What we know is only that the last record should match what we expect.
    KeyValue<Integer, EmployeeResultDto> kv = null;
    while (!employeeOutput.isEmpty()) {
        kv = employeeOutput.readKeyValue();
    }
    // make sure kv is not null
    assertThat(kv).isNotNull();
    assertThat(kv).isEqualTo(new KeyValue<>(1000, expected));
    // make sure no record is left in the output topic
    assertThat(employeeOutput.isEmpty()).isTrue();
}

To Be Continued…

I have finished the DSL part up to the join use case. There are still plenty of DSL operations in Kafka Streams, but in my experience the tricks are always similar to the three cases above. And there is another flavor of Kafka Streams: the Processor API.

I will discuss how to test the Processor API in part 2.

Click here to continue to test Processor API in PART 2

Author: ru rocker

I have been a professional software developer since 2004. Java, Python, NodeJS, and Go-lang are my favorite programming languages. I also have an interest in DevOps. I hold professional certifications: SCJP, SCWCD, PSM 1, AWS Solution Architect Associate, and AWS Solution Architect Professional.
