[Airflow] Airflow 를 활용한 Batch Serving (1)


Batch Serving 에 Airflow 를 활용해보자. 실무에서 실제 서비스에 적용할 기회가 분명 있을텐데, 그 전에 코드 단에서 그 흐름에 익숙해지는 것이 목표다. 이 포스트에서 Airflow 설치부터 DAG 를 생성해보자.

Airflow 설치 및 초기 설정

  • 아래와 같은 쉡 스크립트로 Airflow 를 설치할 수 있다.

    pip3 install pip --upgrade
    
    AIRFLOW_VERSION=2.6.3
    PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
    CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
    
    pip3 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
    
  • Airflow 와 python 버전을 매치해서 설치해야 오류가 나지 않는다.
  • 참고로 PYTHON_VERSION 에서 pipe | 는 앞의 명령어의 출력 결과를 다음 명령어의 입력으로 넘겨주는 것이고, cut -d "기준" -f N-d 에 구분자(delimiter)를 설정하고 -f 에 필드 번호를 지정하여 추출한다.
    • echo "Python 3.9.18" | cut -d " " -f 2 | cut -d "." -f 1-2 를 입력하면 3.9 가 나온다.
  • 이후 Airflow 에서 사용할 DB 를 아래와 같이 초기화 한다.

    Untitled

    • 위 그림에서는 현재 폴더(pwd)를 Airflow 의 홈으로 만들기 위해 export AIRFLOW_HOME=$(pwd) 명령어 실행한다. 위에서는 pwd 를 문자열이 아니라 backtick 으로 감쌌다.
    • export 로 현재 폴더를 지정해주지 않으면 ~/airflow 에서 실행된다. 그러나 내가 원하는 프로젝트 폴더에서 airflow 를 실행하고 싶다면 위와 같은 방법을 따라야 한다.
    • 이후 airflow db init 으로 DB 를 초기화 한다.
  • 그러면 아래와 같이 홈으로 설정한 폴더 아래 airflow.cfgairflow.db 파일이 생긴다.

    Untitled

  • airflow.cfg 에서 Airflow 의 각종 설정이 존재한다. load_examples = True 는 예제 파일이 생성되도록 하는 설정으로, 현업에서는 False 로 한다.

    Untitled

  • 이제 Airflow 에서 사용할 Admin 계정을 생성한다. 설정한 Airflow 홈 폴더에서 아래의 명령어를 입력하면 된다.

    airflow users create \
    --username admin \
    --password '!admin-password!' \
    --firstname b \
    --lastname kkhyunn \
    --role Admin \
    --email toiquen419@gmail.com
    
  • Airflow Webserver 띄운다. airflow webserver --port 8080 와 같이 띄울 수 있으며, 포트가 제한된 환경이라면 포트를 열어줘야 한다. 아래와 같이 나오면 정상적으로 실행되고 있는 것이다.

    Untitled

  • Booting worker pid 는 Worker 들이 떴다는 것을 의미한다. 이제 localhost:8080 으로 접속하면 다음처럼 웹 UI 가 보이고 앞서 생성한 admin 계정으로 로그인할 수 있다.

    Untitled

  • 로그인을 하게 되면 노란색 창으로 스케줄러가 실행 중이지 않다는 에러가 보이게 된다. 또한 SQLite, SequentialExecutor 관련 오류도 보이는데 우선은 넘어가도 좋다.
  • 이제 새로운 터미널 창을 띄워서 Airflow Scheduler 를 다음처럼 실행시킨다.
    • Webserver 를 실행한 상황에서 Scheduler 를 또 실행하는 것으로, export 로 현재 폴더를 건네주고 실행한다.
    export AIRFLOW_HOME = pwd
    airflow scheduler
    
  • Airflow Webserver 와 Scheduler 를 실행할 때 포트 이슈가 있을 수 있다. 아래의 예제는 8793 포트가 사용되고 있다는 의미다.
    • 이를 해결하기 위해서 해당 포트의 process id 를 찾아 kill 해준다. kill $(lsof -t -i:8793) 으로 하면 된다. 그렇게 되면 스케줄러 관련 에러가 없어진다.

      Untitled

  • 정리하면, 항상 Webserver 와 Scheduler 는 같이 실행해야 한다. 따라서 2 개의 터미널이 띄워져 있어야 한다.

