Streams Everywhere — Season 1 Episode 1

Ajith Shetty
7 min read · Aug 5, 2023

Kafka is an essential tool for almost every data-driven company.

Companies keep connecting more and more sources to their platforms and want to stream real-time data for any number of use cases.

These use cases include weather reporting, sensor status, real-time tracking and many more.

Kafka lets you bring in data from any producer, and lets you manage, store and process it and hand it to downstream consumers.

But lately the requirements have changed, and the processes have become more and more complicated.

Kafka stores data as a stream of log messages in a topic.

The main idea is to receive log messages from producers and store them until they are requested by consumers.

But now we need more: we need to filter log messages, enrich a given set of records, and join multiple topics.

There are many use cases where we want to apply a transformation or enrichment to a topic in real time.

This is why Kafka Streams (KStreams) came into existence.

Before we jump into KStreams, let’s refresh our memory on what Kafka is and what its components are.

What is Apache Kafka

Kafka is an event streaming platform. It is distributed in nature: data can be replicated to multiple nodes, which provides high availability of the data.

Kafka Brokers

Storage nodes in Kafka are called brokers. A broker can be a node in the cloud or your local laptop.

The log

The log is a special kind of file that consists of records. It is append-only, and once a record is stored it is immutable.

Topic

A topic is a logical grouping of streams or records, for example user demographics, user purchases or user travel.

Retention

We can retain the data for a given topic for N days or up to a given size (for example with the retention.ms and retention.bytes topic settings), and tune this based on our application’s needs.

Producer API

The producer is responsible for getting data into Kafka. The producer sends a message, and the message is stored at an offset that always increases within a partition.

Consumer

The consumer uses the same offsets to read data from the Kafka brokers. This way the consumer keeps track of which messages have already been read and which still need to be processed.
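To make the log, offset, producer and consumer ideas concrete, here is a minimal sketch in plain Java. It is a toy in-memory model of a single partition, not how Kafka stores data; the class name AppendOnlyLogSketch and its methods are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of a single-partition log (hypothetical, not Kafka's storage format).
class AppendOnlyLogSketch {
    private final List<String> records = new ArrayList<>();

    // Producer side: append a record and return the offset it was stored at.
    long append(String value) {
        records.add(value);
        return records.size() - 1;
    }

    // Consumer side: read every record from the given offset onwards.
    List<String> readFrom(long offset) {
        int start = (int) Math.min(Math.max(offset, 0), records.size());
        return new ArrayList<>(records.subList(start, records.size()));
    }

    public static void main(String[] args) {
        AppendOnlyLogSketch log = new AppendOnlyLogSketch();
        log.append("user-purchase-1"); // stored at offset 0
        log.append("user-purchase-2"); // stored at offset 1

        long consumerOffset = 0; // the consumer has read nothing yet
        List<String> batch = log.readFrom(consumerOffset);
        consumerOffset += batch.size(); // remember how far we have read
        System.out.println(batch + " next offset " + consumerOffset);

        log.append("user-purchase-3"); // stored at offset 2
        batch = log.readFrom(consumerOffset); // only the unread record comes back
        consumerOffset += batch.size();
        System.out.println(batch + " next offset " + consumerOffset);
    }
}
```

The point of the sketch is that records are never modified after being appended, and that the consumer only needs to remember one number, its next offset, to know what is still unread.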

Kafka Streams

Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka’s server-side cluster technology.

https://kafka.apache.org/documentation/streams/

Kafka Streams is a Java library that runs inside your own application and works on top of the data held by the brokers.

Because it builds on Kafka’s partitioned topics, it gets parallelisation and distributed processing.

KStreams works with real-time messages that flow continuously.

KStreams mainly uses threads to parallelise work, and it uses state stores, which let you query the streaming data directly.
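As a rough illustration of what a queryable state store gives you, here is a sketch in plain Java that keeps a running count per key. It is only an in-memory stand-in with made-up names (StateStoreSketch, process, count); it does not use the Kafka Streams state store API.

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for a state store: a running count per key that can be queried
// at any time. This map lives only in memory (hypothetical illustration).
class StateStoreSketch {
    private final Map<String, Long> countsByKey = new HashMap<>();

    // Called for every incoming record; updates the state for its key.
    void process(String key) {
        countsByKey.merge(key, 1L, Long::sum);
    }

    // Query the current state without waiting for the stream to end.
    long count(String key) {
        return countsByKey.getOrDefault(key, 0L);
    }

    public static void main(String[] args) {
        StateStoreSketch store = new StateStoreSketch();
        store.process("sensor-1");
        store.process("sensor-2");
        store.process("sensor-1");
        System.out.println("sensor-1 seen " + store.count("sensor-1") + " times");
    }
}
```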

KStreams lets you write less code: you define what to do, not how to do it.

Basic operations

KStreams works on Event Streams.

Event streams are basically sequences of records. Each record is a standalone event.

When we define a KStreams application, we are basically defining our processor topology.

This processor topology is a DAG that consists of processor nodes, with the edges representing the streams flowing between them.

The source processor is where the data comes from.

Based on the user’s logic, records are processed by these nodes and finally forwarded to a sink processor, which hands them to a downstream application or topic.

KStreams Process

In the above screenshot we can see that multiple consumers are reading the streams in parallel.

Based on the streams, each consumer creates a topology, applies the user-defined logic and finally pushes the results to downstream streams or topics.
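As a rough mental model of a processor topology, the sketch below wires a source, a filter node, a map node and a sink together in plain Java. It is a toy with hypothetical names (TopologySketch, run) and a finite list as the source; a real Kafka Streams topology is built with the library and runs continuously.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Toy model of a linear topology: source -> filter node -> map node -> sink.
class TopologySketch {
    static List<String> run(List<String> source,
                            Predicate<String> filterNode,
                            Function<String, String> mapNode) {
        List<String> sink = new ArrayList<>();
        for (String record : source) {       // the source emits each record in order
            if (!filterNode.test(record)) {  // the filter node drops non-matching records
                continue;
            }
            sink.add(mapNode.apply(record)); // the map node transforms, the sink collects
        }
        return sink;
    }

    public static void main(String[] args) {
        List<String> sink = run(
                List.of("sensor-ok", "sensor-fail", "sensor-ok"),
                value -> value.endsWith("fail"),
                String::toUpperCase);
        System.out.println(sink);
    }
}
```

Each record travels along the edges from node to node on its own, which is the same shape the DAG describes.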

Define a Stream

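import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
// Read readTopicName as a stream of String keys and String values.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> myStream = builder.stream(readTopicName,
    Consumed.with(Serdes.String(), Serdes.String()));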

We basically need to create a StreamsBuilder instance.

Then we provide the name of the topic to read from.

The Serdes define how the key and value of the stream are serialised and deserialised.
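To show what serialisation and deserialisation mean for a String key or value, here is a plain-Java round trip. It is a stand-in for the idea behind a String serde, assuming UTF-8 encoding; the class StringSerdeSketch and its methods are hypothetical and are not the Kafka Serdes class.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Plain-Java stand-in for what a String serde does (hypothetical, not the Kafka API).
class StringSerdeSketch {
    // Serialise: turn the String into the bytes that are written to the topic.
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // Deserialise: turn the stored bytes back into a String for the application.
    static String deserialize(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] stored = serialize("orderNumber-1001");
        System.out.println(Arrays.toString(stored)); // raw bytes as the broker sees them
        System.out.println(deserialize(stored));     // the value as the application sees it
    }
}
```

The brokers only ever see bytes, so the reading side has to use a deserialiser that matches what the writing side used.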

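Typical Operations

Filter

We can use filter on a stream to keep only the records that match a condition.

Map Values

We can transform the values of the records while leaving the keys unchanged.

Map

We can apply a map function that transforms both the key and the value.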
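The difference between the three operations can be sketched with plain java.util.stream code over key-value pairs. This is only an analogy: the Kafka Streams operations of the same names work on an unbounded KStream rather than a finite list, and the class, method names and sample records here are made up.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java analogy for filter, mapValues and map (hypothetical, not the Kafka Streams DSL).
class TypicalOperationsSketch {
    // filter: keep only the records whose value matches the condition.
    static List<Map.Entry<String, String>> filterPaid(List<Map.Entry<String, String>> in) {
        return in.stream()
                .filter(r -> r.getValue().startsWith("paid"))
                .collect(Collectors.toList());
    }

    // mapValues: change the value, leave the key untouched.
    static List<Map.Entry<String, String>> mapValuesToUpperCase(List<Map.Entry<String, String>> in) {
        return in.stream()
                .map(r -> Map.entry(r.getKey(), r.getValue().toUpperCase()))
                .collect(Collectors.toList());
    }

    // map: produce a new key and a new value (here they are simply swapped).
    static List<Map.Entry<String, String>> mapSwap(List<Map.Entry<String, String>> in) {
        return in.stream()
                .map(r -> Map.entry(r.getValue(), r.getKey()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = List.of(
                Map.entry("user-1", "paid:20"),
                Map.entry("user-2", "refund:5"));
        System.out.println(filterPaid(records));
        System.out.println(mapValuesToUpperCase(records));
        System.out.println(mapSwap(records));
    }
}
```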

DEMO

Clone the project with Git.

https://github.com/ajithshetty/spark-kafka-kstreams.git

The repo is named spark-kafka-kstreams because it contains many more examples, which we will talk about in future blogs.

For now we will use only a part of this repo for our DEMO.

Let’s start with getting the docker setup.

git clone https://github.com/ajithshetty/spark-kafka-kstreams.git
cd spark-kafka-kstreams
sh start_containers.sh

The above script brings up all the required containers.

Note: This repo contains many examples, so the script brings up multiple containers, some of which might not be relevant to the current example.

You can follow along with the steps under:

src/main/java/io/confluent/developer/basic/README.md

Build the Gradle Project

./gradlew build

Run the basic example

./gradlew runStreams -Pargs=basic

All the configuration is provided in the file src/main/resources/streams.properties

When you run the example, the following steps take place.

1. Run the TopicLoader class to create the required topics and fill in a few records
src/main/java/io/confluent/developer/basic/TopicLoader.java

2. Read the input and output topic names from the properties file.

3. Create a stream from the input topic using the stream builder

KStream<String, String> firstStream = builder
.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

4. Apply the peek function to see the incoming records

firstStream.peek((key, value) -> 
System.out.println("Incoming record - key " + key + " value " + value))

5. Apply filter and mapValues to keep only the order numbers greater than 1000

.filter((key, value) -> value.contains(orderNumberStart))
.mapValues(value -> value.substring(value.indexOf("-") + 1))
.filter((key, value) -> Long.parseLong(value) > 1000)

6. Apply the peek function to see the transformed records

.peek((key, value) -> 
System.out.println("Outgoing record - key " + key + " value " + value))

7. Write to another topic

.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()))

The output will look like this:

./gradlew runStreams -Pargs=basic

> Configure project :
Using example io.confluent.developer.basic.BasicStreams

> Task :runStreams
Record produced - offset - 0 timestamp - 1691233857191
Record produced - offset - 1 timestamp - 1691233857204
Record produced - offset - 2 timestamp - 1691233857204
Record produced - offset - 3 timestamp - 1691233857204
Record produced - offset - 4 timestamp - 1691233857204
Record produced - offset - 5 timestamp - 1691233857204
Record produced - offset - 6 timestamp - 1691233857204
Incoming record - key order-key value orderNumber-1001
Outgoing record - key order-key value 1001
Incoming record - key order-key value orderNumber-5000
Outgoing record - key order-key value 5000
Incoming record - key order-key value orderNumber-999
Incoming record - key order-key value orderNumber-3330
Outgoing record - key order-key value 3330
Incoming record - key order-key value bogus-1
Incoming record - key order-key value bogus-2
Incoming record - key order-key value orderNumber-8400
Outgoing record - key order-key value 8400
<===========--> 87% EXECUTING [5s]
> :runStreams
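The filtering seen in this log can be checked with a plain-Java sketch that replays the seven incoming values through the same filter, mapValues and filter logic. It uses java.util.stream instead of Kafka Streams, and it assumes that orderNumberStart is the prefix "orderNumber-", which is inferred from the values in the log rather than taken from the repo.

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java replay of the demo's filter -> mapValues -> filter chain (not Kafka Streams).
class BasicStreamsReplay {
    // Assumption: the prefix the demo filters on, inferred from the log output.
    static final String ORDER_NUMBER_START = "orderNumber-";

    static List<String> process(List<String> values) {
        return values.stream()
                // keep only order records; this drops the bogus-* values
                .filter(value -> value.contains(ORDER_NUMBER_START))
                // keep the part after the first "-", i.e. the number
                .map(value -> value.substring(value.indexOf("-") + 1))
                // keep only numbers greater than 1000; this drops 999
                .filter(value -> Long.parseLong(value) > 1000)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> incoming = List.of(
                "orderNumber-1001", "orderNumber-5000", "orderNumber-999",
                "orderNumber-3330", "bogus-1", "bogus-2", "orderNumber-8400");
        System.out.println(process(incoming));
    }
}
```

Running it prints [1001, 5000, 3330, 8400], which matches the four "Outgoing record" lines: the two bogus values fail the first filter and 999 fails the second.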

With the help of Kafka-UI we can see that our records are read from the input topic and written to the output topic.

http://localhost:18080/

Input topic details

Output topic details

Conclusion

So here we have started a discussion of what KStreams is and how we can work with real-time streams.

But KStreams offers much more than what we discussed here. The use cases are many.

I would strongly suggest you go through one of the best YouTube playlists on the topic, from Confluent.

DEMO Code: https://github.com/ajithshetty/spark-kafka-kstreams

We have taken the example from Confluent and modified it based on our requirements.

Confluent examples: https://github.com/confluentinc/learn-kafka-courses

Next steps

In this blog we have discussed KStreams and its capabilities.

We have a lot more to cover.

In the coming blogs, we will talk about:

  1. KTables
  2. Serialisation and Deserialisation
  3. Joins
  4. Window
  5. Stateful Operations
  6. Kafka-UI

Ajith Shetty

Bigdata Engineer — Bigdata, Analytics, Cloud and Infrastructure.

Interested in a weekly newsletter on big data analytics from around the world? Subscribe to my newsletter, Just Enough Data.
