티스토리 뷰

전통적인 Airflow 운영 방식에서는 특정 조건이 충족될 때까지 기다리는 'Sensor'를 주로 사용했다.

하지만 리소스 효율성 문제로 인해 Airflow 2.4부터는 Dataset 기반의 이벤트 트리거방식이 도입되었다.

이번 포스팅에서는 Sensor의 한계와 이를 극복하는 Dataset의 개념, 그리고 운영 관점의 에러 핸들링 전략을 정리한다.

 

1. Sensor의 한계와 Dataset의 등장 배경

Sensor 와 Dataset 한 문장 차이점

- Sensor: "나왔니? 아직? 그럼 1분 뒤에 또 물어볼게" (수동적/리소스 낭비)
- Dataset: (데이터가 업데이트되는 순간) "나왔다! 다음 작업 시작해!" (이벤트 기반/효율적)

1-1. Sensor의 전통적 역할

Sensor는 외부 시스템의 작업 완료를 대기하며 파이프라인의 연속성을 보장하는 역할을 수행한다.

  • S3 파일 존재 체크: 특정 파일이 업로드되었는지 확인하여 후속 작업을 트리거한다.
  • 특정 조건 충족 시까지 폴링(Polling): 설정된 조건이 충족될 때까지 주기적으로 상태를 확인한다.
  • DAG 간 의존성 관리: 서로 다른 DAG 간의 실행 순서와 의존 관계를 조정한다.
  • 외부 API 응답 대기: 외부 호출 결과가 올 때까지 대기한다.

1-2. Sensor 방식의 문제점

  • 리소스 낭비: Sensor는 실행 중 Worker 슬롯을 계속 점유한다.
    이는 제한된 Worker 리소스를 무의미하게 소모하는 결과를 초래한다.
  • 비용 증가: 클라우드 환경에서 장시간 대기하는 Sensor는 불필요한 비용을 발생시킨다.
  • 관리 복잡성: Timeout, 재시도 로직, 실패 처리 등 관련 설정이 복잡하여 디버깅이 어렵다.
  • 확장성 저하: Sensor가 슬롯을 다 차지해버리면 정작 중요한 다른 작업들이 돌지 못하는 동시성 문제가 생긴다.

1-3. 경량 대기 방식(Dataset)의 필요성

실무에서는 Sensor 사용을 줄이고, 리소스를 점유하지 않는 이벤트 기반의 경량 대기 방식이 필요한데,

그 대안이 바로 Dataset이다.

  • 리소스 효율성: Worker 리소스를 점유하지 않는 방식으로 대기 상태를 관리하여 시스템 확장성을 높인다.
  • Dataset 도입: Airflow 2.4에서 도입된 Dataset 기능은 데이터 중심의 트리거 메커니즘을 제공하여 불필요한 폴링을 제거한다.

2. Dataset의 개념과 구조

- 데이터 중심 트리거: 시간이 아니라 데이터가 업데이트될 때 작업을 시작한다.
- 이벤트 기반: S3에 데이터가 올라가는 '이벤트'가 발생하면, 이를 기다리던 다음 DAG들이 즉시 실행된다.

2-1. Dataset Concept (핵심 개념)

  • Data Update: 시간 스케줄이 아니라, 데이터가 업데이트될 때 트리거된다.
  • Data-Centric: 데이터 의존성을 통해 DAG들을 논리적으로 연결한다.
  • S3 Trigger: S3 데이터 업로드가 완료되면 이를 사용하는 Consumer DAG가 자동으로 실행된다.
  • Event-Driven: 데이터 파이프라인을 위한 완전한 이벤트 기반 아키텍처를 구현한다.

2-2. Dataset Structure (작동 흐름)

출처: metacode

 

데이터 처리의 선후 관계를 Producer와 Consumer로 정의한다.

  1. Producer: 데이터 처리를 완료한 후 Dataset을 생성하거나 업데이트하여 '업데이트됨' 상태를 표시한다.
  2. Dataset: Producer와 Consumer DAG 사이를 연결하는 매개체 역할을 하며 데이터 상태를 표현한다.
  3. Consumer: Dataset을 스케줄 트리거로 사용하며, 데이터가 업데이트되는 즉시 실행된다.
  4. Processing: Consumer DAG가 Producer가 생성한 데이터를 처리하며 사이클이 반복된다.
● 현실 세계 비유: "카페 진동벨" By Gemini
Dataset 구조는 카페에서 커피를 주문하고 받는 과정과 똑같습니다.