환경 설정 요약

  • Airflow 설치
    • pip install "apache-airflow==2.6.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.9.txt" 와 같이 설치한다.
  • Airflow 의 기본 디렉토리 설정
    • 환경변수 AIRFLOW_HOME 에 사용할 기본 디렉토리 경로를 설정한다.
    • export AIRFLOW_HOME=$(pwd) 처럼 export 를 해줘야 한다.
  • Airflow DB 초기화
    • Airflow 에서 사용할 DB 를 초기화 한다. airflow db init
  • Airflow Admin 계정 생성
    • airflow users create
  • Airflow WebServer 실행
    • airflow webserver --port 8080
  • Airflow Scheduler 실행
    • airflow scheduler

Airflow Hello World

  • 이제 DAG 를 만들고 Airflow 를 실행해보자.
  • 새 터미널을 띄우고 AIRFLOW_HOME 경로에서 DAG 를 저장할 디렉토리를 생성한다. 이름은 무조건 dags 다.
    • mkdir dags
  • dags 폴더에서 hello_world.py 를 생성한다.
  • Airflow 의 DAG 파일은 크게 3 가지 파트로 나뉜다.
    • DAG 정의
    • Task 정의
      • Task 에서 사용할 함수가 있다면 정의한다. 이 때 PythonOperator 와 같은 Operator 를 사용한다.
    • Task 순서 정의(연결)
  • 예시 코드는 아래와 같다.

    from datetime import datetime, timedelta
    
    from airflow import DAG
    from airflow.utils.dates import days_ago
    from airflow.operators.bash import BashOperator
    from airflow.operators.python import PythonOperator
    
    def print_world() -> None:
        print("world")
    
    # with 구문으로 DAG 정의
    with DAG(
        dag_id = "hello_world",              # DAG 의 식별자용 id
        description = "My first DAG",        # DAG 에 대한 설명
        start_date = days_ago(2),            # DAG 정의 기준 2일 전부터 시작
        schedule_interval = "0 6 * * *",     # 매일 06:00 에 실행
        tags = ["my_dags"]
    ) as dag:
          
        # Task 정의
        # bash 커맨드로 echo hello 실행
        t1 = BashOperator(
            task_id = "print_hello",          
            bash_command = "echo Hello",
            owner = "bkkhyunn",                # 이 작업의 오너. 보통 작업을 담당하는 사람의 이름을 넣는다.
            retries = 3,                       # 이 task 가 실패한 경우, 3 번의 재시도를 한다.
            retry_delay = timedelta(minutes=5) # 재시도하는 시간 간격은 5 분이다.
        )
    
        # Task 정의
        # Python 함수인 print_world 를 실행
        t2 = PythonOperator(
            task_id = "print_world",
            python_callable = print_world,
            depends_on_past = True,
            owner = "bkkhyunn",
            retries = 3,
            retry_delay = timedelta(minutes=5)
        )
    
        # Task 순서 정의
        # t1 실행 후 t2 실행
        t1 >> t2
    
    • 위 예제 코드에서 DAG 를 정의하는 부분이 with DAG 구문이고, Task 정의 부분이 t1t2 를 정의한 부분이며, Task 순서 정의가 맨 마지막 t1 >> t2 다.

