[FastAPI, Database] DB 통신 및 캐시 구현


웹 애플리케이션에서 DB 는 필수적 요소다. 이와 관련하여 FastAPI 와 함께 SQLAlchemy 와 Redis 를 많이 사용한다. SQLAlchemy 는 애플리케이션 개발자에게 SQL 의 모든 기능과 유연성을 제공하는 Python SQL Toolkit 이다.

또한 일반적으로 DBMS 는 데이터를 디스크에 저장한다. 디스크는 용량당 가격은 낮지만, 접근 속도가 매우 느리다. SSD 와 같은 플래시 드라이브의 발전으로 이전보다 속도가 빨라졌다고는 하지만 여전히 CPU Cache, DRAM 과 같은 저장장치에 비하면 한참 못 미치는 속도를 가지고 있다. 따라서 API 요청 시 매번 DB 에서 값을 읽어온다면, Disk I/O 가 병목이 될 수 있다. 이를 해결하기 위해 In Memory 스토리지를 캐시 레이어(캐시 서버)로 두고, 한번 DB 에서 조회한 데이터를 일정 시간 동안 캐시에 저장한 뒤 캐시가 만료되지 않았을 경우 꺼내 사용할 수 있도록 한다.

이와 같은 DB 통신과 캐시에 대해 알아보자.

SQLAlchemy

  • SQLAlchemy 는 DB 연결 및 풀링 관리, SQL 표현식 생성, 테이블 스키마 및 타입을 정의할 때 사용한다. 이렇게 SQLAlchemy 를 통해 정의한 DB 연결 세션과 테이블은 객체처럼 사용할 수 있는데, 이는 ORM(Object Relational Mapper)과 관련이 깊다.
  • 또한 모든 환경에 대해 지원하는 것은 아니지만 AsyncIO 를 기반으로 한 비동기 DB 통신을 지원한다.

DB 엔진

  • SQLAlchemy 를 이용하여 DB 와 통신하기 위해서는 아래와 같이 Connection Engine 객체를 정의해야 한다.

    from sqlalchemy import create_engine
    
    engine = create_engine(url="sqlite+pysqlite:///:memory:", echo=True)
    
  • 이 때 create_engine 에는 대표적으로 아래와 같은 argument 가 있다.

    • url
      • DB 주소(DSN, Data Source Name)를 의미한다.
      • <driver>://<username>:<password>@<host>:<port>/<database> 와 같은 형식을 가진다.
    • isolation_level
      • 트랜잭션 고립 레벨을 의미한다.
      • "SERIALIZABLE""REPEATABLE READ""READ COMMITTED""READ UNCOMMITTED" , "AUTOCOMMIT" 이 있고, 사용하는 DB 에 따라 기본 값이 다르다. 일반적으로 "READ COMMITTED" 가 기본값이다.
    • echo
      • SQL 문을 log 로 남기는 방법이다.

Connection

  • engine 을 정의하고 실제로 DB 와 통신하기 위해서는 연결을 해야한다. 이는 DBMS 측에 커넥션을 위한 프로세스를 생성하고, 클라이언트와 통신을 할 준비를 한다는 이야기.
  • engine.connect() 라는 메소드를 호출하여 연결할 수 있다. 일반적으로 with 문으로 커넥션을 열고, 구문이 닫힐 때 커넥션을 닫는다.

    from sqlalchemy import text
    
    with engine.connect() as conn:
        try:
            result = conn.execute(text("select 'hello world'"))
            print(result.all())
            # conn.commit()
        except Exception as e:
            # conn.rollback()
    
  • 커넥션은 개념상 DB 의 트랜잭션과 동일하며, 하나의 커넥션 내에서 CUD(write) 작업을 한 경우 commit 을 호출하지 않으면 DB 에 반영되지 않는다. 단순히 읽는 작업의 경우에는 commit 을 호출할 필요가 없다.
  • write 연산 시 에러가 발생하는 경우, try except 문으로 rollback 을 호출해 트랜잭션 내 변경이 DB 에 반영되지 않도록 한다.

