[FastAPI] FastAPI 로 Web Single Pattern 구현 예제


지금까지 FastAPI 를 사용하기 위한 개념들을 정리했다. 이제 FastAPI 로 Web Single Pattern 을 구현해보자.

FastAPI 구현

  • Iris Dataset 을 대상으로 분류 모델을 만들고, 예측 결과를 반환하는 웹 애플리케이션을 만들어보자.
  • Config $\rightarrow$ Database $\rightarrow$ Application(Service) $\rightarrow$ API 의 순서처럼 사용자 입장의 역순으로 개발하는 것이 좋다.
  • 아래와 같이 FastAPI 를 활용한 ML 서버를 간단하게 구현해보자.
    • POST /predict 로 접근하면 예측을 진행한 후 Response 를 데이터베이스에 저장하고 PredictResponse 를 반환한다.
    • GET /predict 로 접근하면 데이터베이스에 저장된 모든 PredictResponse 를 반환한다.
    • GET /predict/{id} 로 접근하면 id 로 필터링하여 해당 id 에 맞는 PredictResponse 를 반환한다.
    • FastAPI 서버가 띄워질 때 Model 을 Load 한다.
  • API 의 경우 구현하면서 각 계층에 맞는 역할을 하도록 매순간 고민하면서 개발한다.
  • 필요 라이브러리의 버전 관리는 Poetry 를 이용한다. DB 는 SQLite3 과 ORM 을 위해 SQLAlchemy 를 이용한다.

프로젝트 구조

  • 아래와 같이 프로젝트 구조를 잡을 수 있다. 이 포스트에서는 간단한 구현 예제를 다루기 때문에 Layered Architecture 구성, DTO, Error Handling, Middleware, Test 과정 등을 생략한다.

    .
    ├── Dockerfile                 # 애플리케이션의 Docker Container Image 를 빌드하기 위한 설정 파일
    ├── README.md                  # 프로젝트 개요, 설치 방법, 실행 방법 등을 설명한 문서
    ├── app                        # 실제 애플리케이션 코드가 포함된 디렉토리
    │   ├── core                       # 핵심 인프라 설정 및 전역적인 유틸리티 관리 계층 (Config, DB 등)
    │   │   ├── config.py                  # 환경 변수, 설정값 정의 (ex. DB URL)
    │   │   ├── dependencies.py            # 의존성 주입 설정
    │   │   ├── db                         # 데이터베이스 관련 설정
    │   │   │   └── session.py                 # SQLAlchemy 세션 팩토리 및 연결 설정
    │   │   ├── lifespan.py                # FastAPI lifespan 이벤트 처리 (앱 시작/종료 시 실행할 로직)
    │   │   └── logger.py                  # 애플리케이션 로깅 설정 및 초기화
    │   ├── main.py                    # FastAPI 애플리케이션의 엔트리포인트 (app 생성, 라우터 등록)
    │   └── models                     # 모델 계층: DB 모델, 스키마, DTO 등 데이터 정의
    │       ├── db                         # ORM 모델 정의. DB 테이블과 매핑되는 클래스들 (SQLAlchemy 모델)
    │       │   └── database.py
    │       ├── ml_model                   # ML 모델 정의
    │       │   └── model.py
    │       └── schemas                    # Pydantic 기반의 요청/응답 스키마 (API I/O 정의)
    │           └── schemas.py
    ├── entrypoint.sh              # Docker Container 시작 시 실행되는 셸 스크립트
    ├── poetry.lock                # Poetry 패키지 버전 잠금 파일
    └── pyproject.toml             # 프로젝트의 종속성 및 빌드 설정 파일
    

ML Model

  • 먼저 iris dataset 으로 모델을 학습하고, 예측하도록 하자.
  • 일반적으로 model.py dataset.py, data_loader.py, train.py, test.py, inference.py 등으로 세분화할 수 있지만, 여기서는 간단하게 model.py 안에 모든 기능을 모아서 구현해보자.

    from sklearn.datasets import load_iris
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split
    
    def get_dataset():
        iris = load_iris()
        X, y = iris.data, iris.target
        return X, y
    
    def get_model():
        model = RandomForestClassifier(n_estimators=100, random_state=42)
        return model
    
    def train(model, X_train, y_train):
        model.fit(X_train, y_train)
        return model
        
    def predict(model, X_test):
        return model.predict(X_test)
    
    def evaluate(model, X_test, y_test):
        return model.score(X_test, y_test)
    
    def save_model(model, model_path: str):
        import joblib
    
        joblib.dump(model, model_path)
    
    def load_model(model_path: str):
        import joblib
    
        return joblib.load(model_path)
    
    def main():
        X, y = get_dataset()
        X_train, X_test, y_train, y_test = train_test_split(X, y)
        model = get_model()
        model = train(model, X_train, y_train)
        score = evaluate(model, X_test, y_test)
        print(f"model score: {score}")
        save_model(model, "model.joblib")
    
    if __name__ == "__main__":
        main()
    