DAG 정의

  • DAG 정의 부분에서는 이름, 태그, 언제부터 스케줄링을 시작할 지, 스케줄링 간격은 어떻게 할 지 등을 정의한다. 즉 DAG 의 메타정보를 기입한다.
  • with DAG() as dag 와 같이 with 구문을 사용한다.
    • 식별자용 아이디 dag_id 는 유니크해야 한다.
    • start_date 는 airflow 의 util 인 days_ago 뿐 아니라 2025-01-01 와 같이 입력해도 된다.
    • schedule_interval 에는 크론 표현식을 이용한다. 크론 표현식에 대해서는 해당 포스트에서 정리했다.
    • tags 는 이 DAG 에 대한 검색을 위해서 달아놓는다.
  • 실무에서 Airflow 를 사용할 때 중요한 인자는 아래와 같다.
    • catchup
      • 따라잡는다는 의미 그대로, 이미 지나간 일자의 DAG 를 실행할지 결정하는 옵션이다.
      • catchup 을 True 로 두면, DAG 에서 정의한 start_date 부터 현재까지 미실행된 모든 스케줄에 대해 DAG 를 실행한다.
      • 즉 해당 옵션은 과거의 데이터를 처리할 필요가 있을 때 유용하다.
      • 만약 catchup 을 False 로 두면, DAG 에서 정의한 start_date 와 상관없이 앞으로 실행될 DAG 를 실행한다.
      • 예를 들어, start_date2025-01-01 이고, DAG 의 스케줄이 매일 하루에 한 번 실행된다고 해보자.
      • 만약 해당 DAG 를 정의하고 동작하는 시점이 2025-01-10 일 때, catchup 이 True 라면 2025-01-01 부터 2025-01-10 이 실행되고, False 라면 2025-01-10 부터 실행된다.
    • depends_on_past
      • 특정 Task 가 이전 DAG 의 실행 결과에 의존할 지 여부를 결정하는 옵션이다.
      • 이전 Task 와 상관없이 작업을 수행하려면 해당 옵션을 False 로 두면 되고, 하루 단위의 작업들이 의존성이 있다면 True 를 주고 순차적으로 실행하면 된다.
      • 마찬가지로 start_date2025-01-01 이고, DAG 의 스케줄이 매일 하루에 한 번 실행되며, 해당 DAG 를 정의하고 동작하는 시점이 2025-01-10 라고 해보자.
      • 해당 옵션이 True 일 때, 이전 DAG 가 성공으로 완료되어야 이후의 DAG 가 실행되기 때문에 시간이 좀 더 걸린다. 즉 2025-01-01 이 완료되어야 2025-01-02 작업이 실시된다.
      • 반면에 해당 옵션이 False 라면, 이전 DAG 의 성공 여부와 상관없이 스케줄 시점이 되면 DAG 가 실행된다. 즉 2025-01-01 다음에 2025-01-05 가 임의로 실행될 수 있다.
      • 또한 Executor(Worker) 에 따라 여러가지 작업을 한번에 실행할 수도 있다.
      • depends_on_past 옵션을 False 로 주면 시간이 줄어들 수 있지만, Airflow 작업이 DB 와 연동이 되면 커넥션 풀이 많아지면서 장애가 날 수도 있다.
      • 따라서 처음에는 True 를 권장하고, 이후 DB 옵션을 고려하여 False 를 써야 한다.

Task 정의

  • Task 를 정의할 때는 Airflow 에 있는 다양한 Operator 클래스를 사용한다. Airflow 에는 다양한 Operator 클래스가 존재하여, 이미 추상화가 되어 있다.
  • 또한 Provider 라고 해서 AWS, 구글, Slack 과 같은 도구들과 연동할 수 있는 코드를 만들어서 제공하고 이다. 따라서 Airflow 를 사용한다면 로직에 집중하고, Operator 를 사용하는 것이 편리하다.
  • 이 때 중요한 것은 Operator 마다 인자가 다르다는 것이다.
    • BashOperatorbash_command 를 사용한다.
    • PythonOperatorpython_callable argument 에 실행할 python 함수를 전달한다. 만약 python 함수에 인자를 전달하려면, op_args arguemnt 에 전달하면 된다.
    • op_argspython_callable 에 전달한 python 함수에 필요한 인자를 list 형태로 넣어주고, op_kargs 는 dictionary 형태로 넣어준다.

Task 순서 정의

  • DAG 내 task 순서를 정의할 때, 순서는 >> 로 표현한다. 위 예제에서는 t1(BashOperator) 실행 후 t2(PythonOperator)를 실행한다.
  • task 순서를 정의할 때는 아래와 같다.
    • A task 후에 B task 가 실행되어야 한다면 A >> B 와 같이 입력한다.
    • A task 후에 B, C task 가 실행되어야 한다면 A >> [B, C] 와 같이 입력한다.
    • 특정 조건이 True 일 때만 실행하는 것도 가능하다. 이 때는 BranchPythonOperator 를 사용한다.