Connection Pool

  • 매번 통신을 할 때 새로운 커넥션을 생성하는 것은 DBMS 에 무리를 줄 수 있다. 대부분 DBMS 의 파라미터로 커넥션의 최대 개수가 정해져있으며, 이를 넘어가게 되면 새로운 연결을 생성할 수 없게 된다. 따라서 커넥션 개수 관리를 해줄 필요가 있다.
  • engine 을 정의할 때 몇 가지 옵션을 추가로 전달하여 어플리케이션 레벨 커넥션 풀을 정의할 수 있다.
  • 커넥션 풀을 이용하면, 이전에 생성한 커넥션을 통신 이후에 바로 정리하는 것이 아니라 풀에 저장하고 있다가 커넥션 생성 요청이 들어왔을 때 재사용할 수 있다.

    from sqlalchemy import create_engine
    
    engine = create_engine(url="sqlite+pysqlite:///:memory:",
                           pool_size=10,       # 평상시에는 최대 10개까지 커넥션 유지
                           max_overflow=5,     # 순간적으로 10개 이상의 커넥션이 요청되는 경우 최대 15개까지 생성
                           pool_pre_ping=True,
                           echo=True)
    
  • pool_size 은 커넥션 풀 크기를 의미하며, max_overflow 은 커넥션 풀이 순간적으로 늘어날 수 있기 때문에 pool_size 를 초과하고도 추가로 더 보유할 수 있는 커넥션 개수를 의미한다.
  • pool_pre_ping argument 는 커넥션 풀에서 커넥션을 꺼내기 전에 연결이 끊어지지 않았는지를 확인하는 옵션이다.
  • 이외에 pg_pool 과 같이 DBMS 레벨에서 커넥션 풀을 관리하는 기술들도 있다.

ORM

  • ORM(Object-Relational Mapping)은 객체 지향 프로그래밍 언어에서 객체와 관계형 DB 를 연결해주는 기술이다.
  • ORM 의 장점은 아래와 같다.
    • 객체 지향적인 코드 작성: ORM 은 DB 테이블을 객체로 매핑하여 프로그래밍 언어로 작업할 수 있게 한다. 이것은 유지보수성과 확장성을 향상시킨다.
    • 코드 중복 최소화: ORM 을 사용하면 SQL 쿼리를 직접 작성하는 대신 ORM 의 메소드와 DB API 를 사용하여 DB 와 통신할 수 있다. 이를 통해 코드 중복을 줄이고 코드를 더 간결하게 유지할 수 있다.
    • DBMS 독립성: ORM 은 DBMS 에 대한 의존성을 줄여준다. 코드를 변경하지 않고도 다른 DBMS 로의 전환을 용이하게 한다. 이는 유연성과 이식성을 향상시킨다.
    • 보안 강화: ORM 은 보안 문제를 예방하기 위한 다양한 기능을 제공한다. 예를 들어, SQL 인젝션 공격으로부터 보호하기 위해 ORM 은 입력을 자동으로 이스케이핑하고 매개 변수화된 쿼리를 생성한다.
    • 복잡한 관계 처리: ORM 은 복잡한 DB 관계를 다루는 데 도움이 된다. 객체 간의 관계를 유지하면서 DB 에서 JOIN 및 관련 작업을 처리하는 데 ORM 을 사용할 수 있다.
  • 그러나 반대로 SQL Text 로 작성하면 쉬운 쿼리를 ORM 문법으로 작성했을 때 더 복잡해지기도 한다. 그러나 전반적으로 ORM 으로 표현하는 것이 쉬운 편이다.
  • ORM 을 이용해 DB 테이블을 정의하기 위해서는 먼저 ORM 을 정의할 수 있는 Base 클래스를 만들고 해당 클래스를 상속 받는 방식으로 구현한다.
  • 2.x 버전 Sqlalchemy 의 ORM 클래스는 Dataclass 에서 영감을 받은 선언적 정의 방식을 따른다. (참고 링크)

    from typing import Optional
    from sqlalchemy import String, Integer
    from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Column
    
    class Base(DeclarativeBase):
        pass
    
    # 1.x style
    class User(Base):
        __tablename__ = "user_account"
        
        id: int = Column(Integer, primary_key=True)
        name: str = Column(String(30), nullable=False)
        fullname: Optional[str] = Column(String, nullable=True)  
    
    # 2.x style
    class User(Base):
        __tablename__ = "user_account"
        
        id: Mapped[int] = mapped_column(primary_key=True)
        name: Mapped[str] = mapped_column(String(30))
        fullname: Mapped[Optional[str]]
    
  • Type Hint 를 통해 해당 컬럼의 자료형을 매핑하여 지정한다. Type Hint 로 Mapped[자료형] 이 주어졌다면, mapped_column 생성자로 값을 지정해주지 않아도 된다.
  • nullable 의 경우 typing.Optional 로 명시한다.