- Producer (점원): 커피를 다 만들면 진동벨을 누릅니다. (데이터 생성 완료 및 Dataset 업데이트)
- Dataset (진동벨): 점원과 손님 사이의 신호기입니다. 벨이 울리면 "준비됐다"는 상태가 됩니다.
- Consumer (손님): 벨이 울릴 때까지 자기 일을 하다가, 벨이 울리는 즉시 움직입니다. (DAG 트리거)
- Processing (커피 마시기): 손님이 커피를 가져가서 마십니다. (데이터 처리 시작)

2-3. 주요 활용 시나리오

  • ETL Layers: Raw → Silver → Gold 단계의 데이터 변환 연결.
  • S3 Folders: 각 S3 폴더를 하나의 Dataset으로 표현하여 관리.
  • Analytics: 소스 데이터 업데이트 시 분석 작업을 즉시 트리거.
  • Automation: 폴링 없이 완전히 자동화된 데이터 파이프라인 구축.

3. Dataset vs ExternalTaskSensor 비교

구분 Dataset ExternalTaskSensor(레거시)
트리거 방식 이벤트 기반 트리거링 Task 상태 기반 트리거링
리소스 사용 Worker 리소스를 사용하지 않음 Worker 슬롯을 점유함
의존성 중심 데이터 중심의 의존성 Task 중심의 의존성
확장성 DAG가 많아질수록 유리함 레거시 DAG 연결에 주로 적합함

 

권장 사용 방법

  • Dataset 권장: ETL 레이어 간 연결, S3 스토리지 기반 처리, 구조화된 파이프라인, 리소스 효율이 중요한 경우.
  • ExternalTaskSensor 권장: DAG 간 연결을 강제로 구성해야 하는 경우, Task 단위의 강한 의존성이 필요한 경우, 하위 호환성이 필요한 경우.

실무에서는 DAG 간 의존성을 관리할 때 두 방식 중 적합한 것을 선택해야 한다.


4. 운영 관점의 Error Handling 전략

안정적인 파이프라인 운영을 위해 Retry, Alert, SLA 로직 구성이 필수적이다.

 

4-1. Retry (재시도 전략)

  • 기본 원칙: 네트워크 오류나 레이트 리밋 등 일시적 오류를 자동 복구하는 핵심 메커니즘이다.
    운영자 개입 없이 파이프라인을 복구한다.
  • 권장 설정: retries=3(충분한 횟수), retry_delay=5m(외부 시스템 복구 시간 고려).
  • 주의사항: 작업의 멱등성(Idempotency) 보장 여부를 확인해야 하며, 무한 재시도를 방지하기 위한 최대 횟수 제한이 필수다.
    예시) 데이터 적재는 성공했는데 데이터 업데이트가 실패한 경우 계속 재시도 하면 적재가 계속되게 된다.

4-2. Alert (알림 전략)

  • Slack/이메일: 실시간 모니터링과 빠른 대응을 위해 팀 커뮤니케이션 도구와 연동한다.
  • Airflow Callback: on_failure_callback 함수를 활용해 사용자 정의 알림 로직을 구현한다.
  • 자동 감지 시스템: 실패 패턴을 자동 분류하여 반복적 오류에 대한 인사이트를 제공한다.

4-3. SLA (Service Level Agreement)

  • 정의: DAG나 Task가 특정 시간 내에 완료되어야 하는 시간 제약을 정의한다.
  • 설정: default_args에 sla 파라미터를 추가하고 timedelta 객체로 정의한다.
  • 효과: 파이프라인의 효율성을 모니터링하고 성능 저하를 조기에 감지한다.

5. Producer DAG 와 Consumer DAG 만들어보기

5-1. Producer DAG 생성

아래 코드가 Producer DAG 이다. 마찬가지로 주요 코드별로 설명을 적어보겠다.

 

간단히 해당 Producer 가 하는 일을 요약해보자면

  • mark_dataset_ready Task에 outlets=[Dataset(...)]를 선언
  • Producer가 성공적으로 끝나면 Dataset이 업데이트됨

전체 코드

from __future__ import annotations

from datetime import datetime
import json

from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.datasets import Dataset
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def get_cfg() -> dict:
    env = Variable.get("env", default_var="dev").strip()
    cfg = json.loads(Variable.get(f"clip_cfg_{env}"))

    for k in ("s3_bucket", "s3_prefix", "dataset_uri"):
        if not cfg.get(k):
            raise ValueError(f"Missing '{k}' in clip_cfg_{env}")

    return cfg


