-
FastAPI로 GET, POST, DELETE 실행해보기Today I Learned 2024. 8. 22. 14:09
사전 실행
- 데이터베이스 생성 했는지 확인
- mysql.server start로 mysql 실행했는지 확인
- main.py가 있는 곳에서 uvicorn main:app --reload --host 0.0.0.0 실행했는지 확인
구현 과정
GET 함수
@app.get("/users/", response_model=list[User]) async def get_all_users(): connection = get_db_connection() try: with connection.cursor() as cursor: # 모든 사용자 조회 sql = "SELECT * FROM users" cursor.execute(sql) users = cursor.fetchall() # datetime 객체를 문자열로 변환 for user in users: user['created_at'] = user['created_at'].strftime('%Y-%m-%dT%H:%M:%S') if user['created_at'] else None return [User(**user) for user in users] finally: connection.close() @app.get("/users/{user_email}") async def read_user(user_email: str): connection = get_db_connection() try: with connection.cursor() as cursor: sql = "SELECT * FROM users WHERE email = %s" cursor.execute(sql, (user_email,)) user = cursor.fetchone() if user: return user else: raise HTTPException(status_code=404, detail="User not found") finally: connection.close()
- /users/ : 모든 회원 정보 다 불러오기
- /users/{user_email}: 회원 정보 찾기
실행 결과
POST 함수
@app.post("/users/", response_model=User) async def create_user(user: UserCreate): connection = get_db_connection() try: with connection.cursor() as cursor: # 동일한 이메일이 존재하는지 확인 sql = "SELECT * FROM users WHERE email = %s" cursor.execute(sql, (user.email,)) existing_user = cursor.fetchone() if existing_user: raise HTTPException(status_code=400, detail="Email already registered.") # 중복된 이메일이 없으면 새 사용자 생성 sql = "INSERT INTO users (name, email) VALUES (%s, %s)" cursor.execute(sql, (user.name, user.email)) connection.commit() # 새로 생성된 사용자를 반환하기 위해 insert_id를 사용하여 조회 user_id = cursor.lastrowid sql = "SELECT * FROM users WHERE id = %s" cursor.execute(sql, (user_id,)) new_user = cursor.fetchone() if new_user: new_user['created_at'] = new_user['created_at'].strftime('%Y-%m-%dT%H:%M:%S') if new_user['created_at'] else None return User(**new_user) else: raise HTTPException(status_code=404, detail="User not found") finally: connection.close()
id를 기준으로 방금 Insert 한 값을 찾는 이유는,
- Primary key가 Id로 되어있기 때문에
- email도 unique 값이긴 하지만, 사용자의 이메일은 변경될 수 있기 때문에 primary key로 id로 지정해놓았다.
primary key는 고유값으로 한번 생성됐을 당시에 변하지 않기 때문에, 이 id 값을 기준으로 데이터가 잘 들어왔는지 찾는 기능을 준다.
- email도 unique 값이긴 하지만, 사용자의 이메일은 변경될 수 있기 때문에 primary key로 id로 지정해놓았다.
- cursor.lastrowid 와 같은 마지막 넣은 데이터를 확인가능한 함수가 있다.
- 내 테이블의 경우 id를 Auto_increment로 해놓았기에 가능한 함수이다.
💡 가장 중요한건, insert, delete 한 값에 대해 데이터베이스에 반영하기 위해선 connection.commit() 을 해야 확인이 가능하다.
실행 결과
DELETE 함수
@app.delete("/users/{user_email}") async def delete_user(user_email: str): connection = get_db_connection() try: with connection.cursor() as cursor: sql = "DELETE FROM users WHERE email = %s" cursor.execute(sql, (user_email,)) connection.commit() # 커밋 중요!! if cursor.rowcount > 0: return f"Successfully deleted {user_email}" else: raise HTTPException(status_code=200, detail="User not found") finally: connection.close()
실행 결과
'Today I Learned' 카테고리의 다른 글
[글또10기] 글또의 6개월 활동 회고 (1) 2025.03.30 FastAPI에 pymysql(DB) 연결하기 (0) 2024.08.21 FastAPI 구현해보기 (0) 2024.08.21 [네이버 부스트캠프 9기 챌린지] 최종 회고 (2) 2024.08.12 [네이버 부스트캠프 챌린지] 3주차 회고 (0) 2024.08.12