Session

  • SQLAlchemy 에서 ORM 을 사용할 때는 Session 객체를 사용한다.
    • Engine 은 SQLAlchemy 에서 가장 기본적인 객체로, DB 와 통신하기 위한 연결 풀을 관리한다.
    • Connection 은 실제로 SQL 쿼리를 실행하는 객체다. 연결의 속성이나 닫힘 시점 등을 더 세밀하게 제어할 때 사용한다.
    • Session 은 SQLAlchemy 의 ORM 기능을 사용할 때 사용하는 객체다. 내부적으로 connection 과 트랜잭션을 사용하여 자동 생성된 SQL 문을 실행한다.
  • ORM 을 사용한다면 Session 을 사용하는 것이 좋고, 순수 SQL 쿼리라면 Connection 을 직접 사용하는 것이 더 좋다.
  • 세션에 포함된 ORM 객체는 해당 세션이 종료(commit, rollback, close)될 때까지 변경점을 세션이 트래킹하게 된다.

    from sqlalchemy import create_engine, select, insert, update, delete
    from sqlalchemy.orm import Session
    
    engine = create_engine()
    
    with Session(bind=engine) as session:
        # select
        stmt = select(User).where(User.name =='bkh')
        result = session.execute(stmt).scalar() 
        # insert
        stmt = insert(User).values(name='bkh', fullname='bkkhyunn').returning(User)
        result = session.execute(stmt).scalar() # insert new row to User Table
        session.commit()
        # update
        stmt = update(User).where(User.name=='bkh').values(User.name='BKH').returning(User)
        result = session.execute(stmt).scalar() # update name from 'bkh' to 'BKH'
        session.commit()
        # delete
        stmt = delete(User).where(User.name=='bkh').returning(User)
        result = session.execute(stmt).scalar()
        session.commit()
    
  • .scalar(), .scalars(), .all() 는 단일 값을 반환하고, .fetchone(), .fetchall() 은 리스트를 반환한다.

실제 사용 시

  • ORM 관련 코드들은 db/session.py 와 같은 파일에 위치한다.
  • 먼저 ORM 클래스들의 베이스 클래스로 사용할 Base 클래스를 정의한다. 이 때 SQLAlchemy 의 sqlalchemy.orm.DeclarativeBase 을 상속 받는다.
  • 그리고 설계대로 사용할 DB 테이블들을 만들게 된다. 이 때 각 테이블은 Base 클래스를 상속 받는다.
  • 이후 DB 통신을 위한 SQLAlchemy 엔진, 세션 객체 등을 정의할 때는 일반적으로 아래와 같다.
    • Engine: FastAPI 에서는 비동기 DB Engine 을 사용해야 하므로 create_engine 이 아닌 create_async_engine 을 사용해 생성한다.
    • Async Session Maker: 매 요청마다 세션을 생성할 sessionmaker 를 정의한다. 마찬가지로 비동기 세션을 생성해야 하므로 async_sessionmaker 를 사용한다.
    • Async Scoped Session: Session Maker 로 세션을 생성해서 사용할 수 있지만, 요청마다 세션이 동일하게 유지할 수 있도록 scope 를 정의해주는 scoped_session 을 정의해줘야 한다. 비동기 task 단위로 세션 scope 를 제한해야 하므로 async_scoped_sessionscopefunc 에 대해 current_task 를 지정해준다.
      • Scoped Session 은 동일한 스레드에 대해 하나의 DB 커넥션으로 세션을 사용할 수 있도록 해주는 기능이다.
      • 하지만 비동기의 경우 하나의 비동기 스레드를 사용하게 되고, 그렇게 되면 서로 다른 요청이 하나의 커넥션을 공유하게 된다. 이렇게 되면 각 DB 요청에서 충돌이 발생할 수 있다. (참고 링크)
  • 마지막으로 서버 실행/종료 시 DB 통신 확인 및 커넥션을 모두 정리해주는 비동기 함수를 추가한다. 이를 FastAPI 의 lifespan 기능을 통해 서버 실행 시 DB 연결 확인 및 서버 종료 시 DB 연결을 모두 정리할 수 있도록 한다.

    from asyncio import current_task
    
    from sqlalchemy import text
    from sqlalchemy.ext.asyncio import (
        AsyncSession,
        async_scoped_session,
        async_sessionmaker,
        create_async_engine,
    )
    from sqlalchemy.orm import DeclarativeBase
    import config  # 이미 정의한 config. DB_URL 등이 있음.
    
    class Base(DeclarativeBase): ...
    
    engine = create_async_engine(
        config.DB_URL, pool_size=10, max_overflow=5, echo=is_local(), pool_pre_ping=True
    )
    
    async_session_factory = async_sessionmaker(
        engine, class_=AsyncSession, expire_on_commit=False
    )
    
    AsyncScopedSession = async_scoped_session(async_session_factory, scopefunc=current_task)
    
    async def ping_db():
        async with engine.begin() as conn:
            await conn.execute(text("SELECT 1"))
    
    async def close_db():
        await engine.dispose()
    
  • 이제 세션을 사용할 때는 해당 파일의 AsyncScopedSession 객체를 사용하면 된다. 비동기이므로 async with 구문을 통해 일반적인 Session 사용법과 같이 사용하고, 주의할 것은 await session.execute(stmt) 와 같이 await 으로 받아야 한다.
  • 그리고 Lifespan Function 은 아래와 같이 작성할 수 있다.

    from contextlib import asynccontextmanager
    from fastapi import FastAPI
    
    from db.session import ping_db, close_db
    
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        await ping_db()
    
        yield
    
        await close_db()
    

