An asynchronous Consumer and Producer API for Kafka with FastAPI in Python

Writing asynchronous code can be hard the first time you do it, especially if you’re also learning a complex system like Kafka. It’s worth the effort, though, when you’re dealing with a high load or with multiple microservices that can take some time to answer your calls.
The purpose of this article is to create a simple asynchronous API that works as a Kafka producer and consumer at the same time. The full project is on my GitHub.

Setting up Kafka

In this tutorial we’ll use Docker to set up Kafka, following Shuyi Yang’s tutorial [1]. The docker-compose.yml file used will be:

To create the containers, we can navigate to the folder that holds the file and write:

Note: I’m using Windows, so the commands might differ on Linux distros or macOS.
We can now create a topic:

And write some events to it:

Creating the API

We’ll use the aiokafka library to talk to Kafka and FastAPI to create the API, as in the awesome tutorial by iwpnd [2]. To keep things simple, the project has a single Python file called main.py. We start by importing the necessary libraries:

Now we need to create the FastAPI instance and the event loop object. The loop object will be referenced when creating the Producer and Consumer classes:

For the consumer part, we can create a consume function whose purpose is to print each message from the topic (and its properties) in the server terminal every time a new one arrives.

Now here’s the trick to make the consumer function run forever: the consume() function must be started at the startup of the API. With that, we guarantee that the function is running the whole time the API is up.

We can now create the POST route that will produce a message when someone calls it. Here we have two classes to standardize the input.

And that’s it for the API. We can start it using uvicorn:

We can also test it using cURL:

It should return something like this:

[Figure: Result from the API call as the client]

In the server terminal, the results should look like:

[Figure: Result from the API call in the server]

That’s it. Feel free to ask questions or to contribute, either in the GitHub repository or through my personal email.

References

[1] Shuyi Yang. Apache Kafka: Docker Container and examples in Python. Available at https://towardsdatascience.com/kafka-docker-python-408baf0e1088

[2] iwpnd. Apache Kafka producer and consumer with FastAPI and aiokafka. Available at https://iwpnd.pw/articles/2020-03/apache-kafka-fastapi-geostream