Airflow Web UI

  • Airflow example 은 airflow.cfg 에서 load_examples 옵션을 통해 없앨 수 있다. 기본적으로 True 로 설정되어 있다.
  • 아래 그림과 같이 왼쪽에 위치한 토글을 누르면, Airflow DAG 가 정의 시에 설정한 start_date 를 기준으로 스케줄된 DAG 들을 실행하는 과정을 on 하는 것이다.

    Untitled

  • 이것도 default mode 로 바꿀 수 있다. 새로운 DAG 가 정의되면 해당 DAG 가 처음부터 on 상태이도록 설정하는 옵션이 airflow 설정에 있다.
  • 토글을 클릭해서 DAG 상태를 on 으로 만들면 아래 그림과 같이 우측 Runs 에 연두색 불이 들어오기 시작한다. 만약 빨간색 불이 들어오면 실패한 것이다.

    Untitled

  • DAG 의 이름을 클릭하면 아래와 같이 실행된 결과를 확인할 수 있다.
    • 좌측의 세로 한 줄이 하나의 DAG 실행을 의미한다.
    • DAG 안에 2 개의 task 가 존재하고, 처음 정의된 task 부터 보인다.
    • DAG 스케줄링 시작 날짜를 2 일 전(days_ago(2))으로 설정했기 때문에, 2 개의 DAG Run 이 생성된다.
    • 스케줄링에 따라, 다음날 오전 6시(UTC 기준)가 지나면, 하나의 DAG Run 이 또 생기게 된다.

      Untitled

  • Airflow 서버는 일반적으로 항상 켜둔다. 그리고 DAG Run 이 실행될 시간이 되면 스케줄러가 executor 를 실행시킨다.
  • 아래와 같이 Graph, Calendar View 로도 볼 수 있다.

    Untitled

    Untitled

  • 또한 초록색 바 하단의 네모칸이 Task(Task Box) 를 의미한다. 이를 클릭하고 Code 를 누르면 작성한 코드를 볼 수 있고, Logs 를 클릭하면 아래와 같이 로그를 확인할 수 있다. 디버깅을 할 때 이 로그를 많이 활용하게 된다.

    Untitled

  • Task 재실행를 재실행할 때는 Task Box 를 누르고, “Clear Task” 버튼을 클릭한다.
    • 이는 Task 를 비어있는 상태로 만드는 것이다. 그렇게 되면 스케줄러가 해당 Task 를 보고 비어있다고 판단 후 실행하게 된다.
    • “Clear Task” 버튼을 누르면 아래와 같이 확인 창이 뜨고 영향을 받는 Task 가 있음을 언급한다.

      Untitled

    • 예제 코드에서는 print_hello 가 선행 task 이므로, 이를 “Clear Task” 를 눌러서 재실행하게 되면 후행 task 인 print_world 도 재실행하게 된다.
    • 이처럼 Task 가 여러 개 있을 때, 제일 선행하는 Task 를 재실행하면 다 재실행되기 때문에 순서를 고려해야 한다.
    • Upstream 은 상위 작업, Downstream 은 이후 작업을 의미한다.
  • 재실행을 하게 되면 아래와 같이 회색 Task Box 가 나오게 되는데, 이는 해당 task 가 queued 상태에 있음을 의미한다.

    Untitled

정리

  • AIRFLOW_HOME 으로 지정된 디렉토리에 dags 디렉토리를 생성하고 이 안에 DAG 파일을 작성한다.
  • DAG 는 Python 파일로 작성하고, 보통 하나의 .py 파일에 하나의 DAG 를 저장한다.
  • DAG 파일은 크게 다음으로 구성된다.
    • DAG 정의
    • Task 정의
    • Task 순서 정의
  • DAG 파일을 저장하면, Airflow 웹 UI 에서 확인할 수 있다.
  • Airflow 웹 UI 에서 해당 DAG 의 토글을 눌러 ON 상태로 변경하면, 해당 DAG 가 스케줄링되어 실행된다.
  • 이후 DAG 를 클릭하면 세부 페이지에서 실행된 DAG Run 의 결과를 볼 수 있다.

Airflow Operator

  • 앞서 Airflow 에는 다양한 Operator 가 제공되고 이미 추상화되어 있다고 언급했다. 아래에서 대표적인 Operator 들에 대해 알아보자.
  • Operator 는 Task 를 정의할 때 사용하게 되며, 어떤 인자가 들어가는지 확인하고 인자를 주입하면 된다. 블럭을 쌓는 것과 비슷하다.
  • 길이가 길어질 수 있지만, 아래와 같이 Operator 마다 맞는 인자를 넣으면 실행된다.

    Untitled