In Memory DB

  • In Memory DB 는 기존 DB (오라클, MySQL 등)가 디스크(disk)에 데이터를 관리하는 반면, 메모리에 데이터를 업로드하여 사용하는 DB 를 말한다. 대용량 데이터의 증가와 맞물려 실시간 트랜잭션의 증가로 고성능 처리 요구에 대응하기 위한 목적으로 만들어졌다.
  • DB 의 기본적인 기능과 실시간 시스템 기술의 결합으로 Real Time Constraint 와 DB Operation 기능을 함께 제공한다. 단, 메모리에 저장하는 만큼 주기억장치의 휘발성으로 인해 오류 복구가 주요 해결 과제다.
  • 기존 DB 는 디스크 기반 DB 라고도 한다. 일반적으로 자주 불러오는 데이터 일부만을 메모리에 저장하여 사용하는 구조다. 이 때 메모리는 성능상의 이유로 캐시 방식을 통해 DB read 부하를 감소시키는 목적으로 주로 사용된다. 또한 대부분의 데이터는 Disk 에 보관하고 Tape 과 같은 매체에 백업하게 되어 있다.
  • 그에 반에 In Memory DB 는 데이터 갱신 자체에 메모리를 사용한다. 그리고 백업 및 로그 생성은 Disk 를 이용하는 방식이다. 이런 이유로 시스템이 종료(shotdown)되었을 때 메모리에 데이터에 대한 이중화 같은 보완 장치를 마련한다.

시스템 측면

  • 디스크 기반 DB 는 데이터 접근 측면에서 대표적인 인덱스인 B-tree 를 사용한다. 그리고 데이터 저장 위치가 디스크다. 반면 In Memory DB 는 T-tree 를 사용하고, 데이터 저장 위치도 메모리에 저장한다.
  • 두 DB 의 차이는 아래와 같다.
  • 디스크 기반 DB
    • 디스크 중심의 저장장치
    • 데이터 접근(Index)은 디시크 I/O 를 최소화하기 위한 B-tree 사용
    • 디스크에 위치
    • DB 의 크기는 디스크 크기다.
  • In Memory DB
    • Memory 중심의 저장장치
    • 데이터 접근(Index)은 메모리에 대한 최적화된 접근인 T-tree 를 사용
    • 메모리에 위치. 대신 자동 또는 수동으로 디스크를 반영한다.
    • DB 의 크기는 가상 메모리 크기다.

