Python

[Airflow] Dataset을 파헤쳐헤쳐모여여엿

sol-hee 2025. 4. 16. 22:29

와.. 정말정말 오랜만에 글 쓰는 것 같다.

우아한스터디도 했고.. 이직도 했고.. 벌써 일년 가까이 되어버렸다.

새 회사에 잘 적응했고, 엔지니어링 관련 늘 새로운 문제가 날 기다림 ㅇ0ㅇ 짜릿해. >ㅇ<

이 글은 현재 겪고 있는 문제를 Dataset으로 풀 수 없을까? 라는 고민에 의해 시작되었다. 풀 수 없을까를 고민하기 전에 Dataset을 완벽히 이해하고 있는지도 모르겠고 생각 차원으로 정리하고자한다.

이 글은 Airflow 공식 Docs인 https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html 를 기반으로 한다.


적기 전에 궁금한 거

  • Dataset을 or, and 조건으로 여러개 묶을 수 있다. and 조건인 경우 언제까지 데이터셋의 intersection이 유효하다고 판단하는가.
  • Dataset에 여러 정보를 붙일 수 있는 걸로 알고 있는데, 정보만 달라져도 새로운 데이터셋으로 처리하는지? 현재는 schedule에 Dataset을 하드코딩으로 박아놨음.
    • 부가 정보 붙은 놈들을 같은 놈으로 본다 -> partition_date를 부가정보에 포함시키면 안된다. Dataset 선언을 동적으로 해야하는지. (DataAlias써서?)
    • 다른 놈으로 본다 -> HAPPY 상황

[이전에 피봤던 내용]
DBT + Airflow 파이프라인을 사용하는데, DBT에서 select하는 모델들을 파싱 >> 참조하는 source 리스트를 데이터셋으로 모아서 사용

  • 이야 Dag 파싱할때마다 놀랍다 놀라와~~ ㅋㅋㅋㅋ Dag 파싱 타임마다 정신 못차리고 스케쥴러 터짐 ㅋㅋ ;;ㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋ

(얻은 교훈) 최대한 스케쥴러가 Dag 파싱하는데 영향을 주면 안된다.


Extra Information on dataset

The extra information does not affect a dataset’s identity. This means a DAG will be triggered by a dataset with an identical URI, even if the extra dict is different

with DAG(
    dag_id="consumer",
    schedule=[Dataset("s3://dataset/example.csv", extra={"different": "extras"})],
):
    ...

이런식으로 쓰는데.. 결국 extra와 상관없이 URI만으로 판별한다.

Multiple Datasets

Dag의 schedule 파라미터가 list 형식이므로 여러개의 데이터셋을 받을 수 있음.

  • 하나의 데이터 셋이 모든 데이터셋이 컨슘 되기 전에 여러번 업뎃 된다면? => 다운스트림 DAG는 한번만 돈다.

Attaching extra information to an emitting dataset event

extra와 다름. emit 시키기 전에 추가 정보를 넣을 수 있다.

  • extra: Extra information on a dataset statically describes the entity pointed to by the dataset URI
  • Metadata: extra information on the dataset event instead should be used to annotate the triggering data change, such as how many rows in the database are changed by the update, or the date range covered by it.

한마디로 추가 데이터를 정적으로 넣을거냐 / 동적으로 넣을거냐의 차이

from airflow.datasets import Dataset
from airflow.datasets.metadata import Metadata

example_s3_dataset = Dataset("s3://dataset/example.csv")


@task(outlets=[example_s3_dataset])
def write_to_s3():
    df = ...  # Get a Pandas DataFrame to write.
    # Write df to dataset...
    yield Metadata(example_s3_dataset, {"row_count": len(df)})

위처럼 접근하는게 간단해보임. 아래는 yield 안쓰고 컨텍스트에 직접 붙어서 추가하는 방법.

@task(outlets=[example_s3_dataset])
def write_to_s3(*, outlet_events):
    outlet_events[example_s3_dataset].extra = {"row_count": len(df)}

Sensor에서 쓴다면?

from airflow.sensors.dataset import DatasetSensor
from airflow.datasets import Dataset

example_dataset = Dataset("s3://dataset/example.csv")

class CustomDatasetSensor(DatasetSensor):
    def poke(self, context):
        events = context["dataset_events"]
        for event in events:
            if event.dataset.uri == "s3://dataset/example.csv":
                extra = event.extra
                if extra and extra.get("row_count", 0) > 1000:
                    self.log.info("Triggering because row_count > 1000")
                    return True
        return False

TriggerDagRunOperator에서 쓴다면?

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

