티스토리 뷰

0. 강의 목차

1. Schedule / Catchup / Retry: 실행 주기 및 재시도 메커니즘
2. Backfill 개념 & 필요성: 과거 데이터 재처리의 이해
3. Catchup vs Backfill: 실무 운영 판단 기준
4. Catchup 활성화 실습: 과거 미실행 구간 자동 처리
5. CLI 기반 Backfill 실행: 명령어 기반 수동 재처리
6. Backfill 시 중복 데이터 문제: 멱등성(Idempotency) 설계의 중요성

1. Schedule / Catchup / Retry

1-1. cron 표현식 기본 개념

  • 개요: Airflow에서 DAG 실행 주기를 정의하는 표준 방식으로, 분, 시, 일, 월, 요일 단위를 사용한다.
  • 기본 구조: 분(0-59) 시간(0-23) 일(1-31) 월(1-12) 요일(0-6) 순서다.
  • 단축어: @daily(매일 자정), @hourly(매시간 시작) 등 간편한 설정을 지원한다.
  • 주의사항: 시간 설정은 UTC 기준이므로 한국 시간(KST) 적용 시 9시간 차이를 고려해야 한다.

1-2. Schedule과 실행 타이밍

  • schedule_interval: DAG가 실행되는 주기를 설정한다.
  • data interval: DAG가 처리해야 하는 데이터의 시간 구간을 의미한다.
  • logical date: 데이터 처리의 기준 날짜로, 실제 실행 시점과 다를 수 있다.
  • 실행 시점: 실제 실행은 설정된 schedule_interval이 끝난 이후에 발생한다.
이해하기 힘들어서 예를 들었다.

- Schedule Interval (실행 주기): 얼마나 자주 신문을 발행할지 정하는 주기입니다.
- Data Interval (데이터 구간): 신문 기사를 수집하는 시간입니다. 만약 @daily(매일) 스케줄이라면, 오늘 00:00부터 23:59까지가 데이터 구간입니다.
- Logical Date (논리적 날짜): 신문 1면에 찍힌 '발행 일자'입니다. 실제 배달은 내일 되더라도, 신문지에 적힌 날짜는 '오늘 날짜'인 것과 같습니다.
- 실행 시점: 데이터 구간이 끝난 직후에 실행됩니다. 오늘치 데이터를 다 모아야(오늘이 끝나야) 분석을 시작할 수 있기 때문입니다.

1-3. Retry 메커니즘

Retry는 같은 DAG안에서 실행한다. 따라서 네트워크 문제같은 케이스에 사용되게 된다.

반대로, Source Data 자체가 안들어왔다거나, SQL 로직의 이상, 파티션 키가 이상하거나 하는건 Retry로 해결하는 문제가 아니다.

  • retries: 실패 시 재시도 횟수를 지정하는 파라미터
  • retry_delay: 재시도 사이의 간격을 설정한다.
  • max_retry_delay: 최대 재시도 간격을 제한하는 파라미터
  • retry_exponential_backoff: 대기 시간을 점진적으로 증가시켜 시스템 부하를 조절한다.
  • execution_timeout: 태스크의 최대 실행 시간을 제한한다.
Retry와 Backfill의 차이
Retry는 실패 복구 메커니즘이며, Backfill은 과거 데이터 재처리 메커니즘
또한 Retry는 Task 단위고 Backfill은 DAG 레벨에서 동작하게 된다.

2. Backfill 개념 & 필요성

2-1. Backfill의 정의와 목적

  • 개념: 과거 특정 기간에 대해 DAG를 실행하거나 누락된 데이터를 처리하는 기능이다.
  • 목적: 운영 중 발생한 장애로 인한 데이터 복구, 로직 변경에 따른 과거 데이터 재처리, 데이터 품질 개선 등을 위해 활용한다.
  • 방식: 주로 CLI 명령어를 통해 특정 날짜 범위를 지정하여 실행하며 새로운 DAG Run을 생성한다. (자동화 기능 X)

2-2. Backfill이 필요한 운영 시나리오

  • 장애로 인한 데이터 처리 실패 복구
  • 코드 버그 수정 후 잘못된 데이터 정정
  • 소스 데이터 지연으로 인한 불완전한 처리 보완
  • 비즈니스 로직 변경에 따른 과거 데이터 소급 적용