복구 및 안정화 측면

  • 복구 측면에서의 특징을 살펴보자. 디스크 기반 DB 는 디스크에 보관하기 때문에 안정적이고 대용량을 보관할 수 있다. 또한 undo, redo, checkpoint 등의 기술을 통해 복구가 가능하다.
  • In Memory DB 는 실시간의 빠른 데이터 처리를 목표로 하지만 메모리가 다운(down)되면 데이터가 사라진다. 따라서 주기적인 디스크 백업 등의 전략이 필요하다.
  • 두 DB 의 복구 및 안정화 측면에서의 차이는 아래와 같다.
  • 디스크 기반 DB
    • 해당 DB 의 운영 목표는 안정적으로 운영하고 대용량을 대상으로 한다.
    • 복구에는 UNDO/REDO, Checkpoint, Shadow Paging 등의 기법을 사용한다.
  • In Memory DB
    • 해당 DB 의 운영 목표는 빠른 트랜젝션 처리와 빠른 응답이다.
    • 복구에는 보조 메모리와 디스크를 이용한다. 데이터가 메모리에 존재하기 때문에 이중화 형태로 제공한다.

Redis, Memcached

  • In Memory DB 의 종류는 유형에 따라 다양하다. 여기서는 많이 사용하는 In Memory DB 인 Redis 와 Memcached 에 대해 비교해보자.

Redis

  • Redis 는 오픈소스 MDBMS(Multi Database Management System)다. MDBMS 는 플랫폼 기반 프레임워크를 제공하는 솔루션으로, 다차원 DB 를 의미한다.
  • 향상된 Key-Value 캐시와 저장 기능을 제공한다. 이처럼 key value 구조로 데이터를 관리하기 때문에 NoSQL DB 로 분류되기도 한다.
  • 또한 strings, hashes, lists, sets, sorted sets 등과 다양한 데이터 구조를 지원한다. 이는 더 유연한 데이터 모델링을 가능하게 한다.
  • RDB 방식과 AOF 방식을 사용하여 디스크에 데이터를 지속적으로 저장할 수 있는 옵션을 제공한다. 이는 데이터 손실을 방지하고, 재시작 시에도 데이터를 유지할 수 있게 한다. 즉 메모리가 날라가도 데이터를 복구할 수 있다.
    • RDB 방식은 특정한 시점의 스냅샷으로 백업하는 방식이다. 비교적 작은 사이즈의 파일로 백업하며 로딩 속도가 빠르다. 그리고 단일 파일이기 때문에 원거리 데이터 센터 또는 AWS S3 등의 클라우드로 전송하기 용이하다. Redis 의 경우 RDB 의 스냅샷 저장 조건이 설정되어 있다.
    • AOF 방식은 모든 쓰기 명령에 대한 로그를 남기는 방식이다. 쓰기 명령에 대해 추가하며 기록되기 때문에 파일 사이즈가 커지고, 서버 시작 시 로딩 속도가 느리다. AOF 가 활성화되면 Redis 실행 시 RDB 파일을 읽어오지 않고 AOF 파일을 읽어온다.
    • 공식 문서의 설명에 따르면 AOF 방식은 명령에 대한 로그를 통해 복원하는 방식이므로 실수로 데이터를 모두 날리는 flushall 명령을 실행하더라도 AOF 파일에서 그 명령어만 삭제해주면 데이터를 복원할 수 있다.
  • 마스터-슬레이브 레플리케이션(복사)을 지원하여 데이터의 가용성을 높일 수 있다. 이는 고가용성을 요구하는 환경에서 유용하다.
  • 아래의 Memcached 보다 더 많은 메모리(2배 정도)를 사용할 수 있다. 이는 실제로 필요한 메모리 양보다 더 많은 메모리를 사용하게 되기 때문이다. 따라서 대규모 캐시로 사용할 때 메모리 비용이 더 크게 들 수 있다.
  • 또한 메모리 파편화가 발생하기 쉽고, 다양한 기능과 API 를 제공하기 때문에 설정과 운영이 Memcached 보다 더 복잡할 수 있다.
  • Redis 는 다양한 데이터 타입과 복구 기능으로 서비스의 특정 기능을 위한 목적에 적합하다.

