본문 바로가기

BackEnd

FastAPI에서 Websocket을 이용한 채팅 기능 만들기

반응형
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
import logging
import html
import asyncio


app = FastAPI()
templates = Jinja2Templates(directory="templates")
app.mount("/static", StaticFiles(directory="static"), name="static")

# 채팅방 정보를 저장하는 딕셔너리
chat_rooms = {}

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("[%(asctime)s] %(levelname)s - %(message)s"))
logger.addHandler(handler)


@app.get("/")
async def index(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})


@app.websocket("/ws/{room_id}")
async def chatroom(websocket: WebSocket, room_id: str):
    # 채팅방 정보가 없으면 새로 생성합니다.
    if room_id not in chat_rooms:
        chat_rooms[room_id] = {
            "connected": set(),
            "messages": [],
        }
    chat_room = chat_rooms[room_id]

    # 클라이언트와 연결되면 connected 세트에 추가합니다.
    await websocket.accept()
    # await websocket.send_json({"event": "handshake"})
    chat_room["connected"].add(websocket)

    try:
        while True:
            # 클라이언트로부터 메시지를 받으면 연결된 모든 클라이언트에게 메시지를 전송합니다.
            message = await websocket.receive_text()
            logger.info(f"Received message: {message}")  # 메시지 출력

            # 입력값을 HTML Escape 처리합니다.
            escaped_message = html.escape(message)

            chat_room["messages"].append(escaped_message)  # 메시지를 채팅방 메시지 리스트에 추가합니다.

            for conn in chat_room["connected"]:
                if conn != websocket:
                    try :
                        await conn.send_text(escaped_message)
                    except WebSocketDisconnect:
                        # 채팅방에서 WebSocket 연결이 끊어졌을 경우
                        chat_room["connected"].remove(conn)
                        # 예외가 발생했으므로 이후 코드를 실행하지 않고 다음 클라이언트로 넘어갑니다.
                        continue


    except WebSocketDisconnect:
        # 클라이언트와 연결이 종료되면 connected 세트에서 제거합니다.
        chat_room["connected"].remove(websocket)

        # # WebSocket 객체를 제거하기 전에 코루틴을 취소합니다.
        # task = asyncio.create_task(chatroom(websocket, room_id))
        # task.cancel()

        # 만약 모든 클라이언트가 나갔다면, 채팅방 정보를 삭제합니다.
        if not chat_room["connected"]:
            del chat_rooms[room_id]

각각의 채팅방에서 채팅 기능을 실행하는 목적으로 제작된 코드이며
채팅방을 리스트로 만드는 기능은 구현되어 있지 않습니다

반응형