def conditionally_trigger(context, dag_run_obj):
    metadata = context["dataset_events"][0].extra
    if metadata and metadata.get("row_count", 0) > 500:
        dag_run_obj.payload = {"row_count": metadata["row_count"]}
        return dag_run_obj

trigger = TriggerDagRunOperator(
    task_id="trigger_downstream",
    trigger_dag_id="dag_b",
    python_callable=conditionally_trigger
)

Fetching information from previously emitted dataset events

inlets을 사용해서 가져온다.
가져올 때 여러 다른 정보도 오는 것 같은데..

  • extra: yield Metadata(example_s3_dataset, {"row_count": len(df)}) 이렇게 흘려보냈던 정보
  • timestamp: task로 부터 emit된 시간정보
  • source_task_instance: 이 이벤트를 발생시킨 원래의 TaskInstance에 대한 참조 (즉, 어떤 DAG, 어떤 태스크에서, 어떤 실행 시간(execution_date)으로 발생했는지 추적 가능)
from airflow.decorators import task
from airflow.datasets import Dataset

example_s3_dataset = Dataset("s3://dataset/example.csv")

@task(inlets=[example_s3_dataset])
def post_process_s3_file(*, inlet_events):
    events = inlet_events[example_s3_dataset]  # 해당 Dataset에 대한 과거 이벤트 리스트
    last_event = events[-1]  # 가장 최근 이벤트
    last_row_count = last_event.extra["row_count"]  # 그 이벤트에 담긴 추가 정보 사용

    source_info = last_event.source_task_instance
    dag_id = source_info.dag_id
    task_id = source_info.task_id
    run_id = source_info.run_id

    print(f"이 데이터는 DAG '{dag_id}'의 태스크 '{task_id}'에서 생성됨 (run_id: {run_id})")

inlet_events의 특징

inlet_events[dataset]는 이전 이벤트들을 시간순으로 정렬한 리스트처럼 동작한다.

  • events[-1]: 가장 최근 이벤트
  • events[-2:]: 최근 2개의 이벤트 등 파이썬 리스트처럼 slicing 가능

지연 로딩(lazy loading) 방식입니다.
리스트를 호출하거나 접근할 때만 실제로 DB에서 데이터를 조회

예를 들어, 특정 S3에 적재된 데이터가 지난번보다 많이 늘어났을 때만 후처리를 하거나,
특정 태스크에서 생성한 데이터가 아닌 경우에는 처리하지 않도록 하고 싶을 때도
source_task_instance를 이용해 판단할 수 있습니다.

Dynamic data events emitting and dataset creation through DatasetAlias

데이터셋의 이벤트를 alias와 함께 emit 시킬 수 있음. 다운스트림은 처리된 데이터셋에 따라 달라질 수 있다.

인자로 name을 받으며 이 값은 Dataset을 고유하게 식별한다.

task는 alias를 outlet으로 먼저 선언하고 outlet_events 또는 yield Metadata로 이벤트를 추가한다.

아래 예제는 s3://bucket/my-task 데이터셋 이벤트를 extra 와 함께 만드는 것임.

Emit a dataset event during task execution through outlet_events

from airflow.datasets import DatasetAlias


@task(outlets=[DatasetAlias("my-task-outputs")])
def my_task_with_outlet_events(*, outlet_events):
    outlet_events[DatasetAlias("my-task-outputs")].add(Dataset("s3://bucket/my-task"), extra={"k": "v"})

Emit a dataset event during task execution through yielding Metadata

from airflow.datasets.metadata import Metadata


@task(outlets=[DatasetAlias("my-task-outputs")])
def my_task_with_metadata():
    s3_dataset = Dataset("s3://bucket/my-task")
    yield Metadata(s3_dataset, extra={"k": "v"}, alias="my-task-outputs")

Alias에 여러번 추가되거나 여러 Alias에 추가되어도 Dataset event는 한번만 emit 된다.

from airflow.datasets import DatasetAlias

@task(
    outlets=[
        DatasetAlias("my-task-outputs-1"),
        DatasetAlias("my-task-outputs-2"),
        DatasetAlias("my-task-outputs-3"),
    ]
)
def my_task_with_outlet_events(*, outlet_events):
    outlet_events[DatasetAlias("my-task-outputs-1")].add(
        Dataset("s3://bucket/my-task"), extra={"k": "v"}
    )
    # 이 줄은 위와 같은 Dataset + extra 이므로 추가 이벤트 발생 안 함
    outlet_events[DatasetAlias("my-task-outputs-2")].add(
        Dataset("s3://bucket/my-task"), extra={"k": "v"}
    )
    # extra 값이 다르기 때문에 별도 이벤트가 발생함
    outlet_events[DatasetAlias("my-task-outputs-3")].add(
        Dataset("s3://bucket/my-task"), extra={"k2": "v2"}
    )