Memcached

  • 오픈소스 MDBMS 로 분산된 메모리 캐싱 시스템에 일반적으로 사용된다. RAM 에서 캐싱 데이터를 통해 동작하는 웹 기반 다이나믹 DB 에서 속도 향상을 위해 주로 사용되어 왔다.
  • Memcached 는 간단한 Key-Value 저장소로, 설치와 설정이 매우 간단하다. 따라서 초기 설정이나 사용법을 익히기 더 쉽다.
  • Redis 보다 더 적은 메모리를 사용한다. 따라서 대규모 데이터를 다룰 때 메모리 비용이 더 낮을 수 있다. 이러한 장점으로, DB 통신 또는 API 통신을 줄이기 위해 데이터 캐싱 처리에 사용하면 유리하다.
  • 트래픽이 몰려도 Redis 에 비해 응답 속도가 안정적이다.
  • 내부적으로 slab 할당자(참고링크)를 사용하고 있어서, 메모리 할당이 빈번하지 않다.
  • 그러나 Memcached 는 Redis 보다 기능이 제한적이다. 데이터 타입과 API 가 Redis 에 비해 다양하지 않으며 데이터 구조가 단순하고, 지속성을 지원하지 않는다.
  • 또한 데이터의 고가용성을 위해 내장된 메커니즘이 없다. 따라서 스스로 레플리케이션을 관리해야 하거나, 서드파티 솔루션을 사용해야 한다.
  • Memcached 는 복구에 민감하지 않은 데이터이면서 통신 속도를 향상시키는 목적에 적합하다.

Cache Server

  • 캐시 서버를 두면 좋은 점은 아래와 같다.
    • 높은 성능: In Memory DB 는 데이터를 메모리에 저장하기 때문에 디스크 기반의 DB 보다 훨씬 빠른 응답 시간을 제공한다.
    • 응답 시간 감소: 캐시로 데이터를 미리 로드해두면 데이터를 디스크에서 읽어오는 시간이나 네트워크로부터 데이터를 가져오는 데 필요한 시간이 크게 감소한다. 따라서 응답 시간이 단축되어 사용자 경험이 향상될 수 있다.
    • 확장성: In Memory 캐시는 일반적으로 확장성이 우수하다. 메모리 용량이 늘어날 때마다 더 많은 데이터를 저장하고 처리할 수 있으므로 시스템이 더 많은 트래픽을 처리할 수 있다.
    • 서버 부하 감소: In Memory 캐시를 사용하면 서버에 대한 부하를 줄일 수 있다. 캐시는 원본 DB 나 외부 서비스로부터 데이터를 가져와 캐싱하므로, 매번 데이터를 계산하거나 검색하는 작업이 줄어든다.
    • 데이터 접근 속도 향상: 메모리에 데이터를 저장하므로 데이터 접근이 빨라진다. 따라서 데이터에 대한 조회나 검색 작업이 빠르게 처리된다.
  • 이러한 캐싱 전략은 웹 서비스 환경에서 시스템 성능 향상을 기대할 수 있는 중요한 기술이지만, 데이터를 모두 캐시에 저장해버리면 메모리 용량 부족 현상이 일어나 시스템이 다운될 수 있다.
  • 따라서 어느 종류의 데이터를 캐시에 저장할지, 얼만큼의 데이터를 캐시에 저장할지, 얼마나 오래된 데이터를 캐시에서 제거하는지에 대한 지침 전략이 필요하다.
    • 이에 대해서는 해당 블로그 글에 잘 정리되어 있으니 참고하자.

