[Airflow] Airflow 를 활용한 Batch Serving (2)


Airflow 는 지금까지 본 것처럼 Batch 파이프라인을 실행하는데 매우 유용하다. 이 포스트에서는 Airflow 를 활용할 때 좋은 팁과 slack 알람을 구현하는 사례를 보자.

Airflow 활용 팁

  • 먼저 airflow.cfgdag_dir_list_interval = 30030 으로 줄여준다.
  • 이는 DAG 디렉토리에 새로운 파일이 들어왔는지 스캔하는 주기인데, 너무 짧게 하면 DB 에 부하가 갈 수 있다.
  • 또 알아두면 좋은 것은 cron 표현식은 UTC 기준이기 때문에, 한국은 UTC 대비 9 시간 빠르다. 즉 한국이 밤 10시라면, 영국의 수도 런던은 오후 1 시가 된다.
    • KST 가 한국 시간이지만, 거의 모든 회사가 UTC 를 default 로 사용한다.

default_args

  • 아래와 같이 default_args 를 설정해놓고 많이 사용한다.

    default_args = {
        "owner" : "bkkhyunn",
        "depends_on_past" : False,   # 이전 DAG 의 task 성공 여부에 따라서 현재 task 를 실행할 지를 결정한다. False 라면 과거 task 의 성공 여부와 상관없이 실행한다.
        "start_date" : datetime(2025, 1, 1),
        "email_on_failure": False    # DAG 실행 중 오류가 발생하면 failure callback 으로 메일을 발송하므로 비활성화한다.
    }
    

@once 및 병렬 순서 정의

  • schedule_interval@once 를 주면 딱 한번만 실행한다. 이는 디버깅에서 유용하게 사용할 수 있다.

    dag = DAG(
        dag_id = "test",
        default_args = default_args,
        schedule_interval = "@once"
    )
    
  • task 순서 정의에서, 아래와 같이 task 를 병렬로 수행할 수 있다.

    task1 >> task2  # task 1 이 완료되면 task 2 실행
    task1 >> task3  # task 1 이 완료되면 task 3 실행
    
    # 이를 아래와 같이 한 번에 작성할 수 있다.
    # task 1 >> [task2, task3]
    

task 재실행

  • task 를 재실행할 때, Airflow 웹 UI 에서 Clear Task 버튼을 누르는 것 외에 아래와 같이 Trigger DAG 버튼으로도 가능하다.

    Untitled

    • 해당 버튼을 누르면 스케줄링이 아닌 manual 로 실행되는 것을 확인할 수 있다.

      Untitled

    • 이렇게 되면 다른 DAG 가 영향을 받을 수 있다. 따라서 Trigger DAG with config 버튼을 통해 config 를 같이 설정해야 한다. 이에 따라 처음에는 Clear Task 기능을 이용하는 것이 좋다.

  • 디버깅을 할 때 schedule_interval@once 를 줘도 되지만, default_args 에서 end_date 를 설정하는 방법도 있다.

    default_args = {
        "owner" : "bkkhyunn",
        "depends_on_past" : False,
        "start_date" : datetime(2024, 1, 1),
        "end_date" : datetime(2024, 1, 4)
    }
    

execution date

  • 또한 코드 작업을 할 때, datetime.now().date() 를 사용하기도 한다. 이를 통해 언제 실행하더라도 해당 코드가 실행되는 시간 기준으로 설정된다.
    • 그러나 Airflow 는 Batch 성으로 특정 시간대에 작업을 실행하는 도구다. 따라서 now 와 같은 임의 시간을 잘 쓰지 않고 의도한 시간이나 날짜를 주입해서 사용한다.
    • 이외에 과거 데이터를 마이그레이션하여 Airflow 작업을 과거 날짜로 실행해야 하는 경우도 존재한다.
  • 따라서 코드 상에서 now, SQL 상에서 current_date() 를 사용하지 않고 Airflow 에서 실행하기로 한 시간을 execution_date, logical_date 로 정의하여 넣어줘야 한다.
    • 이 때 연산을 여러 번 적용하더라도 결과가 달라지지 않는 성질인 멱등성을 보장해야 한다.
  • start_date 에 넣어줄 execution_date 를 맞추는 방법에 대해 알아보자.

    default_args = {
        "owner" : "bkkhyunn",
        "depends_on_past" : False,
        "start_date" : datetime(2024, 1, 1),
        "end_date" : datetime(2024, 1, 4)
    }
    
    def print_current_date_with_context(*args, **kwargs):
        print(f"kwargs: {kwargs}")
    
    with DAG(
        dag_id = "python_dag_with_context",
        default_args = default_args,
        schedule_interval = "30 0 * * *",   # UTC 기준으로 매일 0시 30분에 실행. 한국 시간으로 9시 30분.
        tags = ["my_dags"],
        catchup = True
    ) as dag:
    
        PythonOperator(
            task_id = "print_current_date_with_context",
            python_callable = print_current_date_with_context
        )
    
  • 위와 같이 코드를 작성하여 python 함수에 **kwargs 를 써서 실행시켜볼 수 있다.
  • 이를 통해 conf, dag, dag_run, queued_at, ds, ds_nodash 등 다양한 인자를 확인할 수 있다. 여기서 ds 는 실행하는 시점의 date 로, execution date 가 된다.
  • ds 를 가져오기 위해서 아래와 같이 python 함수를 작성할 수 있다.

    def print_current_date_with_context(*args, **kwargs):
        print(f"kwargs: {kwargs}")
        execution_date = kwargs['ds']
        execution_date_nodash = kwargs['ds_nodash']
        print(f"execution_date_nodash : {execution_date_nodash}")
    
        execution_date = datetime.strptime(execution_date, "%Y-%m-%d").date()
        date_kor = ["월", "화", "수", "목", "금", "토", "일"]
        datetime_weeknum = execution_date.weekday()
        print(f"{execution_date}{date_kor[datetime_weeknum]} 요일 입니다.")
    
  • 이와 같이 kwarg(keyword argument) 를 받아서 실행 시간을 맞춰줄 수 있다. 추가적으로 더 빠르게 하는 방법도 있는데, 다음 포스트에서 정리해보자.