2-3. 운영 환경에서 Backfill이 위험한 이유

  • 리소스 부하: 대량의 태스크가 동시에 실행되어 시스템 자원을 과도하게 사용
  • 중복 처리: 이미 처리된 데이터를 중복으로 처리하여 데이터 왜곡 가능성
  • 의존성 문제: 다른 시스템과의 의존성으로 인해 예상치 못한 부작용 발생
  • 데이터 정합성: 시점 차이로 인한 데이터 불일치가 발생할 수 있으며, 특히 외부 시스템과 연동시 주의 필요
  • 운영 복잡성: 대규모 Backfill 실행 시 모니터링과 관리가 어려워지며, 실행 중 문제 발생 시 대응이 복잡함

3. Catchup vs Backfill (운영 판단)

3-1. Catchup 개념 정리

  • 작동: DAG 활성화 시 start_date부터 현재까지 실행되지 않은 모든 구간을 자동으로 실행한다.
  • 설정: DAG 정의 시 catchup=True/False 파라미터로 제어하며 기본값은 True다.
  • 특징: 스케줄러에 의해 자동으로 처리되므로 새 DAG 배포 시 주의가 필요하다.

3-2. Backfill 개념 정리  

  • 작동: 사용자가 CLI 명령어를 통해 명시적으로 기간을 지정하여 실행한다. (UI에서도 가능함)
  • 명령어: airflow dags backfill [dag_id] -s [시작일] -e [종료일]
  • 특징: 운영자가 개입하여 의도적으로 재처리하며 병렬 처리 수준 등 세부 옵션 조정이 가능하다.

3-3. 언제 어떤 방식으로 사용해야 하는가?

  • 통제: Backfill은 운영자가 직접 제어하고 모니터링하기 용이하다.
  • 리소스: 실행 방식에 따라 시스템 자원 사용량이 급증할 수 있으므로 신중한 접근이 필요하다.
  • 위험도: 과거 데이터 재처리는 항상 운영 위험을 수반하므로 신중한 접근 필요
  • 설계: 재처리를 고려한 DAG 설계가 중요하며 멱등성을 보장해야 한다.
  • 자동화: Catchup은 자동화된 과거 간격 채우기 메커니즘으로 동작한다.

4. 실습 과정

4-1. Catchup 활성화 실습

  1. start_date를 과거 날짜로 설정한다. (예: 일주일 전)
  2. DAG 정의에 catchup=True 설정을 추가한다.
  3. DAG 파일을 배포하고 활성화한다.
  4. UI에서 다수의 자동 생성된 DAG Run을 확인한다.
from datetime import datetime
from airflow.decorators import dag, task
from airflow.sdk import get_current_context


@task
def print_info():
    ctx = get_current_context()
    logical_date = ctx["logical_date"]
    run_id = ctx["run_id"]

    print("[print_info] logical_date =", logical_date)
    print("[print_info] run_id      =", run_id)


@dag(
    dag_id="catchup_backfill_demo_2",
    start_date=datetime(2024, 1, 1),   # 과거 날짜
    schedule="@daily",
    catchup=True,                     # Catchup 활성화
    tags=["tutorial", "catchup", "backfill"],
)
def catchup_backfill_demo_2():
    print_info()


dag = catchup_backfill_demo_2()

4-2. CLI 기반 Backfill 실행

docker exec -it airflow-airflow-scheduler-1 \
  airflow backfill create \
    --dag-id catchup_backfill_demo \
    --from-date 2026-02-01 \
    --to-date 2026-02-03
  • 명령어와 날짜 범위(-s, -e)를 지정하여 실행한다.
  • --rerun-failed-tasks: 실패한 태스크만 재실행하는 옵션이다.
  • --reset-dagruns: 기존 DAG Run을 초기화하고 재실행한다.
  • UI의 'Browse' > 'DAG Runs' 메뉴에서 상태와 로그를 모니터링한다.

5. Backfill 시 고려사항 및 문제 해결