PythonOperator

  • Python 함수를 실행하는 Operator 다.
  • 함수 뿐 아니라 Callable 한 객체를 파라미터로 넘겨 실행할 수 있다.
  • 실행할 Python 로직을 함수로 생성한 후, PythonOperator 로 task 를 정의하여 활용할 수 있다.

    Untitled

BashOperator

  • Bash 커맨드를 실행하는 Operator 다.
  • 실행해야 할 프로세스가 Python 이 아니고 shell script, scala 파일 등이라면 BashOperator 로 task 를 정의하여 실행할 수 있다.
  • 또한 Python 도 실행할 수도 있다.

    Untitled

DummyOperator

  • 아무것도 실행하지 않는 Operator 다.

    Untitled

  • DAG 내에서 task 를 구성할 때, 여러 개의 Task 의 success 를 기다려야 하는 복잡한 task 구성에서 사용한다.
  • 예를 들어 DummyOperator 를 사용하여, status 가 success 일 때까지 기다리는 방식으로 활용한다. 아래 예시를 보자.

    Untitled

  • 위와 같이 E 는 작업을 모아두고 F 로 넘기기 위해서 DummyOperator 로 정의된다.

SimpleHttpOperator

  • 특정 호스트로 HTTP 요청을 보내고 Response 를 반환하는 Operator 다.
  • Python 함수에서 Requests 모듈을 사용한 뒤, PythonOperator 로 실행시켜도 무방하다. 다만 이런 기능이 Airflow Operator 에 이미 존재하는 것을 알면 좋다.

    Untitled

Provider Packages

  • AWS, GCP 등의 클라우드 기능을 추상화한 Operator 도 존재한다. 즉 다양한 기능을 Airflow 에서 사용할 수 있도록 추상화된 Operator 를 제공한다.

    Untitled

  • 위와 같이 외부 Thrid Party 와 연동해 사용하는 Operator 의 경우, Airflow 설치 시 extra package 를 설치해주면(pip install "apache-airflow[aws]") 관련된 기능을 쓸 수 있다.
  • 예를 들어 slack 알람을 구현한다면 slack provider 가 필요하다.

BranchPythonOperator

  • 특정 상황에 A 작업을 실행하고 그게 아니면 Pass 하는 등 특정 조건에 따라 실행을 제어하는 Operator 다.
  • 예를 들어, 모델을 학습한 결과 Accuracy 가 기존 모델보다 높으면 해당 모델을 저장 후 모델 업데이트를 실시하고, 좋지 않으면 저장만 진행하는 식이다.
  • 또한 BranchPythonOperator 를 사용하면 특정 일자 전에는 A 모델, 그 이후엔 B 모델을 사용하는 방식으로 분기 처리가 가능하다.

    Untitled

Airflow 추가 기능

  • Operator 외에도 알아두면 좋은 Airflow DAG 를 더 풍부하게 작성할 수 있는 개념이 있다.
  • Variable
    • Airflow Console 에서 변수(Variable)를 저장하여 Airflow DAG 에서 활용할 수 있다. 즉 Airflow 에서 사용되는 전역 변수(Global Variable)이다.
  • Connection & Hooks
    • MySQL, GCP 등 외부 도구와 연결하기 위한 설정이 있다.
  • Sensor
    • 외부 이벤트를 기다리며 특정 조건이 만족하면 실행하도록 할 수 있다. Airflow Task 를 실행하기 전에 조건으로 정의하여 사용하는 일종의 Operator 다.
    • 즉 Sensor 를 통해 .py 파일이 있는지를 확인하고, 해당 .py 에 작성된 함수를 실행하는 PythonOperator 의 task 를 실행할 수 있다.
    • 이러한 Sensor 에 대해서는 해당 블로그 글을 참고하면 잘 이해할 수 있다.
  • XComs
    • Task 끼리 결과를 주고 받을 때 사용한다.
    • Task 끼리 return 해서 쓰는 것이 아니라, XComs 에 데이터를 저장해서 사용한다. 즉 XComs 에 push / pull 하는 것이다.
    • 이러한 XComs 는 작업 간 데이터 공유에 사용되며, 민감한 정보의 경우 암호화를 적용하여 보안을 강화할 수 있다.
  • Jinja Template
    • Python 의 템플릿 문법이다. FastAPI 에서도 사용한다.
맨 위로 이동 ↑

댓글 남기기