Jinja Template

  • Airflow 는 아래와 같이 Jinja Template 과 잘 연동된다. 자세한 것은 Airflow 공식문서를 확인해보자.

    Untitled

  • 아래와 같이 Airflow 에서 Jinja Template 과 연동되도록 정의한 templated variable 를 넣으면, 실행 과정에서 의도한 값들로 변환된다.

    default_args = {
        "owner" : "bkkhyunn",
        "depends_on_past" : False,
        "start_date" : datetime(2024, 1, 1),
        "end_date" : datetime(2024, 1, 4)
    }
    
    def print_current_date_with_jinja(date):
        execution_date = datetime.strptime(date, "%Y-%m-%d").date()
        date_kor = ["월", "화", "수", "목", "금", "토", "일"]
        datetime_weeknum = execution_date.weekday()
        print(f"{execution_date}{date_kor[datetime_weeknum]} 요일 입니다.")
    
    with DAG(
        dag_id = "python_dag_with_jinja",
        default_args = default_args,
        schedule_interval = "30 0 * * *",   # UTC 기준으로 매일 0시 30분에 실행. 한국 시간으로 9시 30분.
        tags = ["my_dags"],
        catchup = True
    ) as dag:
          
        execution_date = "{{ ds }}"  # templated variable
    
        PythonOperator(
            task_id = "print_current_date_with_jinja",
            python_callable = print_current_date_with_jinja,
            op_kwargs = {"date" : execution_date}
        )
    
  • 위 DAG 를 실행하고 UI 의 task 에서 “Rendered Template” 를 누르면, 주입한 것들에 대해 확인할 수 있다.

    Untitled

XCom

  • 특정 task 에서 나온 결과를 다른 task 에 넘기고 싶을 때 Xcom 을 이용한다. Airflow 에서는 구조상 return 해서 task 의 결과를 받을 수 없기 때문에, Xcoms 로 저장해서 반환해준다.
  • Python 코드에서 return 을 하면 자동으로 XCom 에 저장해준다. 이를 push 라고 한다.
  • 구체적으로 아래와 같이 return_value 라는 key 로 return 값인 Value 를 저장한다.

    Untitled

  • 이 반환 결과를 다른 task 에서 쓰려면, XCom 에서 pull 해서 받아 쓸 수 있다.

    Untitled

  • 그러나 XCom 을 이용하여 큰 데이터를 저장하는 것은 권장되지 않는다. 큰 데이터는 DB 나 데이터 웨어하우스를 이용하고, 간단한 변수 이름 등은 Xcom 의 push 와 pull 을 이용해도 좋다.

