[Airflow] Airflow 를 활용한 Batch Serving (2)
Airflow 는 지금까지 본 것처럼 Batch 파이프라인을 실행하는데 매우 유용하다. 이 포스트에서는 Airflow 를 활용할 때 좋은 팁과 slack 알람을 구현하는 사례를 보자.
Airflow 활용 팁
- 먼저
airflow.cfg
의dag_dir_list_interval = 300
을30
으로 줄여준다. - 이는 DAG 디렉토리에 새로운 파일이 들어왔는지 스캔하는 주기인데, 너무 짧게 하면 DB 에 부하가 갈 수 있다.
- 또 알아두면 좋은 것은 cron 표현식은 UTC 기준이기 때문에, 한국은 UTC 대비 9 시간 빠르다. 즉 한국이 밤 10시라면, 영국의 수도 런던은 오후 1 시가 된다.
- KST 가 한국 시간이지만, 거의 모든 회사가 UTC 를 default 로 사용한다.
default_args
-
아래와 같이
default_args
를 설정해놓고 많이 사용한다.default_args = { "owner" : "bkkhyunn", "depends_on_past" : False, # 이전 DAG 의 task 성공 여부에 따라서 현재 task 를 실행할 지를 결정한다. False 라면 과거 task 의 성공 여부와 상관없이 실행한다. "start_date" : datetime(2025, 1, 1), "email_on_failure": False # DAG 실행 중 오류가 발생하면 failure callback 으로 메일을 발송하므로 비활성화한다. }
@once 및 병렬 순서 정의
-
schedule_interval
에@once
를 주면 딱 한번만 실행한다. 이는 디버깅에서 유용하게 사용할 수 있다.dag = DAG( dag_id = "test", default_args = default_args, schedule_interval = "@once" )
-
task 순서 정의에서, 아래와 같이 task 를 병렬로 수행할 수 있다.
task1 >> task2 # task 1 이 완료되면 task 2 실행 task1 >> task3 # task 1 이 완료되면 task 3 실행 # 이를 아래와 같이 한 번에 작성할 수 있다. # task 1 >> [task2, task3]
task 재실행
-
task 를 재실행할 때, Airflow 웹 UI 에서 Clear Task 버튼을 누르는 것 외에 아래와 같이 Trigger DAG 버튼으로도 가능하다.
-
해당 버튼을 누르면 스케줄링이 아닌 manual 로 실행되는 것을 확인할 수 있다.
-
이렇게 되면 다른 DAG 가 영향을 받을 수 있다. 따라서 Trigger DAG with config 버튼을 통해 config 를 같이 설정해야 한다. 이에 따라 처음에는 Clear Task 기능을 이용하는 것이 좋다.
-
-
디버깅을 할 때
schedule_interval
에@once
를 줘도 되지만,default_args
에서end_date
를 설정하는 방법도 있다.default_args = { "owner" : "bkkhyunn", "depends_on_past" : False, "start_date" : datetime(2024, 1, 1), "end_date" : datetime(2024, 1, 4) }
execution date
- 또한 코드 작업을 할 때,
datetime.now().date()
를 사용하기도 한다. 이를 통해 언제 실행하더라도 해당 코드가 실행되는 시간 기준으로 설정된다.- 그러나 Airflow 는 Batch 성으로 특정 시간대에 작업을 실행하는 도구다. 따라서
now
와 같은 임의 시간을 잘 쓰지 않고 의도한 시간이나 날짜를 주입해서 사용한다. - 이외에 과거 데이터를 마이그레이션하여 Airflow 작업을 과거 날짜로 실행해야 하는 경우도 존재한다.
- 그러나 Airflow 는 Batch 성으로 특정 시간대에 작업을 실행하는 도구다. 따라서
- 따라서 코드 상에서
now
, SQL 상에서current_date()
를 사용하지 않고 Airflow 에서 실행하기로 한 시간을execution_date
,logical_date
로 정의하여 넣어줘야 한다.- 이 때 연산을 여러 번 적용하더라도 결과가 달라지지 않는 성질인 멱등성을 보장해야 한다.
-
start_date
에 넣어줄execution_date
를 맞추는 방법에 대해 알아보자.default_args = { "owner" : "bkkhyunn", "depends_on_past" : False, "start_date" : datetime(2024, 1, 1), "end_date" : datetime(2024, 1, 4) } def print_current_date_with_context(*args, **kwargs): print(f"kwargs: {kwargs}") with DAG( dag_id = "python_dag_with_context", default_args = default_args, schedule_interval = "30 0 * * *", # UTC 기준으로 매일 0시 30분에 실행. 한국 시간으로 9시 30분. tags = ["my_dags"], catchup = True ) as dag: PythonOperator( task_id = "print_current_date_with_context", python_callable = print_current_date_with_context )
- 위와 같이 코드를 작성하여 python 함수에
**kwargs
를 써서 실행시켜볼 수 있다. - 이를 통해
conf, dag, dag_run, queued_at, ds, ds_nodash
등 다양한 인자를 확인할 수 있다. 여기서ds
는 실행하는 시점의 date 로,execution date
가 된다. -
ds
를 가져오기 위해서 아래와 같이 python 함수를 작성할 수 있다.def print_current_date_with_context(*args, **kwargs): print(f"kwargs: {kwargs}") execution_date = kwargs['ds'] execution_date_nodash = kwargs['ds_nodash'] print(f"execution_date_nodash : {execution_date_nodash}") execution_date = datetime.strptime(execution_date, "%Y-%m-%d").date() date_kor = ["월", "화", "수", "목", "금", "토", "일"] datetime_weeknum = execution_date.weekday() print(f"{execution_date} 는 {date_kor[datetime_weeknum]} 요일 입니다.")
- 이와 같이
kwarg
(keyword argument) 를 받아서 실행 시간을 맞춰줄 수 있다. 추가적으로 더 빠르게 하는 방법도 있는데, 다음 포스트에서 정리해보자.
Jinja Template
-
Airflow 는 아래와 같이 Jinja Template 과 잘 연동된다. 자세한 것은 Airflow 공식문서를 확인해보자.
-
아래와 같이 Airflow 에서 Jinja Template 과 연동되도록 정의한 templated variable 를 넣으면, 실행 과정에서 의도한 값들로 변환된다.
default_args = { "owner" : "bkkhyunn", "depends_on_past" : False, "start_date" : datetime(2024, 1, 1), "end_date" : datetime(2024, 1, 4) } def print_current_date_with_jinja(date): execution_date = datetime.strptime(date, "%Y-%m-%d").date() date_kor = ["월", "화", "수", "목", "금", "토", "일"] datetime_weeknum = execution_date.weekday() print(f"{execution_date} 는 {date_kor[datetime_weeknum]} 요일 입니다.") with DAG( dag_id = "python_dag_with_jinja", default_args = default_args, schedule_interval = "30 0 * * *", # UTC 기준으로 매일 0시 30분에 실행. 한국 시간으로 9시 30분. tags = ["my_dags"], catchup = True ) as dag: execution_date = "{{ ds }}" # templated variable PythonOperator( task_id = "print_current_date_with_jinja", python_callable = print_current_date_with_jinja, op_kwargs = {"date" : execution_date} )
-
위 DAG 를 실행하고 UI 의 task 에서 “Rendered Template” 를 누르면, 주입한 것들에 대해 확인할 수 있다.
XCom
- 특정 task 에서 나온 결과를 다른 task 에 넘기고 싶을 때 Xcom 을 이용한다. Airflow 에서는 구조상 return 해서 task 의 결과를 받을 수 없기 때문에, Xcoms 로 저장해서 반환해준다.
- Python 코드에서 return 을 하면 자동으로 XCom 에 저장해준다. 이를 push 라고 한다.
-
구체적으로 아래와 같이
return_value
라는 key 로 return 값인 Value 를 저장한다. -
이 반환 결과를 다른 task 에서 쓰려면, XCom 에서 pull 해서 받아 쓸 수 있다.
- 그러나 XCom 을 이용하여 큰 데이터를 저장하는 것은 권장되지 않는다. 큰 데이터는 DB 나 데이터 웨어하우스를 이용하고, 간단한 변수 이름 등은 Xcom 의 push 와 pull 을 이용해도 좋다.
Slack 알람 구현
- Airflow 를 통해 Task 가 실패할 때 Slack 메시지를 전송하는 것을 구현해볼 수 있다. 여기에는 Connection 을 사용한다.
- DAG RUN 실패를 캐치해서 Slack 으로 알람을 보내주는 것은 현업에 필요한 기능이다. 물론 성공에 대한 알람도 가능하다. 여기서는 실패만 알람으로 알려주도록 해보자.
- 이 블로그가 Airflow 와 Slack 을 연동한 알람을 세세하게 잘 설명하고 있다.
-
먼저 Airflow Slack Provider 를 설치한다. 반드시 Airflow 실행 폴더에서 진행해야 한다.
pip3 install 'apache-airflow-providers-slack[http]'==8.6.0
-
이후 아래와 같이 Slack API key 를 발급한다. Slack API Apps 에서 Create New App 을 클릭한다.
-
From scratch 에서 App Name 을 지정하고, 설치할 Slack Workspace 를 지정한다.
-
이후 Incoming Webhooks 를 클릭하여 Active 한다. Add New Webhook to Workspace 를 누르면 메시지를 전송할 채널을 지정할 수 있고, 이후 URL 이 생성된다.
- 생성된 URL 을 저장한다. 이 때, 해당 URL 에서
https://hooks.slack.com/services/
뒤에 나오는 부분이 Password 다. - 이제 Airflow Web Server 의 Console 에서 아래를 입력해준다.
- Connection ID : 사용자 지정. Airflow 코드에서 사용하게 된다.
- Connection Type : HTTP
- Host : https://hooks.slack.com/services
- Password :
https://hooks.slack.com/services/
뒤 부분이다.
-
아래와 같이 Web Server 를 통해 Admin $\rightarrow$ Connections $\rightarrow$ Add a new record 로 넘어가서 Connection 을 생성해준다.
- Task 가 실패했을 때 알림 코드를 작성한다.
-
dags/utils
폴더를 생성한 후,__init__.py
,slack_notifier.py
를 생성한다. - DAG 를 정의할 때는 해당
.py
파일들을 import 해서 사용한다.
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator # Slack Webhook 제공 Operator 를 먼저 정의 # 1. Connection ID SLACK_DAG_CONN_ID = "my_webhook" # 2. Webhook 함수 정의 def send_message(slack_msg): return SlackWebhookOperator( task_id = "slack_webhook", slack_webhook_conn_id = SLACK_DAG_CONN_ID, message = slack_msg, username = "sample-airflow" ) # 3. slack alert 함수 정의 def task_fail_slack_alert(context): slack_msg = """ :red_circle: Task Failed. *Task* : {task} *Dag* : `{dag}` *Execution Time* : {exec_date} """.format( task = context.get("task_instance").task_id, dag = context.get("task_instance").dag_id, exec_date = context.get("execution_date") ) alert = send_message(slack_msg) return alert.execute(context=context) def task_succ_slack_alert(context): slack_msg = f""" :large_green_circle: Task SUCC. *Task* : {context.get("task_instance").task_id} *Dag* : {context.get("task_instance").dag_id} *Execution Time* : {context.get("execution_date")} """ alert = send_message(slack_msg) return alert.execute(context=context)
- 추가적으로
slack_msg
에 Slack ID 를 추가하여 특정 사용자를 tag 할 수도 있다.
-
-
이제 DAG 코드와 연결한다.
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime from airflow.exceptions import AirflowFailException from utils.slack_notifier import task_fail_slack_alert, task_succ_slack_alert default_args = { "owner": "bkkhyunn", "depends_on_past": False, "start_date": datetime(2024, 1, 1), "end_date": datetime(2024, 1, 4) } def _handle_job_error() -> None: raise AirflowFailException("Raise Exception.") with DAG( dag_id="python_dag_with_slack_webhook", default_args=default_args, schedule_interval="30 0 * * * ", tags=["my_dags"], catchup=True, on_failure_callback=task_fail_slack_alert ) as dag: send_slack_noti = PythonOperator( task_id="raise_exception_and_send_slack_noti", python_callable=_handle_job_error ) send_slack_noti
- 위와 같이 DAG 정의 내에서
on_failure_callback
인자에task_fail_slack_alert
함수를 넣어준다. 이는 task 가 실패할 때 해당 함수를 실행한다는 것이다. -
위 예제에서는
PythonOperator
에 Error 를 발생시키는 함수를 전달하여 slack 알람 연동이 잘 되는지를 확인한다. 아래와 같이 연동한 Workspace 에서 slack 메시지를 받을 수 있다. - Aiflow DAG 뿐 아니라 인프라, Docker Image, 등에서도 오류가 생길 수 있다. 오류 발생을 알림으로 받는 것은 모니터링을 위해 굉장히 중요하다.
- 위와 같이 DAG 정의 내에서
댓글 남기기