@dag(
    dag_id="dataset_producer_sales_raw_minimal",
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["datasets", "producer"],
)
def dataset_producer_sales_raw_minimal():
    cfg = get_cfg()
    dataset = Dataset(cfg["dataset_uri"])

    @task
    def upload(run_ds: str) -> str:
        # run_ds는 "{{ ds }}"로 들어오는 날짜 문자열 (YYYY-MM-DD)
        content = f"id,amount,dt\n1,100,{run_ds}\n2,200,{run_ds}\n"

        bucket = cfg["s3_bucket"]
        key = f"{cfg['s3_prefix']}/sales_{run_ds}.csv"

        S3Hook(aws_conn_id="aws_default").load_string(
            string_data=content,
            bucket_name=bucket,
            key=key,
            replace=True,
        )

        s3_path = f"s3://{bucket}/{key}"
        print(f"[upload] {s3_path}")
        return s3_path

    @task(outlets=[dataset])
    def notify_dataset(s3_path: str) -> None:
        print(f"[notify_dataset] Dataset updated: {cfg['dataset_uri']}")
        print(f"[notify_dataset] Produced file: {s3_path}")

    s3_path = upload("{{ ds }}")
    notify_dataset(s3_path)


dag = dataset_producer_sales_raw_minimal()

 

1) get_cfg 내 for문

for k in ("s3_bucket", "s3_prefix", "dataset_uri"):
    if not cfg.get(k):
        raise ValueError(f"Missing '{k}' in clip_cfg_{env}")
  • 일을 시작하기 전에 '진짜로 필요한 정보들이 다 있는지' 확인하는 로직이다.
  • s3_bucket, s3_prefix, dataset_uri(무전기 채널 번호 같은거임)가 있나?
  • 왜 이러한 로직을 넣었는가?
    • 만약 dataset_uri가 빠져있다면, Producer가 열심히 일을 다 해놓고도 누구에세 신호를 보내야할지 몰라서
      다음 DAG인 Consumer가 영원히 일을 할 수 없게되기 때문이다.

2) notify_dataset 태스크

여기가 바로 Dataset 의 가장 핵심인 '이벤트 방생'구간이다.

@task(outlets=[dataset]) # [핵심] 이 일이 끝나면 'dataset'이라는 신호등을 초록불로 바꿔라!
def notify_dataset(s3_path: str) -> None:
    print(f"[notify_dataset] Dataset updated: {cfg['dataset_uri']}")
  • outlets=[dataset]:
    • 이 옵션이 붙어있으면, 함수가 성공적으로 끝나는 순간 Airflow가 '이 데이터가 업데이트 되었다'고 시스템 전체에 공표한다.
    • 그러면 이 신호를 받고 비로소 Consumer DAG가 실행되게 된다.

3) 실행 부

# 1. 공장 설정을 가져오고 이름표(Dataset)를 만듭니다.
cfg = get_cfg()
dataset = Dataset(cfg["dataset_uri"])

# 2. 업로드 일꾼에게 오늘의 날짜("{{ ds }}")를 주며 일을 시킵니다.
s3_path = upload("{{ ds }}")

# 3. 업로드가 끝나면 신호 보내기 일꾼이 출동합니다.
notify_dataset(s3_path)
  • dataset = Dataset(cfg["dataset_uri"]):
    • cfg에서 "s3://sales/raw"와 같은 글자(주소)를 꺼낸다.
    • Dataset(...): 그 글자를 Airflow에게 주면서 '이 주소를 우리 공식 신호 채널로 등록해줘' 라고 선언하는 부분
    • dataset=: 이제 dataset이라는 변수는 단순 글자가 아니라, 이벤트를 발생시키고 감지할 수 있는 신호기가 된다.
  • {{ ds }}:
    • Airflow가 제공하는 기능으로, 오늘 날짜 (예:2026-04-11)을 자동으로 넣어준다.

4) 요약: 전체적인 플로우

 

  1. 매일 정해진 시간에 이 DAG가 깨어난다.
  2. S3 창고에 오늘 날짜가 붙은 CSV 파일을 저장한다.
  3. 작업이 끝나면 outlets 설정 덕분에 "신선한 데이터가 도착했습니다" 라는 이벤트가 발생한다.
  4. 이제 이 신호만 기다리며 리소스를 아끼고 있던 Consumer DAG가 즉시 잠에서 깨어나 데이터를 처리하러 간다.

 

5-2. Consumer DAG

이제는 Consumer DAG에 대해 보겠다.

 

해당 Consumer DAG에 대해 간단히 요약해보자면

  • schedule=[Dataset("...")] 만 있으면 됨
  • cron schedule 없음
  • Sensor 없음

 

