How to Unit Test Kafka Streams Application – PART 2 (Processor API)

Part 2 of 2 articles on unit testing Kafka Streams applications. In this second part, I talk about testing the Processor API using MockProcessorContext, as well as how to test a scheduled processor with the two types of Punctuator: STREAM_TIME and WALL_CLOCK_TIME.


Overview

In the first article, I wrote about how to test the Kafka Streams DSL. Continuing from there, I will talk about how to test the Kafka Streams Processor API.

Processor API

Most of the time, you do not need this approach. You will use this API whenever you need custom processing logic that cannot be handled by the standard DSL. The Processor API lets you interact with state stores and process one record at a time, and you can schedule periodic actions on a processor via a Punctuator callback. And of course, you can combine the DSL and the Processor API to handle more complex cases.

Based on my own experience, only around 2% of streams implementations end up needing the Processor API. The DSL itself is more than enough for most cases.

Use Case

Let's say I have a system that handles online auctions. As you already know, our goal is to find the highest bid. We receive all the bids from the online systems and stream them into a topic called auction-bid-input. The topic key is the auctioned item, and the value is an input record with two fields: the customer name and the bid price.

A simple aggregation using the DSL could do the trick, but for the purposes of this article I will use the Processor API for this use case.
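For comparison, here is a rough sketch of what that DSL alternative could look like. This is only an illustration, not code from the sample project; it assumes the same MySerdesFactory.auctionSerde() and topic names that appear later in this article, and imports are omitted as in the other listings.

// DSL alternative (sketch): keep the highest bid per item with a simple reduce
StreamsBuilder builder = new StreamsBuilder();
builder.stream("auction-bid-input",
        Consumed.with(Serdes.String(), MySerdesFactory.auctionSerde()))
       .groupByKey(Grouped.with(Serdes.String(), MySerdesFactory.auctionSerde()))
       // keep whichever of the current best and the new bid has the higher price
       .reduce((best, bid) -> best.getBidPrice().compareTo(bid.getBidPrice()) < 0 ? bid : best)
       .toStream()
       .to("auction-bid-output",
           Produced.with(Serdes.String(), MySerdesFactory.auctionSerde()));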

Auction Processor

I created a custom processor to handle the business logic. The logic is simple. First, if there is no previous bid for a specific item, insert the new bid into the state store with key item-name and value AuctionDto. Next, if a new bid arrives and it is higher than the previous one, replace the value in the state store. If it is lower, ignore the bid.

Source
public class AuctionProcessor implements Processor<String, AuctionDto> {

    private KeyValueStore<String, AuctionDto> kvStore;
    private ProcessorContext context;

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        kvStore = (KeyValueStore<String, AuctionDto>) context.getStateStore("action-key-store");
    }

    @Override
    public void process(String key, AuctionDto current) {
        final AuctionDto prev = kvStore.get(key);
        if (prev == null || prev.getBidPrice().compareTo(current.getBidPrice()) < 0) {
            kvStore.put(key, current);
            context.forward(key, current);
        }
    }

    @Override
    public void close() {
    }
}
Test

To test the custom logic, we use MockProcessorContext to capture what happens inside our custom processor. It records whether the processor forwarded any records, so we can verify that the behavior is as expected. We can also register a KeyValueStore with the MockProcessorContext, retrieve specific values after the processor runs, and compare them against our expectations.

To validate this business logic, I created four test cases: initial bid, lower bid, equal bid, and higher bid.

public class AuctionProcessorTest {

    private AuctionProcessor processor;
    private MockProcessorContext context;
    private KeyValueStore<String, AuctionDto> store;

    @BeforeEach
    public void init() {
        processor = new AuctionProcessor();
        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "unit-test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MySerdesFactory.auctionSerde().getClass());
        context = new MockProcessorContext(config);
        store = Stores
                .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("action-key-store"),
                        Serdes.String(), MySerdesFactory.auctionSerde())
                .withLoggingDisabled().build();
        store.init(context, store);
        context.register(store, null);
        processor.init(context);
    }

    @Test
    public void testInitialBid_then_ReturnTheBid() {
        AuctionDto auctionDto = AuctionDto.builder()
                .customerName("Alice")
                .bidPrice(20.0)
                .build();
        processor.process("item-1", auctionDto);
        final Iterator<MockProcessorContext.CapturedForward> forwarded = context.forwarded().iterator();
        // confirm that the processor forwards the first bid
        MockProcessorContext.CapturedForward forward = forwarded.next();
        assertThat(forward.keyValue()).isEqualTo(new KeyValue<>("item-1", auctionDto));
        assertThat(forwarded.hasNext()).isFalse();
        // keyStore in processor now contains item-1 with Alice as the bidder at 20.0
        assertThat(store.get("item-1")).isEqualTo(auctionDto);
    }

    @Test
    public void testLowerBid_then_ReturnPrevBid() {
        AuctionDto existingAuction = AuctionDto.builder()
                .customerName("Alice")
                .bidPrice(20.0)
                .build();
        // simulate existing auction
        store.put("item-1", existingAuction);
        AuctionDto newAuction = AuctionDto.builder()
                .customerName("Bob")
                .bidPrice(10.0)
                .build();
        processor.process("item-1", newAuction);
        final Iterator<MockProcessorContext.CapturedForward> forwarded = context.forwarded().iterator();
        // confirm that the processor does not forward anything
        assertThat(forwarded.hasNext()).isFalse();
        // keyStore in processor still contains item-1 with the existing bid
        assertThat(store.get("item-1")).isEqualTo(existingAuction);
    }

    @Test
    public void testSameBid_then_ReturnPrevBid() {
        AuctionDto existingAuction = AuctionDto.builder()
                .customerName("Alice")
                .bidPrice(20.0)
                .build();
        // simulate existing auction
        store.put("item-1", existingAuction);
        AuctionDto newAuction = AuctionDto.builder()
                .customerName("Bob")
                .bidPrice(20.0)
                .build();
        processor.process("item-1", newAuction);
        final Iterator<MockProcessorContext.CapturedForward> forwarded = context.forwarded().iterator();
        // confirm that the processor does not forward anything
        assertThat(forwarded.hasNext()).isFalse();
        // keyStore in processor still contains item-1 with the existing bid
        assertThat(store.get("item-1")).isEqualTo(existingAuction);
    }

    @Test
    public void testHigherBid_then_ReturnNewBid() {
        AuctionDto existingAuction = AuctionDto.builder()
                .customerName("Alice")
                .bidPrice(20.0)
                .build();
        // simulate existing auction
        store.put("item-1", existingAuction);
        AuctionDto newAuction = AuctionDto.builder()
                .customerName("Bob")
                .bidPrice(25.0)
                .build();
        processor.process("item-1", newAuction);
        final Iterator<MockProcessorContext.CapturedForward> forwarded = context.forwarded().iterator();
        // confirm that the processor forwards the newest bid
        MockProcessorContext.CapturedForward forward = forwarded.next();
        assertThat(forward.keyValue()).isEqualTo(new KeyValue<>("item-1", newAuction));
        assertThat(forwarded.hasNext()).isFalse();
        // keyStore in processor now contains item-1 with the new bid
        assertThat(store.get("item-1")).isEqualTo(newAuction);
    }
}

Take a look at how we build the key-value store. Note that the store name ("action-key-store" here) must be identical in the source and the test.

Schedule Processor

Sometimes we need to schedule our processor and use a Punctuator to forward records to a downstream processor (or topic). Unfortunately, MockProcessorContext cannot simulate the scheduler firing. But I know: you want to test your processor so badly that you will have a sleepless night if you do not cover these implementations. So what are you going to do? For illustration, we have two scheduled processors, one for each type of punctuator: stream-time and wall-clock.

AuctionWithScheduleStreamTimeProcessor
public class AuctionWithScheduleStreamTimeProcessor implements Processor<String, AuctionDto> {

    private KeyValueStore<String, AuctionDto> kvStore;
    private ProcessorContext context;

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        kvStore = (KeyValueStore<String, AuctionDto>) context.getStateStore("action-key-store-schedule-stream-time");
        this.context.schedule(Duration.ofMillis(1000), PunctuationType.STREAM_TIME, (timestamp) -> {
            final KeyValueIterator<String, AuctionDto> iter = this.kvStore.all();
            while (iter.hasNext()) {
                final KeyValue<String, AuctionDto> entry = iter.next();
                context.forward(entry.key, entry.value);
            }
            iter.close();
            // commit the current processing progress
            context.commit();
        });
    }

    @Override
    public void process(String key, AuctionDto current) {
        final AuctionDto prev = kvStore.get(key);
        if (prev == null || prev.getBidPrice().compareTo(current.getBidPrice()) < 0) {
            kvStore.put(key, current);
        }
    }

    @Override
    public void close() {
    }
}
AuctionWithScheduleWallClockProcessor
public class AuctionWithScheduleWallClockProcessor implements Processor<String, AuctionDto> {

    private KeyValueStore<String, AuctionDto> kvStore;
    private ProcessorContext context;

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        kvStore = (KeyValueStore<String, AuctionDto>) context.getStateStore("action-key-store-schedule-wall-clock");
        this.context.schedule(Duration.ofMillis(1000), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
            final KeyValueIterator<String, AuctionDto> iter = this.kvStore.all();
            while (iter.hasNext()) {
                final KeyValue<String, AuctionDto> entry = iter.next();
                context.forward(entry.key, entry.value);
            }
            iter.close();
            // commit the current processing progress
            context.commit();
        });
    }

    @Override
    public void process(String key, AuctionDto current) {
        final AuctionDto prev = kvStore.get(key);
        if (prev == null || prev.getBidPrice().compareTo(current.getBidPrice()) < 0) {
            kvStore.put(key, current);
        }
    }

    @Override
    public void close() {
    }
}

Please note that both processors set the scheduler to run every second via context.schedule(Duration.ofMillis(1000), ...).

Yes, the answer is to combine the processors into a topology and test them from there. You can simulate the punctuators by using event time in TestInputTopic or by advancing the wall clock in TopologyTestDriver.

Auction Topology

As I stated above, we can combine the DSL with the Processor API. So we will use the same approach as in the first article, with some minor modifications: we will play with timestamps to trigger the scheduler.

Source
public void createCustomProcessor(final Topology topology) {
    final StoreBuilder<KeyValueStore<String, AuctionDto>> auctionKeyStore = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("action-key-store"), Serdes.String(),
            MySerdesFactory.auctionSerde());
    final StoreBuilder<KeyValueStore<String, AuctionDto>> auctionWallClockKeyStore = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("action-key-store-schedule-wall-clock"), Serdes.String(),
            MySerdesFactory.auctionSerde());
    final StoreBuilder<KeyValueStore<String, AuctionDto>> auctionStreamTimeKeyStore = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("action-key-store-schedule-stream-time"), Serdes.String(),
            MySerdesFactory.auctionSerde());
    topology.addSource("source", Serdes.String().deserializer(),
            MySerdesFactory.auctionSerde().deserializer(), "auction-bid-input");
    // topology for standard auction processor
    topology.addProcessor("proc", AuctionProcessor::new, "source");
    topology.addStateStore(auctionKeyStore, "proc");
    topology.addSink("sink", "auction-bid-output", "proc");
    // topology for stream time punctuator auction processor
    topology.addProcessor("proc-stream-time", AuctionWithScheduleStreamTimeProcessor::new, "source");
    topology.addStateStore(auctionStreamTimeKeyStore, "proc-stream-time");
    topology.addSink("sink-stream-time", "auction-bid-stream-time-output", "proc-stream-time");
    // topology for wall clock punctuator auction processor
    topology.addProcessor("proc-wall-clock", AuctionWithScheduleWallClockProcessor::new, "source");
    topology.addStateStore(auctionWallClockKeyStore, "proc-wall-clock");
    topology.addSink("sink-wall-clock", "auction-bid-wall-clock-output", "proc-wall-clock");
}
Stream-Time Punctuator Test

For stream-time punctuator testing, we need to pipe the data with timestamps that advance by one second. We did this in the previous article when windowing the word-count streams application. Remember, stream time only advances when new records arrive on the topic. Therefore, we need to increase the timestamps while piping records into the input topic in order to trigger the scheduler.

// AuctionTopologyTest.java
@Test
@DisplayName("Test multiple auction with stream time")
public void testAuctionWithStreamTime() {
    AuctionDto alice = AuctionDto.builder()
            .customerName("Alice")
            .bidPrice(10.0)
            .build();
    AuctionDto bob = AuctionDto.builder()
            .customerName("Bob")
            .bidPrice(9.5)
            .build();
    AuctionDto charlie = AuctionDto.builder()
            .customerName("Charlie")
            .bidPrice(10.5)
            .build();
    AuctionDto dave = AuctionDto.builder()
            .customerName("Dave")
            .bidPrice(1.5)
            .build();
    Instant now = Instant.now();
    inputTopic.pipeInput("item-1", alice, now);
    inputTopic.pipeInput("item-1", bob, now.plus(1, ChronoUnit.SECONDS));
    inputTopic.pipeInput("item-1", charlie, now.plus(2, ChronoUnit.SECONDS));
    inputTopic.pipeInput("item-2", dave, now.plus(3, ChronoUnit.SECONDS));
    Map<String, AuctionDto> expected = Map.of(
            "item-1", charlie,
            "item-2", dave
    );
    // make sure the output is not empty
    assertThat(outputStreamTimeTopic.isEmpty()).isFalse();
    Map<String, AuctionDto> result = new HashMap<>();
    while (!outputStreamTimeTopic.isEmpty()) {
        final KeyValue<String, AuctionDto> kv = outputStreamTimeTopic.readKeyValue();
        result.put(kv.key, kv.value);
    }
    assertThat(result).containsExactlyInAnyOrderEntriesOf(expected);
}

Note: the pipeInput calls with increasing timestamps are what advance stream time.

Wall-clock Punctuator Test

With this type of punctuator, the trigger is based purely on wall-clock time, so increasing the record timestamps will not trigger the scheduler. We need a different technique to simulate it: advancing the wall-clock time through the TopologyTestDriver.

// AuctionTopologyTest.java
@Test
@DisplayName("Test multiple auction with wall clock")
public void testAuctionWithWallClock() {
    AuctionDto alice = AuctionDto.builder()
            .customerName("Alice")
            .bidPrice(10.0)
            .build();
    AuctionDto bob = AuctionDto.builder()
            .customerName("Bob")
            .bidPrice(9.5)
            .build();
    AuctionDto charlie = AuctionDto.builder()
            .customerName("Charlie")
            .bidPrice(10.5)
            .build();
    AuctionDto dave = AuctionDto.builder()
            .customerName("Dave")
            .bidPrice(1.5)
            .build();
    inputTopic.pipeInput("item-1", alice);
    inputTopic.pipeInput("item-1", bob);
    inputTopic.pipeInput("item-1", charlie);
    inputTopic.pipeInput("item-2", dave);
    Map<String, AuctionDto> expected = Map.of(
            "item-1", charlie,
            "item-2", dave
    );
    // output topic should be empty because the wall clock has not advanced yet
    assertThat(outputWallClockTopic.isEmpty()).isTrue();
    // advance the wall clock to trigger the punctuator
    testDriver.advanceWallClockTime(Duration.ofMillis(1000));
    // make sure the output is not empty
    assertThat(outputWallClockTopic.isEmpty()).isFalse();
    Map<String, AuctionDto> result = new HashMap<>();
    while (!outputWallClockTopic.isEmpty()) {
        final KeyValue<String, AuctionDto> kv = outputWallClockTopic.readKeyValue();
        result.put(kv.key, kv.value);
    }
    assertThat(result).containsExactlyInAnyOrderEntriesOf(expected);
}

Note: the advanceWallClockTime call is what triggers the wall-clock punctuator.

That covers testing the scheduled processors. In my experience, these three techniques (MockProcessorContext, advancing stream time, and advancing the wall clock) are sufficient for almost all Processor API use cases.

Conclusion

Well, I have reached the end of this two-part series on testing Kafka Streams. I have covered both the DSL and the Processor API. I hope this article helps us increase our test coverage and makes writing unit tests a habit. You can find the complete sample on my GitHub repository.