Redis

  • Redis 를 활용한 캐시 서버를 띄우기 위해 Docker Container 를 실행한다.

    docker run --name redis -p 6379:6379 -d redis:7.2
    
    • -p 옵션은 Container 내부 포트와 외부 호스트 포트를 포워딩하는 포트 포워딩 옵션이며, -d 옵션은 백그라운드 프로세스처럼 Container 를 실행하는 Detach 모드 옵션이다.
  • Redis 와 통신하기 위해서는 Redis Client 객체를 정의해야 한다.
    • Redis 는 위에서 본 것처럼 숫자, 문자열, 리스트, 해시 등 다양한 데이터 타입을 지원한다. 또한 Python 객체를 pickle 로 직렬화 하여 문자열로 만든 뒤 저장하는 것도 가능하다.
    • Redis 의 작업 스레드는 단일 스레드이기 때문에, 동시에 많은 서버가 요청을 하더라도 Race Condition 과 같은 동기화 문제가 발생하지 않는다.
    from redis import Redis
    
    redis_client = Redis(host="0.0.0.0", port=6379)
    
    redis_client.set(key, value)           # key 로 value 를 write
    redis_client.get(key)                  # key 로 value 를 read
    redis_client.exists(key)               # key 가 존재하는지 확인 
    redis_client.hset(key, field1, field2) # key 로 dict 를 만들고 해당 dict 안에 field1:field2 로 write
    redis_client.hget(key, field1)         # key 로 dict 를 찾고 field1 에 대한 value 를 read
    redis_client.expire(key, 60)           # key 를 60초 후에 만료
    redis_client.incr(key)                 # key 에 저장된 값을 1 증가
    redis_client.decr(key)                 # key 에 저장된 값을 1 감소
    redis_client.incrby(key, 10)           # key 에 저장된 값을 10 증가
    redis_client.decrby(key, 10)           # key 에 저장된 값을 10 감소
    redis_client.ping()                    # redis 연결 확인
    
  • 이제 FastAPI 에서 Redis 를 사용하여 비동기 캐시 서버를 구현할 수 있다.
  • 먼저 Python 객체를 캐싱하기 위한 RedisCache 클래스를 정의한다. Python 기본 직렬화 패키지인 pickle 을 기반으로 Python 객체를 문자열로 직렬화 하고 직렬화한 문자열을 Redis 에 저장한다. 반대로 Redis 에서 읽은 데이터는 pickle 로 역직렬화 하여 원본 객체를 복구할 수 있다.
  • 마찬가지로 비동기로 동작하기 때문에 async 함수 정의와 await 을 사용해야 한다.

    import pickle
    from redis.asyncio import Redis
    from app.config import config
    
    class RedisCache:
        def __init__(self):
            self.redis = Redis(
                host=config.REDIS_HOST,
                port=config.REDIS_PORT,
            )
    
        async def ping(self) -> None:
            await self.redis.ping()
    
        async def close(self) -> None:
            await self.redis.close()
        
        async def set(self, key: str, value: object, ttl: int = None) -> None:
            await self.redis.set(key, pickle.dumps(value), ex=ttl)
        
        async def get(self, key: str) -> object:
            value = await self.redis.get(key)
            return pickle.loads(value)
    
        async def exists(self, key: str) -> bool:
            return await self.redis.exists(key)
    
    def key_builder(*args) -> str:
        """redis key 를 만들기 위한 함수"""
        return ":".join(map(str, args))
    
    redis_cache = RedisCache()
    
  • ttl 은 # 특정 시간 동안만 데이터가 저장되는 TTL(Time To Live) 기능을 의미한다.
  • 생성한 RedisCache 객체를 기반으로 Router 에 캐시 연산을 추가한다. 먼저 RedisCache 연결 확인 및 연결 종료를 위한 lifespan 코드를 수정할 수 있다.

    from contextlib import asynccontextmanager
    from fastapi import FastAPI
    
    from app.redis import redis_cache
    from app.db.session import ping_db, close_db
    
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        await ping_db()
        await redis_cache.ping()
    
        yield
    
        await close_db()
        await redis_cache.close()
    
  • 이제 GET 엔드포인트에 대해 Logger 및 RedisCache 코드를 추가한다. 아래의 예시 코드처럼 Redis 에 캐시할 Value 가 함수 및 인자에 따라 달라지므로, Key 에는 함수 이름 및 인자들을 포함시키면 좋다.

    from app.logger import logger
    from app.redis import redis_cache, key_builder
    from app.db.session import AsyncScopedSession
    
    @router.get("/")
    async def read_list():
        _key = key_builder("read_list")
    
        if await redis_cache.exists(_key):
            logger.debug("Cache hit")
            result = await redis_cache.get(_key)
        else:
            logger.debug("Cache miss")
            async with AsyncScopedSession() as session:
                stmt = select(...)
                result = (await session.execute(stmt)).scalars().all()
            if result:
                await redis_cache.set(_key, result, ttl=60)
    
        return HttpResponse(
            content=[
                DataResp(
                    dataId=data.data_id,
                    dataName=data.data_name,
                    createdAt=data.created_at,
                )
                for data in result
            ]
        )
    
  • 이제 이후 포스트에서는 지금까지 배운 FastAPI, 데이터 모델 정의와 DB 통신, 캐시 서버 통신 등을 활용하여 간단하게 구현해보자.
맨 위로 이동 ↑

댓글 남기기