Scheduling based on dataset aliases

Alias에 추가된 데이터셋 이벤트는 그냥 간단히 데이터셋 이벤트이므로, 실제 dataset에 의존하는 다운스트림 DAG 는 (별칭 고려안하고) 정상적으로 데이터를 읽을 수 있음. 다운스트림 DAG는 데이터셋 alias에도 의존한다.
DatasetAlias는 DAG 파싱 시점에 Dataset으로 해석(resolved)

DatasetAlias("out")을 outlet으로 가진 태스크가 실행 중에 하나 이상의 Dataset과 연결되면, 해당 DAG은 실행됩니다.
이때 연결되는 Dataset이 어떤 Dataset인지(Dataset URI나 정체)는 상관없습니다.

특정 태스크 실행(run)에서 해당 alias에 어떤 Dataset도 연결되지 않으면, downstream DAG은 트리거되지 않습니다.

with DAG(dag_id="dataset-producer"):

    @task(outlets=[Dataset("example-alias")])
    def produce_dataset_events():
        pass


with DAG(dag_id="dataset-alias-producer"):

    @task(outlets=[DatasetAlias("example-alias")])
    def produce_dataset_events(*, outlet_events):
        outlet_events[DatasetAlias("example-alias")].add(Dataset("s3://bucket/my-task"))


with DAG(dag_id="dataset-consumer", schedule=Dataset("s3://bucket/my-task")):
    ...

with DAG(dag_id="dataset-alias-consumer", schedule=DatasetAlias("example-alias")):
    ...

dataset-alias-producer이 실행되면 DatasetAlias("example-alias")Dataset("s3://bucket/my-task")로 해석(resolved) 됨.

with DAG(dag_id="dataset-alias-producer"):

    @task(outlets=[DatasetAlias("example-alias")])
    def produce_dataset_events(*, outlet_events):
        outlet_events[DatasetAlias("example-alias")].add(Dataset("s3://bucket/my-task"))

하지만 DAG dataset-alias-consumer는 스케줄이 갱신되기 위해 다음 DAG 파싱이 완료될 때까지 기다려야 함

# DAG 파싱될때까지 대기
with DAG(dag_id="dataset-alias-consumer", schedule=DatasetAlias("example-alias")):
    ...

이 문제를 해결하기 위해,
Airflow는 DatasetAlias("example-alias")에 의존하는 DAG들 중에서,
이전에 의존하지 않던 Dataset으로 alias가 해석된 경우, 해당 DAG들을 자동으로 다시 파싱(re-parse) 합니다.

-> Alias 해석된적 없던 Dataset이 들어온 경우 물려있는 DAG 들을 파싱하는 듯.

그 결과,

  • dataset-consumer DAG (직접 Dataset에 의존하는 DAG)와
  • dataset-alias-consumer DAG (alias에 의존하는 DAG)
    둘 다 dataset-alias-producer DAG 실행 이후에 트리거됩니다.

Fetching information from previously emitted dataset events through resolved dataset aliases

달라진것 없음. 똑같이 inlet_events 붙어서 읽음됨.

with DAG(dag_id="dataset-alias-producer"):

    @task(outlets=[DatasetAlias("example-alias")])
    def produce_dataset_events(*, outlet_events):
        outlet_events[DatasetAlias("example-alias")].add(
            Dataset("s3://bucket/my-task"), extra={"row_count": 1}
        )


with DAG(dag_id="dataset-alias-consumer", schedule=None):

    @task(inlets=[DatasetAlias("example-alias")])
    def consume_dataset_alias_events(*, inlet_events):
        events = inlet_events[DatasetAlias("example-alias")]
        last_row_count = events[-1].extra["row_count"]

Combining dataset and time-based schedules

DatasetOrTimeSchedule 이용해서 데이터셋으로도 업뎃하고.. 시간기반으로도 트리거할 수 있음.

from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable


@dag(
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=(dag1_dataset & dag2_dataset)
    )
    # Additional arguments here, replace this comment with actual arguments
)
def example_dag():
    # DAG tasks go here
    pass

'Python' 카테고리의 다른 글

[Python] Memory allocate & Garbage Collection  (3) 2023.12.05
[Dask] 개념 및 Scheduler  (2) 2023.11.29
[Python] object  (0) 2022.04.13