Streaming Swiftly: Exploring Fast API and Kafka Integration

Ajith Shetty
4 min readSep 23, 2023

--

Empowering Data Engineers: Harnessing the power of Kafka and Fast API for Seamless Data Processing.

Given the full stack engineers who are well versed with Front end and backend application knowledge, Data Engineers are also expected to know the in depth knowledge from End to End pipeline which consists of getting the data from different sources using API as well.

Here in this blog, we will be specifically concentrate on building APIs.

Now there are many ways you can integrate your application with an external API.

This blog is specifically targeting the FAST API and how we can utilise it to integrate with KAFKA.

Before that let’s see what is FAST API and its features.

What is FAST API

FastAPI is a modern, fast (high-performance), web framework for building APIs with Python 3.7+ based on standard Python type hints.

Source: https://fastapi.tiangolo.com/

Fast API Supports concurrency and coroutines without importing the async packages.

The main advantage of FAST API is, it is easy to build and it provides the interactive docs creation(OPEN API).

Since its built on top of python, you are not expected to learn any new syntaxes apart from python built ins.

ASGI

Fast API uses ASGI- Asynchronous Server Gateway Interface.

ASGI provides the interface between your server and Web Application.

Specifically FAST API uses UVICORN for ASGI.

Now UVICORN, uses lightweight “starlette” ASGI Framework.

DEMO

Clone the project.

git clone https://github.com/ajithshetty/kafka-fast-api-demo

As part of this project we will be launching all the resources as docker containers.

  1. broker: Kafka Broker
  2. zookeeper
  3. kafka-ui: For viewing the data and topic details
  4. api-demo: An Application which will help us to produce and consume Kafka Records using Fast API

Setting up the dependencies

[tool.poetry]
name = "kafka-fast-api-demo"
version = "1.0.0"
description = "Kafka Fast Api DEMO"
authors = ["Ajith Shetty"]

[tool.poetry.dependencies]
python = "^3.10"
fastapi = "^0.92.0"
uvicorn = "^0.20.0"
pydantic = "^1.10.5"
python-dotenv = "^1.0.0"
aiokafka = "^0.8.0"

[tool.poetry.dev-dependencies]

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

If you see the docker-compose.yaml, we are passing the environment variables

- BOOTSTRAP_SERVERS=broker:29092
- TOPIC_NAME=demo-topic
- TOPIC_PARTITIONS=10
- TOPIC_REPLICATION_FACTOR=1

API Methods

In the code we have created multipe API methods

@app.on_event(“startup”)

App on startup, will make sure that we run the set of functions when we initialise our main.

In our case, we are running create topic function.

client.create_topics(new_topics=[topic], validate_only=False)

@app.get(“/”)

Its a test function we have to see we are able to connect to the API which returns “Hello World”

@app.post(“/producer/{msg}”)

Using the post function, we are sending/producing some messages to the given topic.

This POST function specifically calls the KAFKA Producer and pass the “msg” as the value, and timestamp will be passed as the KEY.

@app.get(“/consumer”)

The GET method will print the messages in the topic from the start.

We have defined the KAFKA Consumer function underneath which will connect to the given topic and read the messages.

Execution

Let’s bring up the containers

docker compose up --build

Hit the URL

http://127.0.0.1:8002/docs

Here we can see that there are 3 methods implemented, which we have discussed above.

Let’s try to try our POST, which will create a message into the given topic.

Let’s confirm the same in the Kafka UI

Now we can try out the consumer.

We can see the same data which we have producer earlier.

We can run the same above commands using command line and Web as well.

When our containers are running let’s start with the sample METHOD.

http://127.0.0.1:8002/

This should return:

{"message":"Hello World"}

Now let’s produce some data.

curl -X POST -H "Content-Type: application/json"  http://localhost:8002/producer/kafka-demo-test

It should return

{"status":"success"}

Confirm in the kafka UI

All looks good.

Now we will run the Consumer method, which should consume the message what we just produced.

Hit the URL

http://127.0.0.1:8002/consumer

You should see a response

[{"key":"1693141293","value":"kafka-demo-test"}]

Its the record you have produced.

Conclusion

So we have learnt what is FAST API and its features.

We have created a small project which will help us connect and integrate with KAFKA.

The above code example is stored under:

I would recommend to read the reference pages for more details.

Reference

--

--

Ajith Shetty
Ajith Shetty

Written by Ajith Shetty

Bigdata Engineer — Love for BigData, Analytics, Cloud and Infrastructure. Want to talk more? Ping me in Linked In: https://www.linkedin.com/in/ajshetty28/