Streaming Swiftly: Exploring FastAPI and Kafka Integration
Empowering Data Engineers: Harnessing the Power of Kafka and FastAPI for Seamless Data Processing.
Just as full-stack engineers are expected to be well versed in both front-end and back-end development, data engineers are increasingly expected to understand the pipeline end to end, which includes fetching data from different sources through APIs.
In this blog, we will concentrate specifically on building APIs.
There are many ways to integrate your application with an external API.
This blog specifically targets FastAPI and how we can use it to integrate with Kafka.
Before that, let's look at what FastAPI is and what it offers.
What is FastAPI
FastAPI is a modern, fast (high-performance), web framework for building APIs with Python 3.7+ based on standard Python type hints.
Source: https://fastapi.tiangolo.com/
FastAPI supports concurrency and coroutines out of the box: path operation functions can simply be declared with async def.
The main advantages of FastAPI are that it is easy to build with and that it automatically generates interactive API documentation (OpenAPI).
Since it is built on top of Python, you are not expected to learn any new syntax beyond standard Python.
ASGI
FastAPI uses ASGI, the Asynchronous Server Gateway Interface.
ASGI defines the interface between your server and your web application.
FastAPI is typically served with Uvicorn, an ASGI server.
FastAPI itself is built on top of the lightweight Starlette ASGI framework.
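The ASGI contract itself is very small: an application is just an async callable that receives a connection scope plus `receive` and `send` channels. A bare-bones illustration, with no framework at all:

```python
# a raw ASGI application: this is the interface Uvicorn speaks,
# and what Starlette/FastAPI implement for you under the hood
async def asgi_app(scope, receive, send):
    assert scope["type"] == "http"
    # first message: status line and headers
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"text/plain")],
    })
    # second message: the response body
    await send({"type": "http.response.body", "body": b"Hello World"})
```

FastAPI saves you from writing this plumbing by hand while keeping the same asynchronous model underneath.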
DEMO
Clone the project.
git clone https://github.com/ajithshetty/kafka-fast-api-demo
As part of this project, we will launch all the resources as Docker containers:
- broker: Kafka Broker
- zookeeper: Coordination service required by the Kafka broker
- kafka-ui: For viewing the data and topic details
- api-demo: An Application which will help us to produce and consume Kafka Records using Fast API
Setting up the dependencies
[tool.poetry]
name = "kafka-fast-api-demo"
version = "1.0.0"
description = "Kafka Fast Api DEMO"
authors = ["Ajith Shetty"]
[tool.poetry.dependencies]
python = "^3.10"
fastapi = "^0.92.0"
uvicorn = "^0.20.0"
pydantic = "^1.10.5"
python-dotenv = "^1.0.0"
aiokafka = "^0.8.0"
[tool.poetry.dev-dependencies]
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
If you look at docker-compose.yaml, we pass the following environment variables:
- BOOTSTRAP_SERVERS=broker:29092
- TOPIC_NAME=demo-topic
- TOPIC_PARTITIONS=10
- TOPIC_REPLICATION_FACTOR=1
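Inside the application these variables can be read straight from the environment (the repo also lists python-dotenv for loading a local `.env` file). A small sketch, with defaults mirroring the compose values above:

```python
import os

# defaults mirror the docker-compose environment shown above
BOOTSTRAP_SERVERS = os.environ.get("BOOTSTRAP_SERVERS", "broker:29092")
TOPIC_NAME = os.environ.get("TOPIC_NAME", "demo-topic")
TOPIC_PARTITIONS = int(os.environ.get("TOPIC_PARTITIONS", "10"))
TOPIC_REPLICATION_FACTOR = int(os.environ.get("TOPIC_REPLICATION_FACTOR", "1"))
```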
API Methods
In the code we have created multiple API methods.
@app.on_event("startup")
The startup handler ensures that a given set of functions runs when the application initialises.
In our case, we run the create-topic function:
client.create_topics(new_topics=[topic], validate_only=False)
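End to end, the startup hook might look like the sketch below. This is an illustration assuming aiokafka's admin client (aiokafka is in the project's dependencies), not necessarily the repo's exact code; the import is kept inside the function since it needs a reachable broker to do anything:

```python
import os

async def create_demo_topic() -> None:
    # run at application startup; requires a reachable Kafka broker
    # (sketch assuming aiokafka's admin client)
    from aiokafka.admin import AIOKafkaAdminClient, NewTopic

    topic = NewTopic(
        name=os.environ.get("TOPIC_NAME", "demo-topic"),
        num_partitions=int(os.environ.get("TOPIC_PARTITIONS", "10")),
        replication_factor=int(os.environ.get("TOPIC_REPLICATION_FACTOR", "1")),
    )
    admin = AIOKafkaAdminClient(
        bootstrap_servers=os.environ.get("BOOTSTRAP_SERVERS", "broker:29092"))
    await admin.start()
    try:
        await admin.create_topics(new_topics=[topic], validate_only=False)
    finally:
        await admin.close()
```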
@app.get("/")
This is a test endpoint to verify that we can reach the API; it returns "Hello World".
@app.post("/producer/{msg}")
Using this POST endpoint, we send/produce messages to the given topic.
The endpoint calls the Kafka producer, passing "msg" as the record value and the current timestamp as the record key.
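A sketch of that producer path, assuming aiokafka's AIOKafkaProducer (the names here are illustrative, not the repo's exact code); the timestamp-as-key logic is pulled into a small helper:

```python
import os
import time

def message_key() -> bytes:
    # the current epoch timestamp, encoded as the record key
    return str(int(time.time())).encode("utf-8")

async def produce(msg: str) -> None:
    # requires a running broker; import kept local (sketch assuming aiokafka)
    from aiokafka import AIOKafkaProducer

    producer = AIOKafkaProducer(
        bootstrap_servers=os.environ.get("BOOTSTRAP_SERVERS", "broker:29092"))
    await producer.start()
    try:
        # key = timestamp, value = the "msg" path parameter
        await producer.send_and_wait(
            os.environ.get("TOPIC_NAME", "demo-topic"),
            key=message_key(),
            value=msg.encode("utf-8"),
        )
    finally:
        await producer.stop()
```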
@app.get("/consumer")
The GET method reads and returns the messages in the topic from the beginning.
The Kafka consumer function underneath connects to the given topic and reads the messages.
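The consumer side can be sketched along the same lines, again assuming aiokafka (illustrative names, not the repo's exact code). auto_offset_reset="earliest" is what makes it read the topic from the start:

```python
import os

def as_json_record(record) -> dict:
    # shape each Kafka record like the /consumer response in the demo
    return {"key": record.key.decode(), "value": record.value.decode()}

async def consume_all() -> list:
    # requires a running broker; import kept local (sketch assuming aiokafka)
    from aiokafka import AIOKafkaConsumer

    consumer = AIOKafkaConsumer(
        os.environ.get("TOPIC_NAME", "demo-topic"),
        bootstrap_servers=os.environ.get("BOOTSTRAP_SERVERS", "broker:29092"),
        auto_offset_reset="earliest",  # read from the start of the topic
    )
    await consumer.start()
    try:
        # fetch whatever is available within the timeout, per partition
        batches = await consumer.getmany(timeout_ms=2000)
        return [as_json_record(r) for recs in batches.values() for r in recs]
    finally:
        await consumer.stop()
```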
Execution
Let’s bring up the containers
docker compose up --build
Hit the URL
http://127.0.0.1:8002/docs
Here we can see the three methods implemented, which we discussed above.
Let's try out the POST method, which will produce a message to the given topic.
Let’s confirm the same in the Kafka UI
Now we can try out the consumer.
We can see the same data that we produced earlier.
We can run the same calls from the command line as well as from the web UI.
With the containers running, let's start with the sample GET method:
curl http://127.0.0.1:8002/
This should return:
{"message":"Hello World"}
Now let’s produce some data.
curl -X POST -H "Content-Type: application/json" http://localhost:8002/producer/kafka-demo-test
It should return:
{"status":"success"}
Confirm in the Kafka UI.
All looks good.
Now we will run the consumer method, which should consume the message we just produced.
Hit the URL
http://127.0.0.1:8002/consumer
You should see a response:
[{"key":"1693141293","value":"kafka-demo-test"}]
It's the record you produced.
Conclusion
We have learnt what FastAPI is and what its features are.
We have created a small project that helps us connect to and integrate with Kafka.
The code for this example is available at https://github.com/ajithshetty/kafka-fast-api-demo.
I would recommend reading the reference pages for more details.