Config 및 Logger

  • 프로젝트의 설정 파일을 config.py 에 저장한다. 여기에는 DB 에 대한 설정 정보와 ML 모델의 경로 등을 작성한다.

    from pydantic import Field
    from pydantic_settings import BaseSettings
    
    class Config(BaseSettings):
        db_url: str = Field(default="sqlite:///./db.sqlite3", env="DB_URL")
        model_path: str = Field(default="model.joblib", env="MODEL_PATH")
        app_env: str = Field(default="local", env="APP_ENV")
    
    config = Config()
    
  • 이후 효율적인 디버깅을 위해 logger.py 를 만들고 Logger 를 반환하도록 한다.

    import logging
    
    def init_logger():
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.DEBUG)
        formatter = logging.Formatter(
          format="%(asctime)s:%(levelname)s:%(message)s",
          datefmt= "%m/%d %I:%M:%S %p"
        )
        logger.setFormatter(formatter)
    
        return logger
      
    logger = init_logger()
    

DB 통신 및 Schema

  • SQLite3 은 프로덕션 용도가 아닌 가벼운 상황에서 쉽게 사용할 수 있는 데이터베이스다. SQLAlchemy 는 Python ORM(Object Relational Mapping)으로, Database 를 객체로 다룰 수 있다.
    • 이는 이전 포스트에서 정리했다.
    • 아래와 같이 database.py 를 구현할 수 있다.
    import datetime
    from sqlalchemy import Column, Integer, DateTime, create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from core.config import config
    
    Base = declarative_base()
    
    class PredictionResult(Base):
        __tablename__ = "predictionresult"
    
        id = Column(Integer, primary_key=True, autoincrement=True)
        result = Column(Integer, nullable=False)
        created_at = Column(DateTime, default=datetime.datetime.now)
    
  • 이제 Database 객체에서 테이블에 데이터를 저장하고 불러올 수 있다. Session 은 데이터베이스의 연결을 관리하는 방식이다. 예를 들어, 음식점에 가서 나올 때까지를 하나의 Session 으로 표현할 수 있다.
  • 하나의 Session 안에서 가게 입장, 주문, 식사를 하는 것처럼, Session 내에서 Method(POST / GET / PATCH)에 따라 데이터를 추가, 조회, 수정할 수 있다.
  • Transaction 은 Session 내에 일어나는 모든 활동이다. Transaction 이 완료되면 결과가 데이터베이스에 저장된다. 아래와 같이 session.py 를 구현할 수 있다.

    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    from core.config import config
      
    # 엔진 생성
    engine = create_engine(config.db_url, echo=True)
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    
  • 이제 schema.py 에서 Request 와 Response 에 대한 Data Schema 를 구성해주자.

    from typing import List
    from pydantic import BaseModel
    
    class PredictionRequest(BaseModel):
        features: List[float]
    
    class PredictionResponse(BaseModel):
        id: int
        result: int
    
  • 마지막으로 FastAPI 에서 재사용 가능한 컴포넌트를 효율적으로 관리하고 코드의 유지보수를 용이하게 해주는 의존성 주입을 dependencies.py 에 구현해보자.

    from core.db.session import SessionLocal
    from sqlalchemy.orm import Session
    
    def get_db():
        db: Session = SessionLocal()
        try:
            yield db
        finally:
            db.close()
      
    model = None
    
    def load_model(model_path: str):
        import joblib
        
        global model
        model = joblib.load(model_path)
    
    def get_model():
        global model
        return model
    

