[FastAPI] FastAPI 로 Web Single Pattern 구현 예제
지금까지 FastAPI 를 사용하기 위한 개념들을 정리했다. 이제 FastAPI 로 Web Single Pattern 을 구현해보자.
FastAPI 구현
- Iris Dataset 을 대상으로 분류 모델을 만들고, 예측 결과를 반환하는 웹 애플리케이션을 만들어보자.
- Config $\rightarrow$ Database $\rightarrow$ Application(Service) $\rightarrow$ API 의 순서처럼 사용자 입장의 역순으로 개발하는 것이 좋다.
- 아래와 같이 FastAPI 를 활용한 ML 서버를 간단하게 구현해보자.
POST /predict로 접근하면 예측을 진행한 후 Response 를 데이터베이스에 저장하고PredictResponse를 반환한다.GET /predict로 접근하면 데이터베이스에 저장된 모든PredictResponse를 반환한다.GET /predict/{id}로 접근하면 id 로 필터링하여 해당 id 에 맞는PredictResponse를 반환한다.- FastAPI 서버가 띄워질 때 Model 을 Load 한다.
- API 의 경우 구현하면서 각 계층에 맞는 역할을 하도록 매순간 고민하면서 개발한다.
- 필요 라이브러리의 버전 관리는 Poetry 를 이용한다. DB 는 SQLite3 과 ORM 을 위해 SQLAlchemy 를 이용한다.
프로젝트 구조
-
아래와 같이 프로젝트 구조를 잡을 수 있다. 이 포스트에서는 간단한 구현 예제를 다루기 때문에 Layered Architecture 구성, DTO, Error Handling, Middleware, Test 과정 등을 생략한다.
. ├── Dockerfile # 애플리케이션의 Docker Container Image 를 빌드하기 위한 설정 파일 ├── README.md # 프로젝트 개요, 설치 방법, 실행 방법 등을 설명한 문서 ├── app # 실제 애플리케이션 코드가 포함된 디렉토리 │ ├── core # 핵심 인프라 설정 및 전역적인 유틸리티 관리 계층 (Config, DB 등) │ │ ├── config.py # 환경 변수, 설정값 정의 (ex. DB URL) │ │ ├── dependencies.py # 의존성 주입 설정 │ │ ├── db # 데이터베이스 관련 설정 │ │ │ └── session.py # SQLAlchemy 세션 팩토리 및 연결 설정 │ │ ├── lifespan.py # FastAPI lifespan 이벤트 처리 (앱 시작/종료 시 실행할 로직) │ │ └── logger.py # 애플리케이션 로깅 설정 및 초기화 │ ├── main.py # FastAPI 애플리케이션의 엔트리포인트 (app 생성, 라우터 등록) │ └── models # 모델 계층: DB 모델, 스키마, DTO 등 데이터 정의 │ ├── db # ORM 모델 정의. DB 테이블과 매핑되는 클래스들 (SQLAlchemy 모델) │ │ └── database.py │ ├── ml_model # ML 모델 정의 │ │ └── model.py │ └── schemas # Pydantic 기반의 요청/응답 스키마 (API I/O 정의) │ └── schemas.py ├── entrypoint.sh # Docker Container 시작 시 실행되는 셸 스크립트 ├── poetry.lock # Poetry 패키지 버전 잠금 파일 └── pyproject.toml # 프로젝트의 종속성 및 빌드 설정 파일
ML Model
- 먼저 iris dataset 으로 모델을 학습하고, 예측하도록 하자.
-
일반적으로
model.pydataset.py,data_loader.py,train.py,test.py,inference.py등으로 세분화할 수 있지만, 여기서는 간단하게model.py안에 모든 기능을 모아서 구현해보자.from sklearn.datasets import load_iris from sklearn.ensemble import RandomForestClassifier from sklearn.model_selection import train_test_split def get_dataset(): iris = load_iris() X, y = iris.data, iris.target return X, y def get_model(): model = RandomForestClassifier(n_estimators=100, random_state=42) return model def train(model, X_train, y_train): model.fit(X_train, y_train) return model def predict(model, X_test): return model.predict(X_test) def evaluate(model, X_test, y_test): return model.score(X_test, y_test) def save_model(model, model_path: str): import joblib joblib.dump(model, model_path) def load_model(model_path: str): import joblib return joblib.load(model_path) def main(): X, y = get_dataset() X_train, X_test, y_train, y_test = train_test_split(X, y) model = get_model() model = train(model, X_train, y_train) score = evaluate(model, X_test, y_test) print(f"model score: {score}") save_model(model, "model.joblib") if __name__ == "__main__": main()
Config 및 Logger
-
프로젝트의 설정 파일을
config.py에 저장한다. 여기에는 DB 에 대한 설정 정보와 ML 모델의 경로 등을 작성한다.from pydantic import Field from pydantic_settings import BaseSettings class Config(BaseSettings): db_url: str = Field(default="sqlite:///./db.sqlite3", env="DB_URL") model_path: str = Field(default="model.joblib", env="MODEL_PATH") app_env: str = Field(default="local", env="APP_ENV") config = Config() -
이후 효율적인 디버깅을 위해
logger.py를 만들고 Logger 를 반환하도록 한다.import logging def init_logger(): logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) formatter = logging.Formatter( format="%(asctime)s:%(levelname)s:%(message)s", datefmt= "%m/%d %I:%M:%S %p" ) logger.setFormatter(formatter) return logger logger = init_logger()
DB 통신 및 Schema
- SQLite3 은 프로덕션 용도가 아닌 가벼운 상황에서 쉽게 사용할 수 있는 데이터베이스다. SQLAlchemy 는 Python ORM(Object Relational Mapping)으로, Database 를 객체로 다룰 수 있다.
- 이는 이전 포스트에서 정리했다.
- 아래와 같이
database.py를 구현할 수 있다.
import datetime from sqlalchemy import Column, Integer, DateTime, create_engine from sqlalchemy.ext.declarative import declarative_base from core.config import config Base = declarative_base() class PredictionResult(Base): __tablename__ = "predictionresult" id = Column(Integer, primary_key=True, autoincrement=True) result = Column(Integer, nullable=False) created_at = Column(DateTime, default=datetime.datetime.now) - 이제 Database 객체에서 테이블에 데이터를 저장하고 불러올 수 있다. Session 은 데이터베이스의 연결을 관리하는 방식이다. 예를 들어, 음식점에 가서 나올 때까지를 하나의 Session 으로 표현할 수 있다.
- 하나의 Session 안에서 가게 입장, 주문, 식사를 하는 것처럼, Session 내에서 Method(
POST / GET / PATCH)에 따라 데이터를 추가, 조회, 수정할 수 있다. -
Transaction 은 Session 내에 일어나는 모든 활동이다. Transaction 이 완료되면 결과가 데이터베이스에 저장된다. 아래와 같이
session.py를 구현할 수 있다.from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from core.config import config # 엔진 생성 engine = create_engine(config.db_url, echo=True) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) -
이제
schema.py에서 Request 와 Response 에 대한 Data Schema 를 구성해주자.from typing import List from pydantic import BaseModel class PredictionRequest(BaseModel): features: List[float] class PredictionResponse(BaseModel): id: int result: int -
마지막으로 FastAPI 에서 재사용 가능한 컴포넌트를 효율적으로 관리하고 코드의 유지보수를 용이하게 해주는 의존성 주입을
dependencies.py에 구현해보자.from core.db.session import SessionLocal from sqlalchemy.orm import Session def get_db(): db: Session = SessionLocal() try: yield db finally: db.close() model = None def load_model(model_path: str): import joblib global model model = joblib.load(model_path) def get_model(): global model return model
API
-
이제 위 기능들을 합하여 Router 함수를 구현할 수 있다.
from typing import List from fastapi import APIRouter, HTTPException, status, Depends from sqlalchemy.orm import Session from sqlalchemy import select from models.schemas.schemas import PredictionRequest, PredictionResponse from core.dependencies import get_db, get_model from models.db.database import PredictionResult router = APIRouter() @router.post("/predict", response_model=PredictionResponse) def predict( request: PredictionRequest, db: Session = Depends(get_db), model=Depends(get_model) ) -> PredictionResponse: # 예측 수행 prediction = int(model.predict([request.features])[0]) # DB 저장 prediction_result = PredictionResult(result=prediction) db.add(prediction_result) db.commit() db.refresh(prediction_result) return PredictionResponse(id=prediction_result.id, result=prediction) @router.get("/predict", response_model=List[PredictionResponse]) def get_predictions(db: Session = Depends(get_db)) -> list[PredictionResponse]: prediction_results = db.scalars(select(PredictionResult)).all() return [PredictionResponse(id=pr.id, result=pr.result) for pr in prediction_results] @router.get("/predict/{id}", response_model=PredictionResponse) def get_prediction(id: int, db: Session = Depends(get_db)) -> PredictionResponse: prediction_result = db.get(PredictionResult, id) # prediction_result = db.query(PredictionResult).filter(PredictionResult.id == id).first() 와 동일 if not prediction_result: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Prediction not found" ) return PredictionResponse(id=prediction_result.id, result=prediction_result.result)
Lifespan
-
이제 앱이 가동되기 전 실행해야 할 것들을 실행하도록 하는
lifespan.py를 구현한다.from contextlib import asynccontextmanager from fastapi import FastAPI from core.logger import logger from models.db.database import Base from core.db.session import engine from core.config import config from core.dependencies import load_model @asynccontextmanager async def lifespan(app: FastAPI): # SQLAlchemy 테이블 생성 logger.info("Creating database tables with SQLAlchemy") Base.metadata.create_all(bind=engine) # 모델 로드 logger.info("Loading model") load_model(config.model_path) yield
App 실행
-
마지막으로 아래와 같이 App 을 실행할 수 있다.
from fastapi import FastAPI from core.lifespan import lifespan from api import router import uvicorn app = FastAPI(lifespan=lifespan) # 라우터 등록 app.include_router(router) @app.get("/") def root(): return "Hello World!" if __name__ == "__main__": uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
Docker
-
아래와 같이 Docker Image 를 띄우기 위해
Dockerfile을 작성한다. 주의할 점은poetry export -f requirements.txt --without-hashes > requirements.txt를 통해 requirements 를 만들어줘야 한다.FROM python:3.9.13-slim WORKDIR /code COPY ./requirements.txt /code/requirements.txt RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt COPY . /code CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]
더 개선해볼 점
- 이처럼 FastAPI 는 간결하고 빠르게 웹 서비스를 시작할 수 있는 유용한 프레임워크다.
- 위와 같은 기본적인 싱클 패턴에서 더 나아가, 아래와 같이 개선하면서 더 복잡하고 유의미한 가치를 전달할 수 있는 서비스를 만들 수 있다.
Web Server
- FastAPI 서버 앞단에 Node.js 나 Nginx 등을 배치해서 웹 서버를 만들 수 있다.
- 이를 이용하면 정적 파일 서빙, SSL 설정, CORS 처리, 로드 밸런싱, 리버스 프록시 등의 기능을 사용할 수 있다.
- HTML, JS, CSS 등의 프론트엔드 정적 파일을 CDN 으로 서빙함으로써 백엔드 서버의 부하를 줄일 수 있다.
WAS
- 위 구현은 라우터(router)가 없다.
routers/디렉토리가 빠져 있어, 라우팅 처리가main.py에 몰릴 수 있다. API 가 늘어난다면app/routers/디렉토리를 만들어 기능별 라우터 분리를 할 수 있다. - 구현된 router 와 prediction, DB 의 session 에서 비동기 방식을 활용한다. 즉
async def와AsyncSession을 적극 활용하여 FastAPI 의 장점인 비동기 I/O 성능을 극대화할 수 있다.- 일반적으로 예측 모델 호출, DB 조회 등 외부 자원에 대한 접근에는 비동기가 필수적이다.
- 간단한 앱에는 적합한 구조지만, 비즈니스 로직이 많아지면
services/계층을 만들어서 router $\rightarrow$ service $\rightarrow$ repository 흐름을 따르는 layered architecture 로 구성하는 것이 좋다.- Router 에 비즈니스 로직이 섞이게 되면 테스트와 확장성이 떨어진다. 따라서 서비스 계층 도입을 권장한다.
models/schemas/schemas.py에 모든 Pydantic Schema 를 몰아두게 되면, Schema 수가 많아질 때 관리가 힘들어진다.user.py,item.py등 기능별로 분할하는 방식을 고려해볼 수 있다.- DTO 를 도입하면 코드가 깔끔하고 명확해진다. 시스템 간 또는 계층 간 데이터를 전달하기 위한 객체를 두면, 실무에서 API 를 설계할 때 유지보수성과 확장성을 향상시킬 수 있다.
- Request 와 Response 측면에서 중첩되는 Pydantic 은 성능이 좋지 않을 수 있다. Response Model 에만 Pydantic Class 로 정의해주고 실제 반환에서는 ORJSONResponse Class 를 사용해서 직렬화를 통해 반환하는 것이 성능 상 유리하다.
DB Server
- Redis 와 같은 캐시 서버를 활용할 수 있다. 자주 조회되는 예측 결과나 유저 데이터를 Redis 에 캐싱하면 응답 속도를 크게 개선할 수 있다.
- 또한 DB 에 대한 연결을 매번 새로 만드는 대신, Connection Pool 을 이용한다. SQLAlchemy 의 async 기능을 이용하여 엔진을 만들고, 여기에 Connection Pool 관련 인자들을 건네줄 수 있다.
- DB Session 을 만들 때 비동기 task 단위로 Session Scope 를 제한하여, 요청마다 Session 이 동일하게 유지할 수 있도록 한다.
ML Model
- 예측 모델이 자주 바뀌거나 AB Test 가 필요한 경우, MLFlow 를 활용하면 모델을 체계적으로 관리할 수 있다. 또한 롤백이나 추적이 쉬워진다.
- 추가적으로 모델 추론에 Latency 가 있거나 모델 크기가 커지는 경우, FastAPI 내에서 직접 모델을 불러오는 대신에
TorchServe,TensorFlow Serving또는NVIDIA Triton등의 서빙 솔류션을 사용하여 모델 추론 서버를 분리할 수 있다. - 또한 추론 서버 앞에 로드 밸런싱 혹은 메시지 시스템을 두어 서버의 부하를 줄이고 결과 반환 속도를 높여 사용자 경험을 크게 향상시킬 수 있다.
Test 및 Monitoring
pytest와httpx.AsyncClient를 활용하여 API 의 정상 동작 여부를 테스트할 수 있다.- Prometheus 와 Grafana 를 활용하여 API 의 Latency, 에러율, 트래픽 등을 시각화하고 알람을 설정하여 장애 대응 속도를 높일 수 있다.
- 구조화된 로그를
ELK또는Loki등에 연동하여 실시간 로그 조회 및 분석을 해볼 수 있다.
CI/CD/CT
- Github Actions 를 통해 자동화 파이프라인을 구성한다. 테스트, 빌드, Docker Image 배포를 자동화하여 빠른 배포 사이클을 구축할 수 있다.
- 또한 Airflow 를 활용하여 모델 재학습을 스케줄링한다. 일정 주기로 모델을 재학습하고 자동 배포까지 연결하려면 Airflow DAG 를 구성하는 것이 좋다.
- Airflow 를 사용하면 데이터를 배치 수집하거나, 사용자로부터 수집한 데이터를 처리할 수 있다.
- 추가적으로 지속적인 학습과 예측을 위해 Feast 와 같은 Feature Store 를 도입하여 예측에 사용된 feature 와 학습 feature 간의 일관성을 확보할 수 있다.
댓글 남기기