Slack 알람 구현

  • Airflow 를 통해 Task 가 실패할 때 Slack 메시지를 전송하는 것을 구현해볼 수 있다. 여기에는 Connection 을 사용한다.
  • DAG RUN 실패를 캐치해서 Slack 으로 알람을 보내주는 것은 현업에 필요한 기능이다. 물론 성공에 대한 알람도 가능하다. 여기서는 실패만 알람으로 알려주도록 해보자.
    • 블로그가 Airflow 와 Slack 을 연동한 알람을 세세하게 잘 설명하고 있다.
  • 먼저 Airflow Slack Provider 를 설치한다. 반드시 Airflow 실행 폴더에서 진행해야 한다.

    pip3 install 'apache-airflow-providers-slack[http]'==8.6.0
    
  • 이후 아래와 같이 Slack API key 를 발급한다. Slack API Apps 에서 Create New App 을 클릭한다.

    Untitled

  • From scratch 에서 App Name 을 지정하고, 설치할 Slack Workspace 를 지정한다.

    Untitled

  • 이후 Incoming Webhooks 를 클릭하여 Active 한다. Add New Webhook to Workspace 를 누르면 메시지를 전송할 채널을 지정할 수 있고, 이후 URL 이 생성된다.

    Untitled

    Untitled

  • 생성된 URL 을 저장한다. 이 때, 해당 URL 에서 https://hooks.slack.com/services/ 뒤에 나오는 부분이 Password 다.
  • 이제 Airflow Web Server 의 Console 에서 아래를 입력해준다.
    • Connection ID : 사용자 지정. Airflow 코드에서 사용하게 된다.
    • Connection Type : HTTP
    • Host : https://hooks.slack.com/services
    • Password : https://hooks.slack.com/services/ 뒤 부분이다.
  • 아래와 같이 Web Server 를 통해 Admin $\rightarrow$ Connections $\rightarrow$ Add a new record 로 넘어가서 Connection 을 생성해준다.

    Untitled

  • Task 가 실패했을 때 알림 코드를 작성한다.
    • dags/utils 폴더를 생성한 후, __init__.py, slack_notifier.py 를 생성한다.
    • DAG 를 정의할 때는 해당 .py 파일들을 import 해서 사용한다.
    from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
    
    # Slack Webhook 제공 Operator 를 먼저 정의
    # 1. Connection ID
    SLACK_DAG_CONN_ID = "my_webhook"
    
    # 2. Webhook 함수 정의
    def send_message(slack_msg):
        return SlackWebhookOperator(
            task_id = "slack_webhook",
            slack_webhook_conn_id = SLACK_DAG_CONN_ID,
            message = slack_msg,
            username = "sample-airflow"
        )
    
    # 3. slack alert 함수 정의
    def task_fail_slack_alert(context):
        slack_msg = """
            :red_circle: Task Failed.
            *Task* : {task}
            *Dag* : `{dag}`
            *Execution Time* : {exec_date}
            """.format(
            task = context.get("task_instance").task_id,
            dag = context.get("task_instance").dag_id,
            exec_date = context.get("execution_date")
        )
    
        alert = send_message(slack_msg)
    
        return alert.execute(context=context)
    
    def task_succ_slack_alert(context):
        slack_msg = f"""
            :large_green_circle: Task SUCC.
            *Task* : {context.get("task_instance").task_id}
            *Dag* : {context.get("task_instance").dag_id}
            *Execution Time* : {context.get("execution_date")}
            """
          
        alert = send_message(slack_msg)
    
        return alert.execute(context=context)
    
    • 추가적으로 slack_msg 에 Slack ID 를 추가하여 특정 사용자를 tag 할 수도 있다.
  • 이제 DAG 코드와 연결한다.

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from datetime import datetime
    
    from airflow.exceptions import AirflowFailException
    from utils.slack_notifier import task_fail_slack_alert, task_succ_slack_alert
    
    default_args = {
        "owner": "bkkhyunn",
        "depends_on_past": False,
        "start_date": datetime(2024, 1, 1),
        "end_date": datetime(2024, 1, 4)
    }
    
    def _handle_job_error() -> None:
        raise AirflowFailException("Raise Exception.")
    
    with DAG(
        dag_id="python_dag_with_slack_webhook",
        default_args=default_args,
        schedule_interval="30 0 * * * ",
        tags=["my_dags"],
        catchup=True,
        on_failure_callback=task_fail_slack_alert
    ) as dag:
          
        send_slack_noti = PythonOperator(
            task_id="raise_exception_and_send_slack_noti",
            python_callable=_handle_job_error
        )
    
        send_slack_noti
    
    • 위와 같이 DAG 정의 내에서 on_failure_callback 인자에 task_fail_slack_alert 함수를 넣어준다. 이는 task 가 실패할 때 해당 함수를 실행한다는 것이다.
    • 위 예제에서는 PythonOperator 에 Error 를 발생시키는 함수를 전달하여 slack 알람 연동이 잘 되는지를 확인한다. 아래와 같이 연동한 Workspace 에서 slack 메시지를 받을 수 있다.

      Untitled

    • Aiflow DAG 뿐 아니라 인프라, Docker Image, 등에서도 오류가 생길 수 있다. 오류 발생을 알림으로 받는 것은 모니터링을 위해 굉장히 중요하다.
맨 위로 이동 ↑

댓글 남기기