ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • FastAPI로 GET, POST, DELETE 실행해보기
    Today I Learned 2024. 8. 22. 14:09

    사전 실행

    1. 데이터베이스 생성 했는지 확인
    2. mysql.server start로 mysql 실행했는지 확인
    3. main.py가 있는 곳에서 uvicorn main:app --reload --host 0.0.0.0 실행했는지 확인

    구현 과정

    GET 함수

    @app.get("/users/", response_model=list[User])
    async def get_all_users():
        connection = get_db_connection()
        try:
            with connection.cursor() as cursor:
                # 모든 사용자 조회
                sql = "SELECT * FROM users"
                cursor.execute(sql)
                users = cursor.fetchall()
                # datetime 객체를 문자열로 변환
                for user in users:
                    user['created_at'] = user['created_at'].strftime('%Y-%m-%dT%H:%M:%S') if user['created_at'] else None
    
                return [User(**user) for user in users]
    
        finally:
            connection.close()
    
    @app.get("/users/{user_email}")
    async def read_user(user_email: str):
        connection = get_db_connection()
        try:
            with connection.cursor() as cursor:
                sql = "SELECT * FROM users WHERE email = %s"
                cursor.execute(sql, (user_email,))
                user = cursor.fetchone()
                if user:
                    return user
                else:
                    raise HTTPException(status_code=404, detail="User not found")
        finally:
            connection.close()
    • /users/ : 모든 회원 정보 다 불러오기
    • /users/{user_email}: 회원 정보 찾기

    실행 결과

    POST 함수

    @app.post("/users/", response_model=User)
    async def create_user(user: UserCreate):
        connection = get_db_connection()
        try:
            with connection.cursor() as cursor:
                # 동일한 이메일이 존재하는지 확인
                sql = "SELECT * FROM users WHERE email = %s"
                cursor.execute(sql, (user.email,))
                existing_user = cursor.fetchone()
    
                if existing_user:
                    raise HTTPException(status_code=400, detail="Email already registered.")
    
                # 중복된 이메일이 없으면 새 사용자 생성
                sql = "INSERT INTO users (name, email) VALUES (%s, %s)"
                cursor.execute(sql, (user.name, user.email))
                connection.commit()
    
                # 새로 생성된 사용자를 반환하기 위해 insert_id를 사용하여 조회
                user_id = cursor.lastrowid 
                sql = "SELECT * FROM users WHERE id = %s"
                cursor.execute(sql, (user_id,))
                new_user = cursor.fetchone()
                if new_user:
                    new_user['created_at'] = new_user['created_at'].strftime('%Y-%m-%dT%H:%M:%S') if new_user['created_at'] else None
                    return User(**new_user)
                else:
                    raise HTTPException(status_code=404, detail="User not found")
        finally:
            connection.close()

    id를 기준으로 방금 Insert 한 값을 찾는 이유는,

    • Primary key가 Id로 되어있기 때문에
      • email도 unique 값이긴 하지만, 사용자의 이메일은 변경될 수 있기 때문에 primary key로 id로 지정해놓았다.
        primary key는 고유값으로 한번 생성됐을 당시에 변하지 않기 때문에, 이 id 값을 기준으로 데이터가 잘 들어왔는지 찾는 기능을 준다.
    • cursor.lastrowid 와 같은 마지막 넣은 데이터를 확인가능한 함수가 있다.
      • 내 테이블의 경우 id를 Auto_increment로 해놓았기에 가능한 함수이다.

    💡 가장 중요한건, insert, delete 한 값에 대해 데이터베이스에 반영하기 위해선 connection.commit() 을 해야 확인이 가능하다.

    실행 결과

    DELETE 함수

    @app.delete("/users/{user_email}")
    async def delete_user(user_email: str):
        connection = get_db_connection()
        try:
            with connection.cursor() as cursor:
                sql = "DELETE FROM users WHERE email = %s"
                cursor.execute(sql, (user_email,))
                connection.commit() # 커밋 중요!!
    
                if cursor.rowcount > 0:
                    return f"Successfully deleted {user_email}"
                else:
                    raise HTTPException(status_code=200, detail="User not found")
        finally:
            connection.close()

    실행 결과

Designed by Tistory.