API

  • 이제 위 기능들을 합하여 Router 함수를 구현할 수 있다.

    from typing import List
    from fastapi import APIRouter, HTTPException, status, Depends
    from sqlalchemy.orm import Session
    from sqlalchemy import select
    
    from models.schemas.schemas import PredictionRequest, PredictionResponse
    from core.dependencies import get_db, get_model
    from models.db.database import PredictionResult
    
    router = APIRouter()
    
    @router.post("/predict", response_model=PredictionResponse)
    def predict(
        request: PredictionRequest,
        db: Session = Depends(get_db),
        model=Depends(get_model)
    ) -> PredictionResponse:
        # 예측 수행
        prediction = int(model.predict([request.features])[0])
    
        # DB 저장
        prediction_result = PredictionResult(result=prediction)
        db.add(prediction_result)
        db.commit()
        db.refresh(prediction_result)
    
        return PredictionResponse(id=prediction_result.id, result=prediction)
    
    @router.get("/predict", response_model=List[PredictionResponse])
    def get_predictions(db: Session = Depends(get_db)) -> list[PredictionResponse]:
        prediction_results = db.scalars(select(PredictionResult)).all()
    
        return [PredictionResponse(id=pr.id, result=pr.result) for pr in prediction_results]
    
    @router.get("/predict/{id}", response_model=PredictionResponse)
    def get_prediction(id: int, db: Session = Depends(get_db)) -> PredictionResponse:
        prediction_result = db.get(PredictionResult, id)
        # prediction_result = db.query(PredictionResult).filter(PredictionResult.id == id).first() 와 동일
    
        if not prediction_result:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Prediction not found"
            )
    
        return PredictionResponse(id=prediction_result.id, result=prediction_result.result)
    

Lifespan

  • 이제 앱이 가동되기 전 실행해야 할 것들을 실행하도록 하는 lifespan.py 를 구현한다.

    from contextlib import asynccontextmanager
    from fastapi import FastAPI
    from core.logger import logger
    from models.db.database import Base
    from core.db.session import engine
    from core.config import config
    from core.dependencies import load_model
    
    @asynccontextmanager
    async def lifespan(app: FastAPI):
          
        # SQLAlchemy 테이블 생성
        logger.info("Creating database tables with SQLAlchemy")
        Base.metadata.create_all(bind=engine)
    
        # 모델 로드
        logger.info("Loading model")
        load_model(config.model_path)
    
        yield
    

App 실행

  • 마지막으로 아래와 같이 App 을 실행할 수 있다.

    from fastapi import FastAPI
    from core.lifespan import lifespan
    from api import router
    
    import uvicorn
    
    app = FastAPI(lifespan=lifespan)
    
    # 라우터 등록
    app.include_router(router)
    
    @app.get("/")
    def root():
        return "Hello World!"
    
    if __name__ == "__main__":
        uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
    

Docker

  • 아래와 같이 Docker Image 를 띄우기 위해 Dockerfile 을 작성한다. 주의할 점은 poetry export -f requirements.txt --without-hashes > requirements.txt 를 통해 requirements 를 만들어줘야 한다.

    FROM python:3.9.13-slim
    
    WORKDIR /code
    COPY ./requirements.txt /code/requirements.txt
    RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
    COPY . /code
    CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]
    

더 개선해볼 점

  • 이처럼 FastAPI 는 간결하고 빠르게 웹 서비스를 시작할 수 있는 유용한 프레임워크다.
  • 위와 같은 기본적인 싱클 패턴에서 더 나아가, 아래와 같이 개선하면서 더 복잡하고 유의미한 가치를 전달할 수 있는 서비스를 만들 수 있다.

Web Server

  • FastAPI 서버 앞단에 Node.js 나 Nginx 등을 배치해서 웹 서버를 만들 수 있다.
  • 이를 이용하면 정적 파일 서빙, SSL 설정, CORS 처리, 로드 밸런싱, 리버스 프록시 등의 기능을 사용할 수 있다.
  • HTML, JS, CSS 등의 프론트엔드 정적 파일을 CDN 으로 서빙함으로써 백엔드 서버의 부하를 줄일 수 있다.

