[Airflow] Airflow 를 활용한 Batch Serving (1)
Batch Serving 에 Airflow 를 활용해보자. 실무에서 실제 서비스에 적용할 기회가 분명 있을텐데, 그 전에 코드 단에서 그 흐름에 익숙해지는 것이 목표다. 이 포스트에서 Airflow 설치부터 DAG 를 생성해보자.
Airflow 설치 및 초기 설정
-
아래와 같은 쉡 스크립트로 Airflow 를 설치할 수 있다.
pip3 install pip --upgrade AIRFLOW_VERSION=2.6.3 PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)" CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt" pip3 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
- Airflow 와 python 버전을 매치해서 설치해야 오류가 나지 않는다.
- 참고로
PYTHON_VERSION
에서 pipe|
는 앞의 명령어의 출력 결과를 다음 명령어의 입력으로 넘겨주는 것이고,cut -d "기준" -f N
은-d
에 구분자(delimiter)를 설정하고-f
에 필드 번호를 지정하여 추출한다.echo "Python 3.9.18" | cut -d " " -f 2 | cut -d "." -f 1-2
를 입력하면3.9
가 나온다.
-
이후 Airflow 에서 사용할 DB 를 아래와 같이 초기화 한다.
- 위 그림에서는 현재 폴더(
pwd
)를 Airflow 의 홈으로 만들기 위해export AIRFLOW_HOME=$(pwd)
명령어 실행한다. 위에서는pwd
를 문자열이 아니라 backtick 으로 감쌌다. export
로 현재 폴더를 지정해주지 않으면~/airflow
에서 실행된다. 그러나 내가 원하는 프로젝트 폴더에서 airflow 를 실행하고 싶다면 위와 같은 방법을 따라야 한다.- 이후
airflow db init
으로 DB 를 초기화 한다.
- 위 그림에서는 현재 폴더(
-
그러면 아래와 같이 홈으로 설정한 폴더 아래
airflow.cfg
와airflow.db
파일이 생긴다. -
airflow.cfg
에서 Airflow 의 각종 설정이 존재한다.load_examples = True
는 예제 파일이 생성되도록 하는 설정으로, 현업에서는False
로 한다. -
이제 Airflow 에서 사용할 Admin 계정을 생성한다. 설정한 Airflow 홈 폴더에서 아래의 명령어를 입력하면 된다.
airflow users create \ --username admin \ --password '!admin-password!' \ --firstname b \ --lastname kkhyunn \ --role Admin \ --email toiquen419@gmail.com
-
Airflow Webserver 띄운다.
airflow webserver --port 8080
와 같이 띄울 수 있으며, 포트가 제한된 환경이라면 포트를 열어줘야 한다. 아래와 같이 나오면 정상적으로 실행되고 있는 것이다. -
Booting worker pid
는 Worker 들이 떴다는 것을 의미한다. 이제localhost:8080
으로 접속하면 다음처럼 웹 UI 가 보이고 앞서 생성한 admin 계정으로 로그인할 수 있다. - 로그인을 하게 되면 노란색 창으로 스케줄러가 실행 중이지 않다는 에러가 보이게 된다. 또한 SQLite, SequentialExecutor 관련 오류도 보이는데 우선은 넘어가도 좋다.
- 이제 새로운 터미널 창을 띄워서 Airflow Scheduler 를 다음처럼 실행시킨다.
- Webserver 를 실행한 상황에서 Scheduler 를 또 실행하는 것으로,
export
로 현재 폴더를 건네주고 실행한다.
export AIRFLOW_HOME = pwd airflow scheduler
- Webserver 를 실행한 상황에서 Scheduler 를 또 실행하는 것으로,
- Airflow Webserver 와 Scheduler 를 실행할 때 포트 이슈가 있을 수 있다. 아래의 예제는 8793 포트가 사용되고 있다는 의미다.
-
이를 해결하기 위해서 해당 포트의 process id 를 찾아
kill
해준다.kill $(lsof -t -i:8793)
으로 하면 된다. 그렇게 되면 스케줄러 관련 에러가 없어진다.
-
- 정리하면, 항상 Webserver 와 Scheduler 는 같이 실행해야 한다. 따라서 2 개의 터미널이 띄워져 있어야 한다.
환경 설정 요약
- Airflow 설치
pip install "apache-airflow==2.6.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.9.txt"
와 같이 설치한다.
- Airflow 의 기본 디렉토리 설정
- 환경변수
AIRFLOW_HOME
에 사용할 기본 디렉토리 경로를 설정한다. export AIRFLOW_HOME=$(pwd)
처럼export
를 해줘야 한다.
- 환경변수
- Airflow DB 초기화
- Airflow 에서 사용할 DB 를 초기화 한다.
airflow db init
- Airflow 에서 사용할 DB 를 초기화 한다.
- Airflow Admin 계정 생성
airflow users create
- Airflow WebServer 실행
airflow webserver --port 8080
- Airflow Scheduler 실행
airflow scheduler
Airflow Hello World
- 이제 DAG 를 만들고 Airflow 를 실행해보자.
- 새 터미널을 띄우고
AIRFLOW_HOME
경로에서 DAG 를 저장할 디렉토리를 생성한다. 이름은 무조건 dags 다.mkdir dags
- dags 폴더에서
hello_world.py
를 생성한다. - Airflow 의 DAG 파일은 크게 3 가지 파트로 나뉜다.
- DAG 정의
- Task 정의
- Task 에서 사용할 함수가 있다면 정의한다. 이 때
PythonOperator
와 같은 Operator 를 사용한다.
- Task 에서 사용할 함수가 있다면 정의한다. 이 때
- Task 순서 정의(연결)
-
예시 코드는 아래와 같다.
from datetime import datetime, timedelta from airflow import DAG from airflow.utils.dates import days_ago from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator def print_world() -> None: print("world") # with 구문으로 DAG 정의 with DAG( dag_id = "hello_world", # DAG 의 식별자용 id description = "My first DAG", # DAG 에 대한 설명 start_date = days_ago(2), # DAG 정의 기준 2일 전부터 시작 schedule_interval = "0 6 * * *", # 매일 06:00 에 실행 tags = ["my_dags"] ) as dag: # Task 정의 # bash 커맨드로 echo hello 실행 t1 = BashOperator( task_id = "print_hello", bash_command = "echo Hello", owner = "bkkhyunn", # 이 작업의 오너. 보통 작업을 담당하는 사람의 이름을 넣는다. retries = 3, # 이 task 가 실패한 경우, 3 번의 재시도를 한다. retry_delay = timedelta(minutes=5) # 재시도하는 시간 간격은 5 분이다. ) # Task 정의 # Python 함수인 print_world 를 실행 t2 = PythonOperator( task_id = "print_world", python_callable = print_world, depends_on_past = True, owner = "bkkhyunn", retries = 3, retry_delay = timedelta(minutes=5) ) # Task 순서 정의 # t1 실행 후 t2 실행 t1 >> t2
- 위 예제 코드에서 DAG 를 정의하는 부분이
with DAG
구문이고, Task 정의 부분이t1
과t2
를 정의한 부분이며, Task 순서 정의가 맨 마지막t1 >> t2
다.
- 위 예제 코드에서 DAG 를 정의하는 부분이
DAG 정의
- DAG 정의 부분에서는 이름, 태그, 언제부터 스케줄링을 시작할 지, 스케줄링 간격은 어떻게 할 지 등을 정의한다. 즉 DAG 의 메타정보를 기입한다.
with DAG() as dag
와 같이 with 구문을 사용한다.- 식별자용 아이디
dag_id
는 유니크해야 한다. start_date
는 airflow 의 util 인days_ago
뿐 아니라2025-01-01
와 같이 입력해도 된다.schedule_interval
에는 크론 표현식을 이용한다. 크론 표현식에 대해서는 해당 포스트에서 정리했다.tags
는 이 DAG 에 대한 검색을 위해서 달아놓는다.
- 식별자용 아이디
- 실무에서 Airflow 를 사용할 때 중요한 인자는 아래와 같다.
- catchup
- 따라잡는다는 의미 그대로, 이미 지나간 일자의 DAG 를 실행할지 결정하는 옵션이다.
- catchup 을 True 로 두면, DAG 에서 정의한
start_date
부터 현재까지 미실행된 모든 스케줄에 대해 DAG 를 실행한다. - 즉 해당 옵션은 과거의 데이터를 처리할 필요가 있을 때 유용하다.
- 만약 catchup 을 False 로 두면, DAG 에서 정의한
start_date
와 상관없이 앞으로 실행될 DAG 를 실행한다. - 예를 들어,
start_date
는2025-01-01
이고, DAG 의 스케줄이 매일 하루에 한 번 실행된다고 해보자. - 만약 해당 DAG 를 정의하고 동작하는 시점이
2025-01-10
일 때, catchup 이 True 라면2025-01-01
부터2025-01-10
이 실행되고, False 라면2025-01-10
부터 실행된다.
- depends_on_past
- 특정 Task 가 이전 DAG 의 실행 결과에 의존할 지 여부를 결정하는 옵션이다.
- 이전 Task 와 상관없이 작업을 수행하려면 해당 옵션을 False 로 두면 되고, 하루 단위의 작업들이 의존성이 있다면 True 를 주고 순차적으로 실행하면 된다.
- 마찬가지로
start_date
는2025-01-01
이고, DAG 의 스케줄이 매일 하루에 한 번 실행되며, 해당 DAG 를 정의하고 동작하는 시점이2025-01-10
라고 해보자. - 해당 옵션이 True 일 때, 이전 DAG 가 성공으로 완료되어야 이후의 DAG 가 실행되기 때문에 시간이 좀 더 걸린다. 즉
2025-01-01
이 완료되어야2025-01-02
작업이 실시된다. - 반면에 해당 옵션이 False 라면, 이전 DAG 의 성공 여부와 상관없이 스케줄 시점이 되면 DAG 가 실행된다. 즉
2025-01-01
다음에2025-01-05
가 임의로 실행될 수 있다. - 또한 Executor(Worker) 에 따라 여러가지 작업을 한번에 실행할 수도 있다.
depends_on_past
옵션을 False 로 주면 시간이 줄어들 수 있지만, Airflow 작업이 DB 와 연동이 되면 커넥션 풀이 많아지면서 장애가 날 수도 있다.- 따라서 처음에는 True 를 권장하고, 이후 DB 옵션을 고려하여 False 를 써야 한다.
- catchup
Task 정의
- Task 를 정의할 때는 Airflow 에 있는 다양한 Operator 클래스를 사용한다. Airflow 에는 다양한 Operator 클래스가 존재하여, 이미 추상화가 되어 있다.
- 또한 Provider 라고 해서 AWS, 구글, Slack 과 같은 도구들과 연동할 수 있는 코드를 만들어서 제공하고 이다. 따라서 Airflow 를 사용한다면 로직에 집중하고, Operator 를 사용하는 것이 편리하다.
- 이 때 중요한 것은 Operator 마다 인자가 다르다는 것이다.
BashOperator
는bash_command
를 사용한다.PythonOperator
는python_callable
argument 에 실행할 python 함수를 전달한다. 만약 python 함수에 인자를 전달하려면,op_args
arguemnt 에 전달하면 된다.op_args
는python_callable
에 전달한 python 함수에 필요한 인자를 list 형태로 넣어주고,op_kargs
는 dictionary 형태로 넣어준다.
Task 순서 정의
- DAG 내 task 순서를 정의할 때, 순서는
>>
로 표현한다. 위 예제에서는t1
(BashOperator
) 실행 후t2
(PythonOperator
)를 실행한다. - task 순서를 정의할 때는 아래와 같다.
- A task 후에 B task 가 실행되어야 한다면
A >> B
와 같이 입력한다. - A task 후에 B, C task 가 실행되어야 한다면
A >> [B, C]
와 같이 입력한다. - 특정 조건이 True 일 때만 실행하는 것도 가능하다. 이 때는
BranchPythonOperator
를 사용한다.
- A task 후에 B task 가 실행되어야 한다면
Airflow Web UI
- Airflow example 은
airflow.cfg
에서load_examples
옵션을 통해 없앨 수 있다. 기본적으로 True 로 설정되어 있다. -
아래 그림과 같이 왼쪽에 위치한 토글을 누르면, Airflow DAG 가 정의 시에 설정한
start_date
를 기준으로 스케줄된 DAG 들을 실행하는 과정을 on 하는 것이다. - 이것도 default mode 로 바꿀 수 있다. 새로운 DAG 가 정의되면 해당 DAG 가 처음부터 on 상태이도록 설정하는 옵션이 airflow 설정에 있다.
-
토글을 클릭해서 DAG 상태를 on 으로 만들면 아래 그림과 같이 우측 Runs 에 연두색 불이 들어오기 시작한다. 만약 빨간색 불이 들어오면 실패한 것이다.
- DAG 의 이름을 클릭하면 아래와 같이 실행된 결과를 확인할 수 있다.
- 좌측의 세로 한 줄이 하나의 DAG 실행을 의미한다.
- DAG 안에 2 개의 task 가 존재하고, 처음 정의된 task 부터 보인다.
- DAG 스케줄링 시작 날짜를 2 일 전(
days_ago(2)
)으로 설정했기 때문에, 2 개의 DAG Run 이 생성된다. -
스케줄링에 따라, 다음날 오전 6시(UTC 기준)가 지나면, 하나의 DAG Run 이 또 생기게 된다.
- Airflow 서버는 일반적으로 항상 켜둔다. 그리고 DAG Run 이 실행될 시간이 되면 스케줄러가 executor 를 실행시킨다.
-
아래와 같이 Graph, Calendar View 로도 볼 수 있다.
-
또한 초록색 바 하단의 네모칸이 Task(Task Box) 를 의미한다. 이를 클릭하고 Code 를 누르면 작성한 코드를 볼 수 있고, Logs 를 클릭하면 아래와 같이 로그를 확인할 수 있다. 디버깅을 할 때 이 로그를 많이 활용하게 된다.
- Task 재실행를 재실행할 때는 Task Box 를 누르고, “Clear Task” 버튼을 클릭한다.
- 이는 Task 를 비어있는 상태로 만드는 것이다. 그렇게 되면 스케줄러가 해당 Task 를 보고 비어있다고 판단 후 실행하게 된다.
-
“Clear Task” 버튼을 누르면 아래와 같이 확인 창이 뜨고 영향을 받는 Task 가 있음을 언급한다.
- 예제 코드에서는
print_hello
가 선행 task 이므로, 이를 “Clear Task” 를 눌러서 재실행하게 되면 후행 task 인print_world
도 재실행하게 된다. - 이처럼 Task 가 여러 개 있을 때, 제일 선행하는 Task 를 재실행하면 다 재실행되기 때문에 순서를 고려해야 한다.
- Upstream 은 상위 작업, Downstream 은 이후 작업을 의미한다.
-
재실행을 하게 되면 아래와 같이 회색 Task Box 가 나오게 되는데, 이는 해당 task 가 queued 상태에 있음을 의미한다.
정리
AIRFLOW_HOME
으로 지정된 디렉토리에 dags 디렉토리를 생성하고 이 안에 DAG 파일을 작성한다.- DAG 는 Python 파일로 작성하고, 보통 하나의
.py
파일에 하나의 DAG 를 저장한다. - DAG 파일은 크게 다음으로 구성된다.
- DAG 정의
- Task 정의
- Task 순서 정의
- DAG 파일을 저장하면, Airflow 웹 UI 에서 확인할 수 있다.
- Airflow 웹 UI 에서 해당 DAG 의 토글을 눌러 ON 상태로 변경하면, 해당 DAG 가 스케줄링되어 실행된다.
- 이후 DAG 를 클릭하면 세부 페이지에서 실행된 DAG Run 의 결과를 볼 수 있다.
Airflow Operator
- 앞서 Airflow 에는 다양한 Operator 가 제공되고 이미 추상화되어 있다고 언급했다. 아래에서 대표적인 Operator 들에 대해 알아보자.
- Operator 는 Task 를 정의할 때 사용하게 되며, 어떤 인자가 들어가는지 확인하고 인자를 주입하면 된다. 블럭을 쌓는 것과 비슷하다.
-
길이가 길어질 수 있지만, 아래와 같이 Operator 마다 맞는 인자를 넣으면 실행된다.
PythonOperator
- Python 함수를 실행하는 Operator 다.
- 함수 뿐 아니라 Callable 한 객체를 파라미터로 넘겨 실행할 수 있다.
-
실행할 Python 로직을 함수로 생성한 후,
PythonOperator
로 task 를 정의하여 활용할 수 있다.
BashOperator
- Bash 커맨드를 실행하는 Operator 다.
- 실행해야 할 프로세스가 Python 이 아니고 shell script, scala 파일 등이라면
BashOperator
로 task 를 정의하여 실행할 수 있다. -
또한 Python 도 실행할 수도 있다.
DummyOperator
-
아무것도 실행하지 않는 Operator 다.
- DAG 내에서 task 를 구성할 때, 여러 개의 Task 의 success 를 기다려야 하는 복잡한 task 구성에서 사용한다.
-
예를 들어
DummyOperator
를 사용하여, status 가 success 일 때까지 기다리는 방식으로 활용한다. 아래 예시를 보자. - 위와 같이 E 는 작업을 모아두고 F 로 넘기기 위해서
DummyOperator
로 정의된다.
SimpleHttpOperator
- 특정 호스트로 HTTP 요청을 보내고 Response 를 반환하는 Operator 다.
-
Python 함수에서
Requests
모듈을 사용한 뒤,PythonOperator
로 실행시켜도 무방하다. 다만 이런 기능이 Airflow Operator 에 이미 존재하는 것을 알면 좋다.
Provider Packages
-
AWS, GCP 등의 클라우드 기능을 추상화한 Operator 도 존재한다. 즉 다양한 기능을 Airflow 에서 사용할 수 있도록 추상화된 Operator 를 제공한다.
- 위와 같이 외부 Thrid Party 와 연동해 사용하는 Operator 의 경우, Airflow 설치 시 extra package 를 설치해주면(
pip install "apache-airflow[aws]"
) 관련된 기능을 쓸 수 있다. - 예를 들어 slack 알람을 구현한다면
slack provider
가 필요하다.
BranchPythonOperator
- 특정 상황에 A 작업을 실행하고 그게 아니면 Pass 하는 등 특정 조건에 따라 실행을 제어하는 Operator 다.
- 예를 들어, 모델을 학습한 결과 Accuracy 가 기존 모델보다 높으면 해당 모델을 저장 후 모델 업데이트를 실시하고, 좋지 않으면 저장만 진행하는 식이다.
-
또한
BranchPythonOperator
를 사용하면 특정 일자 전에는 A 모델, 그 이후엔 B 모델을 사용하는 방식으로 분기 처리가 가능하다.
Airflow 추가 기능
- Operator 외에도 알아두면 좋은 Airflow DAG 를 더 풍부하게 작성할 수 있는 개념이 있다.
- Variable
- Airflow Console 에서 변수(Variable)를 저장하여 Airflow DAG 에서 활용할 수 있다. 즉 Airflow 에서 사용되는 전역 변수(Global Variable)이다.
- Connection & Hooks
- MySQL, GCP 등 외부 도구와 연결하기 위한 설정이 있다.
- Sensor
- 외부 이벤트를 기다리며 특정 조건이 만족하면 실행하도록 할 수 있다. Airflow Task 를 실행하기 전에 조건으로 정의하여 사용하는 일종의 Operator 다.
- 즉 Sensor 를 통해
.py
파일이 있는지를 확인하고, 해당.py
에 작성된 함수를 실행하는PythonOperator
의 task 를 실행할 수 있다. - 이러한 Sensor 에 대해서는 해당 블로그 글을 참고하면 잘 이해할 수 있다.
- XComs
- Task 끼리 결과를 주고 받을 때 사용한다.
- Task 끼리 return 해서 쓰는 것이 아니라, XComs 에 데이터를 저장해서 사용한다. 즉 XComs 에 push / pull 하는 것이다.
- 이러한 XComs 는 작업 간 데이터 공유에 사용되며, 민감한 정보의 경우 암호화를 적용하여 보안을 강화할 수 있다.
- Jinja Template
- Python 의 템플릿 문법이다. FastAPI 에서도 사용한다.
댓글 남기기