How to Join One-to-Many and Many-to-One Relationship with Kafka Streams

Yet another Kafka feature, Kafka Streams, allows us to join two streams into a single stream. Kafka Streams provides one-to-many and many-to-one join types.


Overview

In this article, I will once again use Kafka as my use case. I talked a lot about securing Kafka in my last three blog posts, so today is going to be a little bit different. I will talk about another feature of Kafka, which is Kafka Streams.

Kafka Streams

Well, if you expect me to discuss transformation operations such as map, flatMap, and filter, they are not my main focus today. What I want to discuss is another feature of Kafka Streams: joining streams. More specifically, I will demonstrate two types of join that follow a familiar pattern from the RDBMS world: one-to-many (1:N) and many-to-one (N:1) relationships.

Of course, while preparing the streams before joining, I will need some transformations, such as re-keying, grouping, and aggregation. However, the essential part today is joining the streams themselves.

All the join semantics, such as left join, inner join, and outer join, and their behavior are beyond the scope of this article. This article focuses solely on the KTable inner join. There is a slight difference, especially for the KStream-to-KStream join type. Please visit this link for more details.

Use Case

Let's say I have three sources of employee data: Department, Employee, and Employment History. Each employee belongs to one department, and he/she can have multiple employment history records. Our goal is to consolidate these sources of data into one single employee representation.

Entity-Relationship Diagram

I translate the use case into a more familiar Entity-Relationship Diagram.

Entity-Relationship Diagram
Topology

Here is the topology. The blue color is for the input topics and their aggregations. The orange one is for Kafka's internal topics and/or materialized views. And the green one is for the output topic.

Streams Topology
Kafka Topics

Therefore, for demo purposes, I created four Kafka topics: DEPT, EMPLOYEE, EMPLOYMENT-HISTORY, and EMP-RESULT. The first three are input topics, and the last one is the output topic.

Sample Data

Here are the sample data for this demo.

Step-by-Step

JSON (De)Serializer, POJO and Serdes

First, I need to prepare a serializer and deserializer for parsing JSON objects into POJOs. Then, I created five DTOs: three DTOs for the source data (EmployeeDto, DepartmentDto, and EmploymentHistoryDto); EmploymentHistoryAggregationDto to handle the Employment History aggregation; and EmployeeResultDto to hold the final stream-join result. Lastly, I created a Serde for each DTO.
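As a rough illustration, here is a minimal sketch of how such a JSON (de)serializer and Serde factory might look, assuming Jackson is used for the JSON mapping. The factory name MySerdesFactory comes from the reader snippets in the comments below; the class name JsonSerde and the method bodies are illustrative, not the exact code from the repository.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serializer;

    // Generic Jackson-based Serde: serializes a POJO to JSON bytes and back.
    public class JsonSerde<T> implements Serde<T> {

        private final ObjectMapper mapper = new ObjectMapper();
        private final Class<T> type;

        public JsonSerde(Class<T> type) {
            this.type = type;
        }

        @Override
        public Serializer<T> serializer() {
            return (topic, data) -> {
                try {
                    return data == null ? null : mapper.writeValueAsBytes(data);
                } catch (Exception e) {
                    throw new SerializationException("JSON serialization failed", e);
                }
            };
        }

        @Override
        public Deserializer<T> deserializer() {
            return (topic, bytes) -> {
                try {
                    return bytes == null ? null : mapper.readValue(bytes, type);
                } catch (Exception e) {
                    throw new SerializationException("JSON deserialization failed", e);
                }
            };
        }
    }

    // In a separate file: one Serde per DTO, exposed through a small factory.
    public class MySerdesFactory {
        public static Serde<DepartmentDto> departmentSerde() { return new JsonSerde<>(DepartmentDto.class); }
        public static Serde<EmployeeDto> employeeSerde() { return new JsonSerde<>(EmployeeDto.class); }
        public static Serde<EmploymentHistoryDto> employmentHistorySerde() { return new JsonSerde<>(EmploymentHistoryDto.class); }
        public static Serde<EmploymentHistoryAggregationDto> employmentHistoryAggregationSerde() { return new JsonSerde<>(EmploymentHistoryAggregationDto.class); }
        public static Serde<EmployeeResultDto> employeeResultSerde() { return new JsonSerde<>(EmployeeResultDto.class); }
    }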

Many-to-One Relationship: Department and Employee (N:1)

Because none of our sample data has a Kafka topic key, we first need to select a key before converting each stream into a KTable.

Department Topology
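As a rough sketch of this step, the snippet below reads the DEPT topic, re-keys it by dept_id, and materializes it as a KTable. It assumes a StreamsBuilder named builder, a reasonably recent Kafka Streams version (2.5+ for KStream#toTable), and DTO getters implied by the field names; the exact code may differ in the repository.

    // Build a Department KTable keyed by dept_id.
    KTable<Integer, DepartmentDto> deptTable = builder
            .stream("DEPT", Consumed.with(Serdes.Integer(), MySerdesFactory.departmentSerde()))
            .selectKey((key, dept) -> dept.getDeptId())
            .toTable(Materialized.with(Serdes.Integer(), MySerdesFactory.departmentSerde()));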

Employee Topology
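Similarly, a sketch of the Employee topology: the EMPLOYEE topic is read as a KStream and re-keyed by dept_id, because the department is the join key for the N:1 step (the getter names are assumptions).

    // Re-key the employee stream by dept_id so it can be joined with the Department KTable.
    KStream<Integer, EmployeeDto> empByDept = builder
            .stream("EMPLOYEE", Consumed.with(Serdes.Integer(), MySerdesFactory.employeeSerde()))
            .selectKey((key, emp) -> emp.getDeptId());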

Department-Employee Join Topology
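And a sketch of the N:1 join itself: each employee record is enriched with its department, then the result is re-keyed by emp_id and materialized as a KTable for the next join step. The EmployeeResultDto setters and MySerdesFactory.employeeResultSerde() are assumed names.

    // Join employees (keyed by dept_id) with the Department KTable, then re-key by emp_id.
    KTable<Integer, EmployeeResultDto> empDeptTable = empByDept
            .join(deptTable,
                  (emp, dept) -> {
                      EmployeeResultDto dto = new EmployeeResultDto();
                      dto.setEmpId(emp.getEmpId());
                      dto.setEmpName(emp.getEmpName());
                      dto.setDeptName(dept.getDeptName());
                      return dto;
                  },
                  Joined.with(Serdes.Integer(),
                              MySerdesFactory.employeeSerde(),
                              MySerdesFactory.departmentSerde()))
            .selectKey((deptId, dto) -> dto.getEmpId())
            .toTable(Materialized.with(Serdes.Integer(), MySerdesFactory.employeeResultSerde()));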

One-to-Many Relationship: Employee and Employment History (1:N)

We already have a KTable, which is the result of the join between Department and Employee. Hence, we will join it with the Employment History stream.

Employment History Topology

Employment History has its own unique key, emp_hist_id. However, we need this set of data from an emp_id perspective. Therefore, we need to pivot the data by selecting emp_id as the key. Then we need to do some aggregation, since there are multiple records with the same emp_id.
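Here is a sketch of that pivot-and-aggregate step, assuming EmploymentHistoryAggregationDto collects the history entries; its add method and the serde factory method names are assumptions.

    // Re-key employment history by emp_id, then aggregate all records per employee.
    KTable<Integer, EmploymentHistoryAggregationDto> empHistTable = builder
            .stream("EMPLOYMENT-HISTORY",
                    Consumed.with(Serdes.Integer(), MySerdesFactory.employmentHistorySerde()))
            .selectKey((key, hist) -> hist.getEmpId())
            .groupByKey(Grouped.with(Serdes.Integer(), MySerdesFactory.employmentHistorySerde()))
            .aggregate(
                EmploymentHistoryAggregationDto::new,
                (empId, hist, agg) -> {
                    agg.setEmpId(empId);
                    agg.add(hist); // hypothetical helper that appends one history entry
                    return agg;
                },
                Materialized.with(Serdes.Integer(),
                                  MySerdesFactory.employmentHistoryAggregationSerde()));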

Employee-EmploymentHistory Join Topology
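Finally, a sketch of the KTable-KTable inner join on emp_id, which attaches the aggregated history to the department-employee record (the getters/setters for the history field are assumed):

    // Inner join on emp_id: merge the aggregated history into the employee result.
    KTable<Integer, EmployeeResultDto> empResultTable = empDeptTable
            .join(empHistTable,
                  (empDept, histAgg) -> {
                      EmployeeResultDto dto = new EmployeeResultDto();
                      dto.setEmpId(empDept.getEmpId());
                      dto.setEmpName(empDept.getEmpName());
                      dto.setDeptName(empDept.getDeptName());
                      dto.setEmploymentHistory(histAgg.getEmploymentHistory());
                      return dto;
                  });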

Output Topic

Stream Joined Result to Output Topic

Before we send the result to the output topic, we select emp_id as the topic's key and EmployeeResultDto as the topic's value.
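A sketch of this last step; it mirrors the toStream()/map() snippet discussed in the comments below, with the Produced serdes as assumptions.

    // Publish the joined result to the EMP-RESULT output topic, keyed by emp_id.
    empResultTable.toStream()
            .map((key, value) -> KeyValue.pair(value.getEmpId(), value))
            .to("EMP-RESULT",
                Produced.with(Serdes.Integer(), MySerdesFactory.employeeResultSerde()));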

Sample Result

Future Enhancement

Whenever we look at the employee output data, it should trigger an enterprise integration pattern in our minds. Any idea which pattern it is? To answer the question, let us review all the previous steps so far.

First, we acquired data from multiple streams. Then we identified their correlation. Next, we published them as a single, aggregated message to the output channel. Truthfully, at its most basic, it is the aggregator pattern.

Moreover, we can enhance this aggregation further into a CQRS architecture. To put things in context, imagine our sources of data are multiple command requests. We stream the data from the Kafka topics and sink (Kafka Connect, anyone?) the result into another database that is specifically optimized for search. Let's say we use Elasticsearch for querying our aggregated data. Thus, we split the READ operations from the WRITE operations.

One more interesting fact: whether you realize it or not, we have already defined a boundary, which is the Employee domain. I realize that I am oversimplifying the domain itself. However, my point is that there is a possibility to implement Domain-Driven Design using Kafka Streams as the groundwork.

Yet that is another topic and already out of the scope of this blog post. At least we already have a firm foundation to go further. For now, enjoy the code in my GitHub repository.

Author: ru rocker

I have been a professional software developer since 2004. Java, Python, NodeJS, and Go-lang are my favorite programming languages. I also have an interest in DevOps. I hold professional certifications: SCJP, SCWCD, PSM 1, AWS Solution Architect Associate, and AWS Solution Architect Professional.

13 thoughts on “How to Join One-to-Many and Many-to-One Relationship with Kafka Streams”

  1. Great article. This is one of the best I have seen so far, as it puts the ideas into a more realistic example rather than just some random thoughts. Great work, keep it up :>

  2. Thanks for the great article. How do we handle late-arriving records in the join operation?
    In the above join operation, we will get duplicate records in the topic.
    Say, for example, one employee has 4 history records. In the output topic, we will have records like:
    1. employee with the 1st history record
    2. employee with the first two history records
    3. employee with the first three history records
    4. employee with all the history records

    In this case, only the last record is the correct one. How do we restrict the duplicate records?
    Please share your input.

    1. Yes. The order would be just as you mentioned because it is a changelog stream.
      In the end, you will get the correct results.

      As you may know, your case is an example of an event-driven system.
      Therefore, eventual consistency is a common thing to happen in this kind of situation. Or, put another way, based on the CAP theorem, it is a trade-off we accept when we choose this kind of approach in distributed systems.

      Further references:
      https://docs.confluent.io/current/streams/concepts.html#ktable

    2. You will not get duplicate records, because the approach is to aggregate the employment history. What you need to do is only "update" your target data with the latest employee record (which already has all the history records), based on its key (EMP_ID).

  3. Thank you for this article. This has helped me a lot. Can you help in resolving the following issues:
    1. I am trying to write the employeeResult to a file, but it's not writing while the application is running. I need to stop my Java server to see the result in the file – result.txt

    empResultTable.toStream()
    .map((key, value) -> new KeyValue<>(value.getEmpId(), value))
    .print(Printed.toFile("resul.txt"));

    2. How can I achieve the same with a KStream-KStream join (foreign key)?

    Thank you!
    Geeta

    1. I have never printed to a file from a stream, so I cannot say much about this. But maybe, just maybe, it is because the streams keep your file open in order to write the payload, and the file needs to be closed first before it appears in the filesystem. That is why, as long as you keep your streams running, it will not create the file, and the file is created once you terminate the streams. Maybe you can check on this further to validate my assumption.

      And for the KStream-KStream join, you can achieve it similarly to the KTable join. A slight difference is that you need to specify the join windows.

      I created another article related to the KStream-KStream join. See the sample here: https://www.ru-rocker.com/2021/01/03/a-study-case-building-a-simple-credit-card-fraud-detection-system-part-2-mapping-gerkhin-to-kafka-streams/

  4. Thank you for the response.
    I tried as shown below, but it's not working. Could you please suggest?

    public class EmployeeTopology {

        public void createTopology(StreamsBuilder builder) {

            final KStream<Integer, EmployeeDto> empStream =
                builder.stream("EMPLOYEE",
                        Consumed.with(Serdes.Integer(), MySerdesFactory.employeeSerde()))
                    .map((key, value) -> new KeyValue<>(value.getEmpId(), value));

            final KStream<Integer, DepartmentDto> deptStream =
                builder.stream("DEPT",
                        Consumed.with(Serdes.Integer(), MySerdesFactory.departmentSerde()))
                    .map((key, value) -> new KeyValue<>(value.getDeptId(), value));

            KStream<Integer, EmployeeResultDto> resultStream =
                deptStream.join(empStream,
                    valueJoiner(),
                    JoinWindows.of(Duration.ofSeconds(1)),
                    StreamJoined.with(
                        Serdes.Integer(),
                        MySerdesFactory.departmentSerde(),
                        MySerdesFactory.employeeSerde()));
            resultStream.print(Printed.toSysOut());
        }

        private ValueJoiner<DepartmentDto, EmployeeDto, EmployeeResultDto> valueJoiner() {
            return (dept, emp) -> {
                if (dept.getDeptId() == emp.getDeptId()) {
                    EmployeeResultDto dto = new EmployeeResultDto();
                    dto.setDeptId(emp.getEmpId());
                    dto.setEmpName(emp.getEmpName());
                    dto.setDeptName(dept.getDeptName());
                    return dto;
                } else {
                    return null;
                }
            };
        }
    }

    1. The KStream-KStream join is working. My bad – I had provided a window of 5 seconds.

      2. About writing to the file:
      empResultTable.toStream()
      .map((key, value) -> new KeyValue<>(value.getEmpId(), value))
      .print(Printed.toFile("resul.txt"));
      I didn't find a way to close this file, as Printed.toFile expects a file path.

      Thank you for all the support

  5. I am trying to join two KStreams using a hopping window, but join accepts only JoinWindows, not TimeWindows. How can I achieve this?