전체 코드

from __future__ import annotations

from datetime import datetime, timedelta
import json

from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.datasets import Dataset
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.sdk import get_current_context  # Airflow 3 권장


def get_cfg() -> dict:
    env = Variable.get("env", default_var="dev").strip()
    return json.loads(Variable.get(f"clip_cfg_{env}"))


@dag(
    dag_id="dataset_consumer_sales_raw_minimal",
    start_date=datetime(2025, 1, 1),
    schedule=[Dataset(get_cfg()["dataset_uri"])],  # Dataset 이벤트로 트리거
    catchup=False,
    tags=["datasets", "consumer"],
)
def dataset_consumer_sales_raw_minimal():
    cfg = get_cfg()

    @task(
        execution_timeout=timedelta(minutes=20),
        retries=2,
        retry_delay=timedelta(minutes=5),
    )
    def consume() -> str:
        """
        Dataset 트리거에서는 {{ ds }} 가 없을 수 있으므로
        컨텍스트에서 날짜를 계산해서 파일명을 만든다.
        """
        ctx = get_current_context()

        # Asset-triggered run에서도 보통 logical_date는 존재함
        # (없으면 run_start 같은 값으로 fallback)
        logical_dt = ctx.get("logical_date") or ctx.get("data_interval_start") or datetime.utcnow()
        run_date = logical_dt.date().isoformat()  # 'YYYY-MM-DD'

        bucket = cfg["s3_bucket"]
        key = f"{cfg['s3_prefix']}/sales_{run_date}.csv"

        hook = S3Hook(aws_conn_id="aws_default")

        if not hook.check_for_key(key=key, bucket_name=bucket):
            raise FileNotFoundError(f"File not found: s3://{bucket}/{key}")

        content = hook.read_key(key=key, bucket_name=bucket) or ""
        print(f"[consume] logical_date={logical_dt} -> run_date={run_date}")
        print(f"[consume] s3://{bucket}/{key}")
        print("[consume] preview:\n" + content[:200])

        return f"consumed:s3://{bucket}/{key}"

    consume()


dag = dataset_consumer_sales_raw_minimal()

 

 

1) schedule=[Dataset(...)] 부분

이 부분이 Consumer DAG의 정체성을 결정하는 가장 중요한 줄이다.

  • 의미: 기존의 @daily나 0 9 * * * 같은 시간표들을 쓰지 않는다.
  • 원리: get_cfg()["dataset_uri"]에 적힌 dataset(무전기 채널)을 계속 모니터링한다.
  • 동작: Producer DAG가 outlets=[dataset]을 통해 신호를 쏘는 순간,
    시간에 상관없이 이 DAG가 자동으로 트리거 된다.

2) @task 파라미터들

아래와 같은 예외처리를 해두었는데,

이는 외부시스템(S3)은 일시적인 장애가 생길 수 있으므로, 바로 포기하지 않고 재시도를 하도록 설정한 것이다.

@task(
    execution_timeout=timedelta(minutes=20), # 20분 넘게 걸리면 "문제 있다"고 판단하고 종료 [cite: 149]
    retries=2,                             # 실패해도 딱 2번까지만 더 해봐 (Retry 전략) [cite: 137, 154, 158]
    retry_delay=timedelta(minutes=5),      # 다시 할 때는 5분 정도 쉬었다가 해 (네트워크 복구 대기) [cite: 158]
)

 

3) 날짜 계산 부분: get_current_context()

  • 문제: 시간표대로 도는 DAG들은 '오늘 날짜'가 명확하지만,
    Consumer DAG처럼 신호를 받고 갑자기 깨어난 DAG는 현재 시간이 정확하지 않을 수 있다.
  • 해결: 따라서 get_current_context()를 통해서 현재 실행중인 Context를 통째로 가져오고
  • 동작: logical_dt에서 날짜를 추출하여, Producer가 S3에 올렸을때 파일이름(예: salse_2026_04_11.csv)을 똑같이 유추해 낸다.

4) 전체 흐름 요약

전체 흐름을 볼때는 Producer와 연결해서 보는 것이 좋다.

  1. 대기: Consumer는 dataset_uri 채널을 맞춰놓고 조용히 리소스를 아끼며 대기한다.
  2. 신호: Producer가 S3 업로드를 마치고 완료 신호를 보낸다.
  3. 기상: 신호를 감지한 Consumer가 즉시 실행된다.
  4. 처리: Consumer 는 Producer가 만든 S3 파일을 읽어 비즈니스 로직(변환/적재)를 처리한다.
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2026/06   »
1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30
글 보관함