5-1. Scheduler와의 관계

  • 리소스 경쟁: Backfill과 일반 스케줄링이 동시에 실행될 경우 시스템 자원을 두고 경쟁할 수 있다.
  • 설정: max_active_runs 설정을 통해 동시에 실행 가능한 DAG run 수를 ㅡ제한하여 리소스 관리
  • 우선순위: priority_weight 설정을 통해 중요 작업이 먼저 처리되도록 조정한다.
  • 모니터링: Backfill 실행 시 시스템 리소스와 실행 상태를 지속적으로 모니터링 하여 문제 조기 발견
  • 독립성: Backfill 프로세스는 스케줄러와 독립적으로 실행되어 일반 스케줄링에 미치는 영향을 최소화한다.

5-2. 중복 데이터 문제와 멱등성(Idempotency)

  • 시나리오: 동일 기간 중복 실행이나 소스 변경 없는 재처리로 데이터가 중복 삽입될 수 있다.
  • Idempotent 설계: 동일한 입력에 대해 항상 동일한 결과를 보장하도록 설계해야 한다. (예: UPSERT 방식, 조건부 처리 로직)
  • 운영 안정성 원칙: 데이터 정합성 검증 단계를 추가하고 롤백 메커니즘을 구현하며 테스트 환경에서 먼저 검증을 한다.
  • 설계 관점: 멱등성을 고려한 태스크 설계는 Backfill의 핵심이며 파이프라인 설계 시점부터 고려되어야 한다.

 

5-3. 실습) 비멱등 DAG 

아래 DAG를 설정한다.

from datetime import datetime
from pathlib import Path

from airflow.decorators import dag, task
from airflow.sdk import get_current_context

DATA_DIR = Path("/opt/airflow/logs")
DATA_DIR.mkdir(parents=True, exist_ok=True)
TARGET_FILE = DATA_DIR / "orders_append.csv"


@task
def append_order():
    ctx = get_current_context()
    logical_date = ctx["logical_date"]
    run_id = ctx["run_id"]

    line = f"{logical_date.date()},{run_id}\n"
    print("[append_order] write:", line.strip())

    with TARGET_FILE.open("a") as f:
        f.write(line)


@dag(
    dag_id="backfill_non_idempotent",
    start_date=datetime(2024, 3, 1),
    schedule="@daily",
    catchup=False,
    tags=["backfill", "non-idempotent"],
)
def backfill_non_idempotent():
    append_order()


dag = backfill_non_idempotent()

위 코드에서 "with TARGET_FILE.open("a") as f:"   부분을 보면 
파일을 열어서 맨 아랫줄에 내용을 추가(a)만 하는거라서 비멱등성

 

그 결과 같은 데이터가 또 생긴것을 볼 수 있다.

 

5-4. 실습) 멱등성 DAG 

from datetime import datetime
from pathlib import Path

from airflow.decorators import dag, task
from airflow.sdk import get_current_context

DATA_DIR = Path("/opt/airflow/logs")
DATA_DIR.mkdir(parents=True, exist_ok=True)

TARGET_FILE = DATA_DIR / "orders_overwrite.csv"


@task
def upsert_order_by_date():
    ctx = get_current_context()
    logical_date = ctx["logical_date"].date()
    run_id = ctx["run_id"]

    new_line = f"{logical_date},{run_id}\n"
    print("[upsert_order_by_date] write:", new_line.strip())

    lines = []

    # 1) 기존 파일 읽기 (있다면)
    if TARGET_FILE.exists():
        with TARGET_FILE.open("r") as f:
            for line in f:
                date_part = line.split(",")[0]
                if date_part != str(logical_date):
                    lines.append(line)

    # 2) 현재 날짜 row 추가
    lines.append(new_line)

    # 3) 전체 파일 다시 쓰기
    with TARGET_FILE.open("w") as f:
        f.writelines(lines)


@dag(
    dag_id="backfill_idempotent_single_file",
    start_date=datetime(2024, 3, 1),
    schedule="@daily",
    catchup=False,
    tags=["backfill", "idempotent"],
)
def backfill_idempotent_single_file():
    upsert_order_by_date()


dag = backfill_idempotent_single_file()

 

2월 8일 데이터가 중복되지 않고 오버라이드 된것을 볼 수 있다.

공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2026/06   »
1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30
글 보관함