[FastAPI] FastAPI 로 Web Single Pattern 구현 예제
지금까지 FastAPI 를 사용하기 위한 개념들을 정리했다. 이제 FastAPI 로 Web Single Pattern 을 구현해보자.
FastAPI 구현
- Iris Dataset 을 대상으로 분류 모델을 만들고, 예측 결과를 반환하는 웹 애플리케이션을 만들어보자.
- Config $\rightarrow$ Database $\rightarrow$ Application(Service) $\rightarrow$ API 의 순서처럼 사용자 입장의 역순으로 개발하는 것이 좋다.
- 아래와 같이 FastAPI 를 활용한 ML 서버를 간단하게 구현해보자.
POST /predict
로 접근하면 예측을 진행한 후 Response 를 데이터베이스에 저장하고PredictResponse
를 반환한다.GET /predict
로 접근하면 데이터베이스에 저장된 모든PredictResponse
를 반환한다.GET /predict/{id}
로 접근하면 id 로 필터링하여 해당 id 에 맞는PredictResponse
를 반환한다.- FastAPI 서버가 띄워질 때 Model 을 Load 한다.
- API 의 경우 구현하면서 각 계층에 맞는 역할을 하도록 매순간 고민하면서 개발한다.
- 필요 라이브러리의 버전 관리는 Poetry 를 이용한다. DB 는 SQLite3 과 ORM 을 위해 SQLAlchemy 를 이용한다.
프로젝트 구조
-
아래와 같이 프로젝트 구조를 잡을 수 있다. 이 포스트에서는 간단한 구현 예제를 다루기 때문에 Layered Architecture 구성, DTO, Error Handling, Middleware, Test 과정 등을 생략한다.
. ├── Dockerfile # 애플리케이션의 Docker Container Image 를 빌드하기 위한 설정 파일 ├── README.md # 프로젝트 개요, 설치 방법, 실행 방법 등을 설명한 문서 ├── app # 실제 애플리케이션 코드가 포함된 디렉토리 │ ├── core # 핵심 인프라 설정 및 전역적인 유틸리티 관리 계층 (Config, DB 등) │ │ ├── config.py # 환경 변수, 설정값 정의 (ex. DB URL) │ │ ├── dependencies.py # 의존성 주입 설정 │ │ ├── db # 데이터베이스 관련 설정 │ │ │ └── session.py # SQLAlchemy 세션 팩토리 및 연결 설정 │ │ ├── lifespan.py # FastAPI lifespan 이벤트 처리 (앱 시작/종료 시 실행할 로직) │ │ └── logger.py # 애플리케이션 로깅 설정 및 초기화 │ ├── main.py # FastAPI 애플리케이션의 엔트리포인트 (app 생성, 라우터 등록) │ └── models # 모델 계층: DB 모델, 스키마, DTO 등 데이터 정의 │ ├── db # ORM 모델 정의. DB 테이블과 매핑되는 클래스들 (SQLAlchemy 모델) │ │ └── database.py │ ├── ml_model # ML 모델 정의 │ │ └── model.py │ └── schemas # Pydantic 기반의 요청/응답 스키마 (API I/O 정의) │ └── schemas.py ├── entrypoint.sh # Docker Container 시작 시 실행되는 셸 스크립트 ├── poetry.lock # Poetry 패키지 버전 잠금 파일 └── pyproject.toml # 프로젝트의 종속성 및 빌드 설정 파일
ML Model
- 먼저 iris dataset 으로 모델을 학습하고, 예측하도록 하자.
-
일반적으로
model.py
dataset.py
,data_loader.py
,train.py
,test.py
,inference.py
등으로 세분화할 수 있지만, 여기서는 간단하게model.py
안에 모든 기능을 모아서 구현해보자.from sklearn.datasets import load_iris from sklearn.ensemble import RandomForestClassifier from sklearn.model_selection import train_test_split def get_dataset(): iris = load_iris() X, y = iris.data, iris.target return X, y def get_model(): model = RandomForestClassifier(n_estimators=100, random_state=42) return model def train(model, X_train, y_train): model.fit(X_train, y_train) return model def predict(model, X_test): return model.predict(X_test) def evaluate(model, X_test, y_test): return model.score(X_test, y_test) def save_model(model, model_path: str): import joblib joblib.dump(model, model_path) def load_model(model_path: str): import joblib return joblib.load(model_path) def main(): X, y = get_dataset() X_train, X_test, y_train, y_test = train_test_split(X, y) model = get_model() model = train(model, X_train, y_train) score = evaluate(model, X_test, y_test) print(f"model score: {score}") save_model(model, "model.joblib") if __name__ == "__main__": main()
Config 및 Logger
-
프로젝트의 설정 파일을
config.py
에 저장한다. 여기에는 DB 에 대한 설정 정보와 ML 모델의 경로 등을 작성한다.from pydantic import Field from pydantic_settings import BaseSettings class Config(BaseSettings): db_url: str = Field(default="sqlite:///./db.sqlite3", env="DB_URL") model_path: str = Field(default="model.joblib", env="MODEL_PATH") app_env: str = Field(default="local", env="APP_ENV") config = Config()
-
이후 효율적인 디버깅을 위해
logger.py
를 만들고 Logger 를 반환하도록 한다.import logging def init_logger(): logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) formatter = logging.Formatter( format="%(asctime)s:%(levelname)s:%(message)s", datefmt= "%m/%d %I:%M:%S %p" ) logger.setFormatter(formatter) return logger logger = init_logger()
DB 통신 및 Schema
- SQLite3 은 프로덕션 용도가 아닌 가벼운 상황에서 쉽게 사용할 수 있는 데이터베이스다. SQLAlchemy 는 Python ORM(Object Relational Mapping)으로, Database 를 객체로 다룰 수 있다.
- 이는 이전 포스트에서 정리했다.
- 아래와 같이
database.py
를 구현할 수 있다.
import datetime from sqlalchemy import Column, Integer, DateTime, create_engine from sqlalchemy.ext.declarative import declarative_base from core.config import config Base = declarative_base() class PredictionResult(Base): __tablename__ = "predictionresult" id = Column(Integer, primary_key=True, autoincrement=True) result = Column(Integer, nullable=False) created_at = Column(DateTime, default=datetime.datetime.now)
- 이제 Database 객체에서 테이블에 데이터를 저장하고 불러올 수 있다. Session 은 데이터베이스의 연결을 관리하는 방식이다. 예를 들어, 음식점에 가서 나올 때까지를 하나의 Session 으로 표현할 수 있다.
- 하나의 Session 안에서 가게 입장, 주문, 식사를 하는 것처럼, Session 내에서 Method(
POST / GET / PATCH
)에 따라 데이터를 추가, 조회, 수정할 수 있다. -
Transaction 은 Session 내에 일어나는 모든 활동이다. Transaction 이 완료되면 결과가 데이터베이스에 저장된다. 아래와 같이
session.py
를 구현할 수 있다.from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from core.config import config # 엔진 생성 engine = create_engine(config.db_url, echo=True) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
-
이제
schema.py
에서 Request 와 Response 에 대한 Data Schema 를 구성해주자.from typing import List from pydantic import BaseModel class PredictionRequest(BaseModel): features: List[float] class PredictionResponse(BaseModel): id: int result: int
-
마지막으로 FastAPI 에서 재사용 가능한 컴포넌트를 효율적으로 관리하고 코드의 유지보수를 용이하게 해주는 의존성 주입을
dependencies.py
에 구현해보자.from core.db.session import SessionLocal from sqlalchemy.orm import Session def get_db(): db: Session = SessionLocal() try: yield db finally: db.close() model = None def load_model(model_path: str): import joblib global model model = joblib.load(model_path) def get_model(): global model return model
API
-
이제 위 기능들을 합하여 Router 함수를 구현할 수 있다.
from typing import List from fastapi import APIRouter, HTTPException, status, Depends from sqlalchemy.orm import Session from sqlalchemy import select from models.schemas.schemas import PredictionRequest, PredictionResponse from core.dependencies import get_db, get_model from models.db.database import PredictionResult router = APIRouter() @router.post("/predict", response_model=PredictionResponse) def predict( request: PredictionRequest, db: Session = Depends(get_db), model=Depends(get_model) ) -> PredictionResponse: # 예측 수행 prediction = int(model.predict([request.features])[0]) # DB 저장 prediction_result = PredictionResult(result=prediction) db.add(prediction_result) db.commit() db.refresh(prediction_result) return PredictionResponse(id=prediction_result.id, result=prediction) @router.get("/predict", response_model=List[PredictionResponse]) def get_predictions(db: Session = Depends(get_db)) -> list[PredictionResponse]: prediction_results = db.scalars(select(PredictionResult)).all() return [PredictionResponse(id=pr.id, result=pr.result) for pr in prediction_results] @router.get("/predict/{id}", response_model=PredictionResponse) def get_prediction(id: int, db: Session = Depends(get_db)) -> PredictionResponse: prediction_result = db.get(PredictionResult, id) # prediction_result = db.query(PredictionResult).filter(PredictionResult.id == id).first() 와 동일 if not prediction_result: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Prediction not found" ) return PredictionResponse(id=prediction_result.id, result=prediction_result.result)
Lifespan
-
이제 앱이 가동되기 전 실행해야 할 것들을 실행하도록 하는
lifespan.py
를 구현한다.from contextlib import asynccontextmanager from fastapi import FastAPI from core.logger import logger from models.db.database import Base from core.db.session import engine from core.config import config from core.dependencies import load_model @asynccontextmanager async def lifespan(app: FastAPI): # SQLAlchemy 테이블 생성 logger.info("Creating database tables with SQLAlchemy") Base.metadata.create_all(bind=engine) # 모델 로드 logger.info("Loading model") load_model(config.model_path) yield
App 실행
-
마지막으로 아래와 같이 App 을 실행할 수 있다.
from fastapi import FastAPI from core.lifespan import lifespan from api import router import uvicorn app = FastAPI(lifespan=lifespan) # 라우터 등록 app.include_router(router) @app.get("/") def root(): return "Hello World!" if __name__ == "__main__": uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
Docker
-
아래와 같이 Docker Image 를 띄우기 위해
Dockerfile
을 작성한다. 주의할 점은poetry export -f requirements.txt --without-hashes > requirements.txt
를 통해 requirements 를 만들어줘야 한다.FROM python:3.9.13-slim WORKDIR /code COPY ./requirements.txt /code/requirements.txt RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt COPY . /code CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]
더 개선해볼 점
- 이처럼 FastAPI 는 간결하고 빠르게 웹 서비스를 시작할 수 있는 유용한 프레임워크다.
- 위와 같은 기본적인 싱클 패턴에서 더 나아가, 아래와 같이 개선하면서 더 복잡하고 유의미한 가치를 전달할 수 있는 서비스를 만들 수 있다.
Web Server
- FastAPI 서버 앞단에 Node.js 나 Nginx 등을 배치해서 웹 서버를 만들 수 있다.
- 이를 이용하면 정적 파일 서빙, SSL 설정, CORS 처리, 로드 밸런싱, 리버스 프록시 등의 기능을 사용할 수 있다.
- HTML, JS, CSS 등의 프론트엔드 정적 파일을 CDN 으로 서빙함으로써 백엔드 서버의 부하를 줄일 수 있다.
WAS
- 위 구현은 라우터(router)가 없다.
routers/
디렉토리가 빠져 있어, 라우팅 처리가main.py
에 몰릴 수 있다. API 가 늘어난다면app/routers/
디렉토리를 만들어 기능별 라우터 분리를 할 수 있다. - 구현된 router 와 prediction, DB 의 session 에서 비동기 방식을 활용한다. 즉
async def
와AsyncSession
을 적극 활용하여 FastAPI 의 장점인 비동기 I/O 성능을 극대화할 수 있다.- 일반적으로 예측 모델 호출, DB 조회 등 외부 자원에 대한 접근에는 비동기가 필수적이다.
- 간단한 앱에는 적합한 구조지만, 비즈니스 로직이 많아지면
services/
계층을 만들어서 router $\rightarrow$ service $\rightarrow$ repository 흐름을 따르는 layered architecture 로 구성하는 것이 좋다.- Router 에 비즈니스 로직이 섞이게 되면 테스트와 확장성이 떨어진다. 따라서 서비스 계층 도입을 권장한다.
models/schemas/schemas.py
에 모든 Pydantic Schema 를 몰아두게 되면, Schema 수가 많아질 때 관리가 힘들어진다.user.py
,item.py
등 기능별로 분할하는 방식을 고려해볼 수 있다.- DTO 를 도입하면 코드가 깔끔하고 명확해진다. 시스템 간 또는 계층 간 데이터를 전달하기 위한 객체를 두면, 실무에서 API 를 설계할 때 유지보수성과 확장성을 향상시킬 수 있다.
- Request 와 Response 측면에서 중첩되는 Pydantic 은 성능이 좋지 않을 수 있다. Response Model 에만 Pydantic Class 로 정의해주고 실제 반환에서는 ORJSONResponse Class 를 사용해서 직렬화를 통해 반환하는 것이 성능 상 유리하다.
DB Server
- Redis 와 같은 캐시 서버를 활용할 수 있다. 자주 조회되는 예측 결과나 유저 데이터를 Redis 에 캐싱하면 응답 속도를 크게 개선할 수 있다.
- 또한 DB 에 대한 연결을 매번 새로 만드는 대신, Connection Pool 을 이용한다. SQLAlchemy 의 async 기능을 이용하여 엔진을 만들고, 여기에 Connection Pool 관련 인자들을 건네줄 수 있다.
- DB Session 을 만들 때 비동기 task 단위로 Session Scope 를 제한하여, 요청마다 Session 이 동일하게 유지할 수 있도록 한다.
ML Model
- 예측 모델이 자주 바뀌거나 AB Test 가 필요한 경우, MLFlow 를 활용하면 모델을 체계적으로 관리할 수 있다. 또한 롤백이나 추적이 쉬워진다.
- 추가적으로 모델 추론에 Latency 가 있거나 모델 크기가 커지는 경우, FastAPI 내에서 직접 모델을 불러오는 대신에
TorchServe
,TensorFlow Serving
또는NVIDIA Triton
등의 서빙 솔류션을 사용하여 모델 추론 서버를 분리할 수 있다. - 또한 추론 서버 앞에 로드 밸런싱 혹은 메시지 시스템을 두어 서버의 부하를 줄이고 결과 반환 속도를 높여 사용자 경험을 크게 향상시킬 수 있다.
Test 및 Monitoring
pytest
와httpx.AsyncClient
를 활용하여 API 의 정상 동작 여부를 테스트할 수 있다.- Prometheus 와 Grafana 를 활용하여 API 의 Latency, 에러율, 트래픽 등을 시각화하고 알람을 설정하여 장애 대응 속도를 높일 수 있다.
- 구조화된 로그를
ELK
또는Loki
등에 연동하여 실시간 로그 조회 및 분석을 해볼 수 있다.
CI/CD/CT
- Github Actions 를 통해 자동화 파이프라인을 구성한다. 테스트, 빌드, Docker Image 배포를 자동화하여 빠른 배포 사이클을 구축할 수 있다.
- 또한 Airflow 를 활용하여 모델 재학습을 스케줄링한다. 일정 주기로 모델을 재학습하고 자동 배포까지 연결하려면 Airflow DAG 를 구성하는 것이 좋다.
- Airflow 를 사용하면 데이터를 배치 수집하거나, 사용자로부터 수집한 데이터를 처리할 수 있다.
- 추가적으로 지속적인 학습과 예측을 위해 Feast 와 같은 Feature Store 를 도입하여 예측에 사용된 feature 와 학습 feature 간의 일관성을 확보할 수 있다.
댓글 남기기