[Airflow] Airflow 입문
Apach Airflow 는 현업에서 굉장히 많이 사용하게 된다. 특히 Batch Serving 을 사용할 때 많이 사용하게 되는데, 먼저 Airflow 에 대해 알고 넘어가자.
Batch Serving
- Batch Serving 은 일정 기간 동안 데이터를 수집한 후 일괄 학습 및 결과를 제공하는 방식이다. 대량의 데이터 처리할 때 효율적이다.
- Continuous Training 할 때도 사용할 수 있. 또한 일정 주기, Batch 묶음 단위로 서빙할 떄도 사용할 수 있다.
- Batch Serving 은 결국 예측 코드를 주기적으로 실행하여 예측 결과를 제공하는 것이다. 이 때 Job 스케줄러는 Apache Airflow 를 사용한다.
- 학습과 예측을 별도로 설정하여 다른 기간에 수행할 수 있다. 이 때 Jupyter Noetbook/Lab 을 이용하여 절차형으로 작성한 코드를 스크립트(
.py
)로 정리한 후 주기적으로 실행한다. - 스포티파이의 예측 알고리즘이나 Discover Weekly 가 이러한 방식으로 실행된다.
- DB 나 데이터 웨어하우스에 존재하는 데이터를 SQL Query 로 추출한 후 모든 데이터 일괄로 예측한다.
- 데이터가 이미지 일 때는, AWS S3(저장소) 등에 저장된 이미지를 사용하여 예측한다. 이미지는 DB 에 바로 저장되지 않고, object storage 같은 곳에 저장한다. 그리고 DB 에는 이미지의 저장 위치(S3 주소)를 기록한다. SQL Query 를 통해 새로 추가된 이미지의 주소를 확인하고 object storage 접근하여 예측한다.
- 자연어 데이터는 DB 나 데이터 웨어하우스에 저장하고 이를 활용해서 사용한다. 이를 전처리해서 object storage 에 저장할 수도 있다.
- Batch Serving 은 간단하게 함수 단위가 있으면 이를 주기적으로 실행하는 것이라 생각할 수 있다. NLP, CV, Recsys 등 다양한 task 모두 사용이 가능하다.
Apach Airflow
- Batch Serving 을 사용할 때 Apach Airflow 를 많이 사용한다.
- Batch Processing 이란 소프트웨어 프로그램을 자동으로 실행하는 방법이다. 예약된 시간에 자동으로 실행하고, 일회성 또는 주기적인 실행이 가능하다.
- Batch Processing vs. Batch Serving
- Batch Processing 은 일정 기간 동안 일괄적으로 작업을 수행하는 것으로 처리에 중점을 둔다.
- 반면 Batch Serving 은 서빙에 초점을 맞춰, 일정 기간 동안 일괄적으로 머신러닝 예측 작업을 수행하는 것으로, 전달에 중점을 둔다.
- Batch Processing 이 더 큰 개념이며, 이렇게 Batch 로 진행하는 작업에 Airflow 를 사용할 수 있다.
- Airflow 는 Batch 작업들 대부분이 사용하기 때문에, 데이터 엔지니어링, MLOps, Model Serving, 데이터 분석 등에서 많이 사용하는 프레임워크다.
Crontab
- Airflow 등장 전에는 Linux 의 Crontab 을 활용했다.
-
아래와 같이
MyProject
폴더가 구성되어 있다고 해보자. - 이제 서버에서
crontab -e
를 입력한다. 이후 실행된 에디터에서0 **** predict.py
를 입력한다. 0 ****
은 매 시 0 분을 의미한다. 따라서 OS 에 의해 매 시 0 분에predict.py
가 실행된다.- Linux 는 일반적인 서버 환경이면서 Crontab 이 기본적으로 설치되어 있어 매우 간편하다.
- Batch Processing 에 Linux crontab 을 사용하는 곳도 아직 있다. Batch Processing 을 간단하게 설계할 때 선택할 수 있다.
- Batch Processing 의 스케줄링을 정의한 표현식인 Cron 표현식에 대해 알아보자.
- Airflow 를 포함한 다른 도구들에서도 일반적으로 사용하는 표현식이다.
-
아래와 같은 Cron 표현식 중 대표적인 것들의 의미를 알 수 있을 정도로 인지하면 좋다.
*****
은 Every minute 를 뜻한다.0****
은 Every hour 를 뜻한다.00***
은 Every day at 12:00 AM 을 뜻한다.00**FRI
는 금요일 12:00 AM 을 뜻한다.001**
은 매달 1일 12:00 AM 을 뜻한다.
- 그러나 Linux Crontab 은 아래와 같은 문제가 있다.
- 재실행 및 알림이 잘 되지 않는다.
- 파일 실행 중 오류가 발생하면 Crontab 이 별도로 처리하지 않고, log 를 만들어 저장하고 끝난다. 별도의 알람을 구현할 수는 있지만 번거로움이 있다.
- 실패할 경우 자동으로 몇 번 더 재실행하고, 그래도 실패하면 알림을 받아야 대응할 수 있는데, 그런 게 없다.
- 과거 실행 이력 및 실행 로그를 보기 어렵다. 만들어진 로그 파일들을 모두 뒤져야 하는데,
vim
을 써서 찾는 게 익숙하면 괜찮다. - 복잡한 파이프라인을 만들기 힘들다. 전처리 → 예측 → 이후 작업과 같이 이어서 파이프라인을 만들고 싶다면
predict.py
를 복잡하게 짜는 수 밖에 없다.
-
Crontab 은 간단히 사용할 때는 좋지만, 재실행, 로그 확인 등은 제공하지 않아 정교한 스케줄링 및 워크플로우 도구가 필요하다. 위와 같은 문제들을 해결하기 위해서 여러 도구들이 나왔다. 즉 아래와 같은 다양한 스케줄링/워크플로우 전용 도구가 등장했다.
- 위 도구들 중 가장 많이 사용되는 것이 Airflow 다.
Airflow
- Airflow 는 스케줄링 및 워크플로우 도구의 표준이다. 데이터 엔지니어링이나 데이터 사이언스를 한다면, Airflow 를 매우 많이 사용하게 된다.
- Airbnb 에서 개발했으며 2점대 버전이 존재한다. 또한 업데이트가 매우 빠르다.
- 스케줄링 도구로 무거울 수 있지만, 거의 모든 기능을 제공하고 확장성이 좋아 일반적으로 스케줄링과 데이터 파이프라인, 엔지니어링에서 많이 사용한다. 따라서 데이터 엔지니어링 팀, MLOps 팀에서 많이 사용한다.
-
하루에 한번씩 무언가를 확인하는 것도 Airflow 로 가능하다.
- Airflow 를 많이 사용하는 이유는 아래와 같다.
- Workflow 를 잘 관리해주는 도구다.
- 코드로 작성된 데이터 파이프라인의 흐름을 스케줄링하고 모니터링 하는 목적으로 사용한다.
- 데이터 처리 파이프라인을 효율적으로 관리하여 시간과 자원을 절약하도록 한다.
- Airflow 주요 기능
- Python 기반 스케줄링 및 파이프라인을 작성할 수 있다.
with DAG( 'tutorial', defualt_args = default_args, description = 'A simple tutorial DAG', schedual_interval = timedelta(days=1), start_date = datetime(2021, 1, 1), catchup = False, tags = ['example'], ) as dag: # t1, t2 and t3 are examples of tasks created by instantiating operators t1 = BashOperator( task_id = 'print_date', bash_command = 'date' ) t2 = BashOperator( task_id = 'sleep', depends_on_past = False, bash_command = 'sleep 5', retries = 3 )
- 위와 같이
DAG
로 정의한다. BashOperator
를 쓰면bash
를 실행해준다. 이는 bash command 에서 직접 쓰는 것이라기 보다는 operator 관점으로 추상화 하는 것이다.-
또한 스케줄링 및 파이프라인 목록을 볼 수 있는 웹 UI 를 제공해준다.
- 위 그림과 같이 owner, 얼마나 실행했는지, 스케줄, 마지막 실행 날짜, 이름, 검색 기능을 모두 사용할 수 있다.
- 또한 어떤 것들이 정상 실행되었고 실패했는지를 파악할 수 있다.
- 위 그림과 같이 DAG 를 하나 클릭하면 해당 DAG 의 스케줄링 및 파이프라인을 구체적으로 볼 수 있다.
- 성공 여부, 처음 시작, 마지막 시작, 시도 횟수, 시간 등을 여러 view 로 볼 수 있다. 굉장히 가독성이 뛰어나다.
- 그래프 view 로 보면 위와 같이 표현된다. 의존성에 대한 것도 화살표로 확인할 수 있다.
-
Branch 를 사용하여 특정 조건에 따라 작업을 분기할 수 있다.
- 위 그림과 같이 하나의 작업이 실행되고, 이후 branch 작업(
branching
)으로 조건을 걸어서 분기를 나눌 수 있다. - 예를 들어 머신러닝 모델을 학습하고 → 결과가 기존 모델보다 성능 좋으면 update, 안 좋으면 pass 할 때 branch 기능을 사용할 수 있다.
- 위 그림과 같이 하나의 작업이 실행되고, 이후 branch 작업(
Airflow 핵심 개념
- DAGs(Directed Acylic Graphs)
- Airflow 에서 작업을 정의하는 방법이다. 작업의 흐름과 순서를 정의한다.
- Operator
- Airflow 의 작업 유형을 나타내는 클래스다.
BashOperator
,PythonOperator
,SQLOperator
등 다양한 Operator 가 있다. - 예를 들어 Python 을 통해 어떤 기능을 구현할 때, 사용하는 것들을 추상화해서
PythonOperator
에 인자로 건네주면 된다.
- Airflow 의 작업 유형을 나타내는 클래스다.
- Scheduler
- Airflow 의 핵심 구성 요소 중 하나다. DAGs 를 대상으로 현재 실행해야 하는지 스케줄을 확인한다.
- 웹 서버나 DB 에 시간표가 저장되어 있는데, 스케줄러가 해당 시간표를 계속 보면서 실행 시점을 파악한다.
- 정리하면, DAGs 의 실행을 관리하고 스케줄링 하는 기능을 한다.
- Executor
- 실제로 작업이 실행되는 환경을 의미한다.
- 스케줄러는 일을 명령하고, Worker(Executor) 들이 일을 하는 것이다.
LocalExecutor
,CeleryExecutor
등 다양한 Executor 가 존재한다.
Airflow 기본 아키텍처
-
DAG Directory
- 이 내부에 DAGs 파일들을 저장한다.
- Airflow 는 DAG 디렉토리를 계속 파싱하면서 확인한다. 즉 스케줄러에 의해서
.py
파일을 확인하고, DAG 가 파싱되고 작업을 판단한다. - 항상 DAG 디렉토리에 DAG 파일을 저장해야 한다.
-
Operator
- DAG 는 Directed Acylic Graphs 의 준말로 순환하지 않는 방향이 존재하는 그래프를 의미한다. 즉 순환형이 아니라 한쪽으로 흐른다. 이는 자료구조에서도 많이 다루는 개념이다.
- 작업의 의존성이 있을 때 Operator 의 순서를 정의하면 된다. Airflow 에서는 Linux Crontab 처럼 단순히 하나의 파일을 실행하는 것이 아닌, 여러 작업의 조합이 가능하다.
- DAG 1 개는 1 개의 파이프라인이다. 이 파이프라인 안에 여러 task 가 있을 수 있다. Task 는 DAG 내에서 실행할 작업이다. 하나의 DAG 는 여러 Task 의 조합으로 구성된다. 이러한 Task 를 만들 때 Operator 를 사용하게 된다.
- 즉, Operator 로 Task 를 만들고, Task 를 이어주고 Task 들의 묶음이 DAG 인 것이다.
-
아래의 예시를 보자.
- 위 그림과 같이 작업을 분기할 수도, 합칠 수도 있다. 또한 병렬 처리도 가능하다.
댓글 남기기