Home - hemanth22/fastapi-kafka-receiver GitHub Wiki

Welcome to the fastapi-kafka-receiver wiki!

Test code

from fastapi import FastAPI, HTTPException
from kafka import KafkaConsumer
from pydantic import BaseModel
import psycopg2
from psycopg2 import sql
import os
import json
import requests
import six
import sys
if sys.version_info >= (3, 12, 0):
    sys.modules['kafka.vendor.six.moves'] = six.moves
from typing import List

# Define FastAPI app
app = FastAPI()

# Define the request model
class Reminder(BaseModel):
    """A single reminder: the date it applies to and its text.

    Both fields are plain strings; no format validation happens in this
    model itself.
    """
    message_date: str  # Format: DD-MM-YYYY
    message: str

# --- Runtime configuration, read once from the process environment ---
# Every value is None when its variable is unset; nothing here validates
# presence or converts types.

# PostgreSQL connection parameters.
DB_HOST = os.getenv('postgres_hostname')
DB_NAME = os.getenv('postgres_database')
DB_PORT = os.getenv('postgres_port')
DB_USER = os.getenv('postgres_username')
DB_PASSWORD = os.getenv('postgres_password')

# Webhook URL; only referenced by commented-out code in this file.
FASTAPI_WEBHOOK_SERVERLESS = os.getenv('FASTAPI_WEBHOOK_SERVER')

# Kafka broker address and SASL credentials handed to KafkaConsumer.
bootstrap_servers_sv1 = os.getenv('BOOTSTRAP_SERVER_NAME')
sasl_mechanism_sv1 = os.getenv('SASL_MECH')
security_protocol_sv1 = os.getenv('SSL_SEC')
sasl_plain_username_sv1 = os.getenv('SASL_USERNAME')
sasl_plain_password_sv1 = os.getenv('SASL_PASSD')

def serialize_data(data):
    """
    Encode a Python object as JSON text.

    Args:
        data (dict): The object to encode; anything ``json.dumps`` accepts.

    Returns:
        str: The JSON representation of ``data``
    """
    encoded = json.dumps(data)
    return encoded

#def send_dataToFastAPI(payload):
#    headers = {
#        'Content-Type': 'application/json',
#        }
#    response = requests.request("POST", url=FASTAPI_WEBHOOK_SERVERLESS, headers=headers, data=payload)
#    print(response.text)
#    if response.status_code == 200:
#        print("Data sent successfully")

#    if response.status_code != 200:
#        print(f"Failed to send data. Status code: {response.status_code}")

# Kafka topic the consumer subscribes to.
topic_name = 'insert_to_database'

# All consumer settings gathered in one mapping and unpacked below.
_consumer_settings = {
    # Broker address and SASL authentication, taken from the environment.
    'bootstrap_servers': bootstrap_servers_sv1,
    'sasl_mechanism': sasl_mechanism_sv1,
    'security_protocol': security_protocol_sv1,
    'sasl_plain_username': sasl_plain_username_sv1,
    'sasl_plain_password': sasl_plain_password_sv1,
    # NOTE(review): this is the literal text '$GROUP_NAME'; nothing in this
    # file substitutes it. Confirm whether a real group name was intended.
    'group_id': '$GROUP_NAME',
    'auto_offset_reset': 'earliest',
    # Presumably what ends a ``for message in consumer`` loop after 60 s
    # without a message -- confirm against the kafka-python documentation.
    'consumer_timeout_ms': 60000,
    # Message payloads are UTF-8 encoded JSON; decode them into Python objects.
    'value_deserializer': lambda raw: json.loads(raw.decode('utf-8')),
}

# The consumer is created at import time and shared by the whole module.
consumer = KafkaConsumer(topic_name, **_consumer_settings)

def add_reminder(reminder: Reminder):
    """Insert one reminder as a row of the ``remainder_messages`` table.

    A new PostgreSQL connection is opened for the call and closed before
    returning, whether or not the insert succeeds.

    Args:
        reminder: Object exposing ``message_date`` (text in ``DD-MM-YYYY``
            form, converted with ``TO_DATE`` by the database) and
            ``message``.

    Returns:
        dict: ``{"status": "success", "detail": "Reminder added successfully"}``
        once the transaction has been committed.

    Raises:
        HTTPException: With status 500 when connecting, executing the insert
            or committing fails; the original error is chained as the cause.
    """
    # Bind both handles before the ``try`` so the ``finally`` clause can test
    # them even when ``psycopg2.connect`` itself fails. Without this, cleanup
    # raised UnboundLocalError and masked the HTTPException raised below.
    connection = None
    cursor = None
    try:
        # Connect to the PostgreSQL database
        connection = psycopg2.connect(
            host=DB_HOST,
            database=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD,
            port=DB_PORT
        )
        cursor = connection.cursor()

        # Insert the reminder; values are bound as parameters, not
        # interpolated into the SQL text.
        insert_query = """
            INSERT INTO remainder_messages (message_date, message)
            VALUES (TO_DATE(%s, 'DD-MM-YYYY'), %s)
        """
        cursor.execute(insert_query, (reminder.message_date, reminder.message))

        # Commit the transaction
        connection.commit()
        return {"status": "success", "detail": "Reminder added successfully"}

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") from e

    finally:
        # Close whatever was actually opened.
        if cursor is not None:
            cursor.close()
        if connection is not None:
            connection.close()


@app.get("/messages", response_model=List[dict])
async def get_messages():
    """Drain the Kafka consumer, store each message, and return them all.

    Every message value yielded by ``consumer`` is logged, converted into a
    ``Reminder`` and written to the database through ``add_reminder``.

    Returns:
        list: The decoded message values, in the order they were consumed.

    Raises:
        HTTPException: Propagated from ``add_reminder`` when a database
            insert fails.

    Note:
        Assumes each message value is a JSON object carrying the
        ``message_date`` and ``message`` keys -- confirm against the
        producer. The loop is a plain synchronous ``for`` inside an
        ``async def``, so it runs on the event loop until iteration ends.
    """
    messages = []
    for message in consumer:
        json_data = message.value
        # Serialize once and reuse the result for logging.
        json_data_serialized = serialize_data(json_data)
        print("Serialized json message: ", json_data_serialized)
        print(f"Received raw json message: {json_data}")
        # add_reminder reads the .message_date and .message attributes, so it
        # needs a Reminder built from the decoded payload. Passing the JSON
        # string (as before) made every insert fail with a 500 error.
        add_reminder(Reminder(**json_data))
        messages.append(json_data)
    return messages

@app.on_event("shutdown")
def shutdown_event():
    """Close the module-level Kafka consumer when the application shuts down."""
    # NOTE(review): newer FastAPI releases appear to favor lifespan handlers
    # over on_event -- confirm before upgrading.
    consumer.close()