WAS

  • 위 구현은 라우터(router)가 없다. routers/ 디렉토리가 빠져 있어, 라우팅 처리가 main.py 에 몰릴 수 있다. API 가 늘어난다면 app/routers/ 디렉토리를 만들어 기능별 라우터 분리를 할 수 있다.
  • 구현된 router 와 prediction, DB 의 session 에서 비동기 방식을 활용한다. 즉 async defAsyncSession 을 적극 활용하여 FastAPI 의 장점인 비동기 I/O 성능을 극대화할 수 있다.
    • 일반적으로 예측 모델 호출, DB 조회 등 외부 자원에 대한 접근에는 비동기가 필수적이다.
  • 간단한 앱에는 적합한 구조지만, 비즈니스 로직이 많아지면 services/ 계층을 만들어서 router $\rightarrow$ service $\rightarrow$ repository 흐름을 따르는 layered architecture 로 구성하는 것이 좋다.
    • Router 에 비즈니스 로직이 섞이게 되면 테스트와 확장성이 떨어진다. 따라서 서비스 계층 도입을 권장한다.
  • models/schemas/schemas.py 에 모든 Pydantic Schema 를 몰아두게 되면, Schema 수가 많아질 때 관리가 힘들어진다. user.py, item.py 등 기능별로 분할하는 방식을 고려해볼 수 있다.
  • DTO 를 도입하면 코드가 깔끔하고 명확해진다. 시스템 간 또는 계층 간 데이터를 전달하기 위한 객체를 두면, 실무에서 API 를 설계할 때 유지보수성과 확장성을 향상시킬 수 있다.
  • Request 와 Response 측면에서 중첩되는 Pydantic 은 성능이 좋지 않을 수 있다. Response Model 에만 Pydantic Class 로 정의해주고 실제 반환에서는 ORJSONResponse Class 를 사용해서 직렬화를 통해 반환하는 것이 성능 상 유리하다.

DB Server

  • Redis 와 같은 캐시 서버를 활용할 수 있다. 자주 조회되는 예측 결과나 유저 데이터를 Redis 에 캐싱하면 응답 속도를 크게 개선할 수 있다.
  • 또한 DB 에 대한 연결을 매번 새로 만드는 대신, Connection Pool 을 이용한다. SQLAlchemy 의 async 기능을 이용하여 엔진을 만들고, 여기에 Connection Pool 관련 인자들을 건네줄 수 있다.
  • DB Session 을 만들 때 비동기 task 단위로 Session Scope 를 제한하여, 요청마다 Session 이 동일하게 유지할 수 있도록 한다.

ML Model

  • 예측 모델이 자주 바뀌거나 AB Test 가 필요한 경우, MLFlow 를 활용하면 모델을 체계적으로 관리할 수 있다. 또한 롤백이나 추적이 쉬워진다.
  • 추가적으로 모델 추론에 Latency 가 있거나 모델 크기가 커지는 경우, FastAPI 내에서 직접 모델을 불러오는 대신에 TorchServe, TensorFlow Serving 또는 NVIDIA Triton 등의 서빙 솔류션을 사용하여 모델 추론 서버를 분리할 수 있다.
  • 또한 추론 서버 앞에 로드 밸런싱 혹은 메시지 시스템을 두어 서버의 부하를 줄이고 결과 반환 속도를 높여 사용자 경험을 크게 향상시킬 수 있다.

Test 및 Monitoring

  • pytesthttpx.AsyncClient 를 활용하여 API 의 정상 동작 여부를 테스트할 수 있다.
  • Prometheus 와 Grafana 를 활용하여 API 의 Latency, 에러율, 트래픽 등을 시각화하고 알람을 설정하여 장애 대응 속도를 높일 수 있다.
  • 구조화된 로그를 ELK 또는 Loki 등에 연동하여 실시간 로그 조회 및 분석을 해볼 수 있다.

CI/CD/CT

  • Github Actions 를 통해 자동화 파이프라인을 구성한다. 테스트, 빌드, Docker Image 배포를 자동화하여 빠른 배포 사이클을 구축할 수 있다.
  • 또한 Airflow 를 활용하여 모델 재학습을 스케줄링한다. 일정 주기로 모델을 재학습하고 자동 배포까지 연결하려면 Airflow DAG 를 구성하는 것이 좋다.
    • Airflow 를 사용하면 데이터를 배치 수집하거나, 사용자로부터 수집한 데이터를 처리할 수 있다.
  • 추가적으로 지속적인 학습과 예측을 위해 Feast 와 같은 Feature Store 를 도입하여 예측에 사용된 feature 와 학습 feature 간의 일관성을 확보할 수 있다.
맨 위로 이동 ↑

댓글 남기기