카테고리 없음

[Airflow] Task Group

sol-hee 2023. 1. 4. 17:35

Airflow DAG 중 한 TASK에 for문으로 테이블들을 Redshift에 적재하는 로직이 있었다.
그런데, 운영하다보니 다음과 같은 문제가 생겼다.


  1. 한 테이블에서 Error 발생할 경우 해당 테이블 이후 loop는 실행되지 않는 문제
  2. 테이블마다 커스텀하게 전처리를 할 수 없는 문제

1번의 경우 exception 처리로도 핸들링이 가능하고, 2번의 경우 if 문으로 어느정도 문제를 해결할 수 있다.
그런데, 그럴 경우 코드가 exception 정도는 괜찮지만 if 가 덕지덕지 붙어 코드가 지저분해지는 문제가 발생한다 ㅠ ( 만약 커스텀할 테이블이 100개라면? ㅗㅜㅑ.. )
다 무시하고 exception 처리를 하더라도, table 마다 task가 분할되어있지 않아서 멱등성을 보장해주기 어렵고, 실패시 해당 부분만 재실행하면 되는데 응 몰라~ 다실행할거야~ 하는 문제가 발생할 것으로 예상했다.

" 그래서? 그래서 어떻게 할건데? "

테이블 별로 TASK를 나누어 병렬처리를 하자 !! 라고 마음 먹었고, 마침 사수분께서 TaskGroup을 사용하시는 것을 보고 같이 활용해보기로 하였다 ( 나는 새싹 🌱 )

Task group

여러 개의 Task를 묶어 마치 하나의 Task처럼 볼 수 있다.

Syntax
아래 코드는 group_id 에 여러 task를 묶어 main_task와 연결한다.

main_task = Operator()
task_gruop = []
with TaskGroup(group_id='<group_id>') as tg:
    sub_task_1 = Operator()
    sub_task_2 = Operator()
    sub_task_1 >> sub_task_2
    task_group.append(tg)
main_task >> task_group

TEST

DummyOperator를 활용해서 TaskGroup을 테스트 

 

TaskGroup이 적용된 모습
그룹을 클릭하여 세부 태스크 확인 가능

참고

- https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/example_dags/example_task_group_decorator.html#task_start

 

airflow.example_dags.example_task_group_decorator — Airflow Documentation

 

airflow.apache.org

- Source code for airflow.example_dags.example_task_group_decorator

 

airflow.example_dags.example_task_group_decorator — Airflow Documentation

 

airflow.apache.org

- airflow.example_dags.example_task_group

 

airflow.example_dags.example_task_group — Airflow Documentation

 

airflow.apache.org