An asynchronous Consumer and Producer API for Kafka with FastAPI in Python

June 27, 2022 • 5 min read • by Vinicyus Brasil

Create a simple asynchronous API that works at the same time as a Kafka producer and consumer, using Python's FastAPI

Writing asynchronous code might be a little hard the first time you do it, especially when you're doing it while working with another complex library like Kafka. Although it's difficult, it's worth the effort when you're dealing with a high load or with multiple microservices that can take some time to answer your calls.

The purpose of this article is to create a simple asynchronous API that works at the same time as a Kafka producer and consumer. The full project is on my GitHub.

Setting up Kafka

In this tutorial we'll be using Docker to set up Kafka, following Shuyi Yang's tutorial [1]. The docker-compose.yml file used will be:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

To bring the containers up, we can navigate to the folder containing the file and run:

docker-compose -f docker-compose.yml up

Note: I'm using Windows, so commands might differ slightly on Linux distros or macOS.
We can now create a topic:

docker exec -ti kafka /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test1

And write some events to it:

## Write events
docker exec -ti kafka /opt/kafka/bin/kafka-console-producer.sh --topic test1 --bootstrap-server localhost:9092

## Read events
docker exec -ti kafka /opt/kafka/bin/kafka-console-consumer.sh --topic test1 --from-beginning --bootstrap-server localhost:9092

Creating the API

We'll be using the aiokafka library to deal with Kafka and FastAPI to create the API, as in the awesome tutorial by iwpnd [2]. To keep things simple, the project will have a single Python file called main.py.
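
Both libraries (plus uvicorn, which we'll use later to run the server) can be installed with pip. Assuming a standard Python environment, something like:

pip install aiokafka fastapi uvicorn

Importing the necessary libraries: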

import asyncio
import json

from pydantic import BaseModel, StrictStr
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from fastapi import FastAPI

Now, we need to create the FastAPI app, the event loop and the Kafka address. Note that we point to localhost:29092, which is the PLAINTEXT_HOST listener that the Kafka container advertises to the host machine in the docker-compose file.

app = FastAPI()

loop = asyncio.get_event_loop()

KAFKA_INSTANCE = "localhost:29092"

The loop object will be referenced when creating the Producer and Consumer instances:

aioproducer = AIOKafkaProducer(loop=loop, bootstrap_servers=KAFKA_INSTANCE)

consumer = AIOKafkaConsumer("test1", bootstrap_servers=KAFKA_INSTANCE, loop=loop)

For the consumer part, we can create a consume function whose purpose is to print each message from the topic (and its properties) in the server terminal every time a new one arrives.

async def consume():
    await consumer.start()
    try:
        async for msg in consumer:
            print(
                "consumed: ",
                msg.topic,
                msg.partition,
                msg.offset,
                msg.key,
                msg.value,
                msg.timestamp,
            )

    finally:
        await consumer.stop()

Now here's the trick that makes the consumer run forever: the consume() coroutine must be scheduled as a task at the startup of the API. With that, we guarantee that it keeps running in the background for as long as the application is up.

@app.on_event("startup")
async def startup_event():
    await aioproducer.start()
    loop.create_task(consume())


@app.on_event("shutdown")
async def shutdown_event():
    await aioproducer.stop()
    await consumer.stop()

We can now create the POST route that will produce a message whenever someone calls it. Here we have two Pydantic models: one to standardize the input and another to standardize the response.

class ProducerResponse(BaseModel):
    name: StrictStr
    message_id: StrictStr
    topic: StrictStr
    timestamp: StrictStr = ""

class ProducerMessage(BaseModel):
    name: StrictStr
    message_id: StrictStr = ""
    timestamp: StrictStr = ""

@app.post("/producer/{topicname}")
async def kafka_produce(msg: ProducerMessage, topicname: str):

    await aioproducer.send(topicname, json.dumps(msg.dict()).encode("ascii"))
    response = ProducerResponse(
        name=msg.name, message_id=msg.message_id, topic=topicname
    )

    return response

And that’s it for the API. We can start it using uvicorn:

uvicorn main:app --reload

We can also test it using cURL:

curl -X POST -d {\"name\":\"salve\"} -H "Content-Type: application/json"  http://localhost:8000/producer/test1
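
For reference, on Linux distros or macOS the equivalent call would typically use single quotes instead of the Windows-style escaping:

curl -X POST -d '{"name":"salve"}' -H "Content-Type: application/json" http://localhost:8000/producer/test1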

It should return something like this:
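
Since message_id and timestamp were not sent, they fall back to the empty-string defaults of the models, so the JSON body should be roughly:

{"name":"salve","message_id":"","topic":"test1","timestamp":""}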

In the server terminal, the results should look like:
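
The partition, offset and timestamp will vary, but each consumed message should produce a line along these lines:

consumed:  test1 0 0 None b'{"name": "salve", "message_id": "", "timestamp": ""}' 1656345600123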

That's it. Feel free to ask any questions, to contribute to the GitHub repository, or to reach out to my personal email.

References

[1] Shuyi Yang. Apache Kafka: Docker Container and examples in Python. Available at https://towardsdatascience.com/kafka-docker-python-408baf0e1088

[2] iwpnd. Apache Kafka producer and consumer with FastAPI and aiokafka. Available at https://iwpnd.pw/articles/2020-03/apache-kafka-fastapi-geostream
