티스토리 뷰
DataEngineer(DE)/Airflow 3.0 & DAG 개발 및 최적화
Airflow 3.0과 DAG 개발 및 최적화 (4) - DAG 실행 제어 & Backfill
코딩하는 제리코 2026. 4. 9. 23:520. 강의 목차
1. Schedule / Catchup / Retry: 실행 주기 및 재시도 메커니즘
2. Backfill 개념 & 필요성: 과거 데이터 재처리의 이해
3. Catchup vs Backfill: 실무 운영 판단 기준
4. Catchup 활성화 실습: 과거 미실행 구간 자동 처리
5. CLI 기반 Backfill 실행: 명령어 기반 수동 재처리
6. Backfill 시 중복 데이터 문제: 멱등성(Idempotency) 설계의 중요성
1. Schedule / Catchup / Retry
1-1. cron 표현식 기본 개념
- 개요: Airflow에서 DAG 실행 주기를 정의하는 표준 방식으로, 분, 시, 일, 월, 요일 단위를 사용한다.
- 기본 구조: 분(0-59) 시간(0-23) 일(1-31) 월(1-12) 요일(0-6) 순서다.
- 단축어: @daily(매일 자정), @hourly(매시간 시작) 등 간편한 설정을 지원한다.
- 주의사항: 시간 설정은 UTC 기준이므로 한국 시간(KST) 적용 시 9시간 차이를 고려해야 한다.
1-2. Schedule과 실행 타이밍
- schedule_interval: DAG가 실행되는 주기를 설정한다.
- data interval: DAG가 처리해야 하는 데이터의 시간 구간을 의미한다.
- logical date: 데이터 처리의 기준 날짜로, 실제 실행 시점과 다를 수 있다.
- 실행 시점: 실제 실행은 설정된 schedule_interval이 끝난 이후에 발생한다.
이해하기 힘들어서 예를 들었다.
- Schedule Interval (실행 주기): 얼마나 자주 신문을 발행할지 정하는 주기입니다.
- Data Interval (데이터 구간): 신문 기사를 수집하는 시간입니다. 만약 @daily(매일) 스케줄이라면, 오늘 00:00부터 23:59까지가 데이터 구간입니다.
- Logical Date (논리적 날짜): 신문 1면에 찍힌 '발행 일자'입니다. 실제 배달은 내일 되더라도, 신문지에 적힌 날짜는 '오늘 날짜'인 것과 같습니다.
- 실행 시점: 데이터 구간이 끝난 직후에 실행됩니다. 오늘치 데이터를 다 모아야(오늘이 끝나야) 분석을 시작할 수 있기 때문입니다.
1-3. Retry 메커니즘
Retry는 같은 DAG안에서 실행한다. 따라서 네트워크 문제같은 케이스에 사용되게 된다.
반대로, Source Data 자체가 안들어왔다거나, SQL 로직의 이상, 파티션 키가 이상하거나 하는건 Retry로 해결하는 문제가 아니다.
- retries: 실패 시 재시도 횟수를 지정하는 파라미터
- retry_delay: 재시도 사이의 간격을 설정한다.
- max_retry_delay: 최대 재시도 간격을 제한하는 파라미터
- retry_exponential_backoff: 대기 시간을 점진적으로 증가시켜 시스템 부하를 조절한다.
- execution_timeout: 태스크의 최대 실행 시간을 제한한다.
Retry와 Backfill의 차이
Retry는 실패 복구 메커니즘이며, Backfill은 과거 데이터 재처리 메커니즘
또한 Retry는 Task 단위고 Backfill은 DAG 레벨에서 동작하게 된다.
2. Backfill 개념 & 필요성
2-1. Backfill의 정의와 목적
- 개념: 과거 특정 기간에 대해 DAG를 실행하거나 누락된 데이터를 처리하는 기능이다.
- 목적: 운영 중 발생한 장애로 인한 데이터 복구, 로직 변경에 따른 과거 데이터 재처리, 데이터 품질 개선 등을 위해 활용한다.
- 방식: 주로 CLI 명령어를 통해 특정 날짜 범위를 지정하여 실행하며 새로운 DAG Run을 생성한다. (자동화 기능 X)
2-2. Backfill이 필요한 운영 시나리오
- 장애로 인한 데이터 처리 실패 복구
- 코드 버그 수정 후 잘못된 데이터 정정
- 소스 데이터 지연으로 인한 불완전한 처리 보완
- 비즈니스 로직 변경에 따른 과거 데이터 소급 적용
2-3. 운영 환경에서 Backfill이 위험한 이유
- 리소스 부하: 대량의 태스크가 동시에 실행되어 시스템 자원을 과도하게 사용
- 중복 처리: 이미 처리된 데이터를 중복으로 처리하여 데이터 왜곡 가능성
- 의존성 문제: 다른 시스템과의 의존성으로 인해 예상치 못한 부작용 발생
- 데이터 정합성: 시점 차이로 인한 데이터 불일치가 발생할 수 있으며, 특히 외부 시스템과 연동시 주의 필요
- 운영 복잡성: 대규모 Backfill 실행 시 모니터링과 관리가 어려워지며, 실행 중 문제 발생 시 대응이 복잡함
3. Catchup vs Backfill (운영 판단)
3-1. Catchup 개념 정리
- 작동: DAG 활성화 시 start_date부터 현재까지 실행되지 않은 모든 구간을 자동으로 실행한다.
- 설정: DAG 정의 시 catchup=True/False 파라미터로 제어하며 기본값은 True다.
- 특징: 스케줄러에 의해 자동으로 처리되므로 새 DAG 배포 시 주의가 필요하다.
3-2. Backfill 개념 정리
- 작동: 사용자가 CLI 명령어를 통해 명시적으로 기간을 지정하여 실행한다. (UI에서도 가능함)
- 명령어: airflow dags backfill [dag_id] -s [시작일] -e [종료일]
- 특징: 운영자가 개입하여 의도적으로 재처리하며 병렬 처리 수준 등 세부 옵션 조정이 가능하다.
3-3. 언제 어떤 방식으로 사용해야 하는가?
- 통제: Backfill은 운영자가 직접 제어하고 모니터링하기 용이하다.
- 리소스: 실행 방식에 따라 시스템 자원 사용량이 급증할 수 있으므로 신중한 접근이 필요하다.
- 위험도: 과거 데이터 재처리는 항상 운영 위험을 수반하므로 신중한 접근 필요
- 설계: 재처리를 고려한 DAG 설계가 중요하며 멱등성을 보장해야 한다.
- 자동화: Catchup은 자동화된 과거 간격 채우기 메커니즘으로 동작한다.
4. 실습 과정
4-1. Catchup 활성화 실습
- start_date를 과거 날짜로 설정한다. (예: 일주일 전)
- DAG 정의에 catchup=True 설정을 추가한다.
- DAG 파일을 배포하고 활성화한다.
- UI에서 다수의 자동 생성된 DAG Run을 확인한다.
from datetime import datetime
from airflow.decorators import dag, task
from airflow.sdk import get_current_context
@task
def print_info():
ctx = get_current_context()
logical_date = ctx["logical_date"]
run_id = ctx["run_id"]
print("[print_info] logical_date =", logical_date)
print("[print_info] run_id =", run_id)
@dag(
dag_id="catchup_backfill_demo_2",
start_date=datetime(2024, 1, 1), # 과거 날짜
schedule="@daily",
catchup=True, # Catchup 활성화
tags=["tutorial", "catchup", "backfill"],
)
def catchup_backfill_demo_2():
print_info()
dag = catchup_backfill_demo_2()

4-2. CLI 기반 Backfill 실행
docker exec -it airflow-airflow-scheduler-1 \
airflow backfill create \
--dag-id catchup_backfill_demo \
--from-date 2026-02-01 \
--to-date 2026-02-03
- 명령어와 날짜 범위(-s, -e)를 지정하여 실행한다.
- --rerun-failed-tasks: 실패한 태스크만 재실행하는 옵션이다.
- --reset-dagruns: 기존 DAG Run을 초기화하고 재실행한다.
- UI의 'Browse' > 'DAG Runs' 메뉴에서 상태와 로그를 모니터링한다.
5. Backfill 시 고려사항 및 문제 해결
5-1. Scheduler와의 관계
- 리소스 경쟁: Backfill과 일반 스케줄링이 동시에 실행될 경우 시스템 자원을 두고 경쟁할 수 있다.
- 설정: max_active_runs 설정을 통해 동시에 실행 가능한 DAG run 수를 ㅡ제한하여 리소스 관리
- 우선순위: priority_weight 설정을 통해 중요 작업이 먼저 처리되도록 조정한다.
- 모니터링: Backfill 실행 시 시스템 리소스와 실행 상태를 지속적으로 모니터링 하여 문제 조기 발견
- 독립성: Backfill 프로세스는 스케줄러와 독립적으로 실행되어 일반 스케줄링에 미치는 영향을 최소화한다.
5-2. 중복 데이터 문제와 멱등성(Idempotency)
- 시나리오: 동일 기간 중복 실행이나 소스 변경 없는 재처리로 데이터가 중복 삽입될 수 있다.
- Idempotent 설계: 동일한 입력에 대해 항상 동일한 결과를 보장하도록 설계해야 한다. (예: UPSERT 방식, 조건부 처리 로직)
- 운영 안정성 원칙: 데이터 정합성 검증 단계를 추가하고 롤백 메커니즘을 구현하며 테스트 환경에서 먼저 검증을 한다.
- 설계 관점: 멱등성을 고려한 태스크 설계는 Backfill의 핵심이며 파이프라인 설계 시점부터 고려되어야 한다.
5-3. 실습) 비멱등 DAG
아래 DAG를 설정한다.
from datetime import datetime
from pathlib import Path
from airflow.decorators import dag, task
from airflow.sdk import get_current_context
DATA_DIR = Path("/opt/airflow/logs")
DATA_DIR.mkdir(parents=True, exist_ok=True)
TARGET_FILE = DATA_DIR / "orders_append.csv"
@task
def append_order():
ctx = get_current_context()
logical_date = ctx["logical_date"]
run_id = ctx["run_id"]
line = f"{logical_date.date()},{run_id}\n"
print("[append_order] write:", line.strip())
with TARGET_FILE.open("a") as f:
f.write(line)
@dag(
dag_id="backfill_non_idempotent",
start_date=datetime(2024, 3, 1),
schedule="@daily",
catchup=False,
tags=["backfill", "non-idempotent"],
)
def backfill_non_idempotent():
append_order()
dag = backfill_non_idempotent()
위 코드에서 "with TARGET_FILE.open("a") as f:" 부분을 보면
파일을 열어서 맨 아랫줄에 내용을 추가(a)만 하는거라서 비멱등성
그 결과 같은 데이터가 또 생긴것을 볼 수 있다.

5-4. 실습) 멱등성 DAG
from datetime import datetime
from pathlib import Path
from airflow.decorators import dag, task
from airflow.sdk import get_current_context
DATA_DIR = Path("/opt/airflow/logs")
DATA_DIR.mkdir(parents=True, exist_ok=True)
TARGET_FILE = DATA_DIR / "orders_overwrite.csv"
@task
def upsert_order_by_date():
ctx = get_current_context()
logical_date = ctx["logical_date"].date()
run_id = ctx["run_id"]
new_line = f"{logical_date},{run_id}\n"
print("[upsert_order_by_date] write:", new_line.strip())
lines = []
# 1) 기존 파일 읽기 (있다면)
if TARGET_FILE.exists():
with TARGET_FILE.open("r") as f:
for line in f:
date_part = line.split(",")[0]
if date_part != str(logical_date):
lines.append(line)
# 2) 현재 날짜 row 추가
lines.append(new_line)
# 3) 전체 파일 다시 쓰기
with TARGET_FILE.open("w") as f:
f.writelines(lines)
@dag(
dag_id="backfill_idempotent_single_file",
start_date=datetime(2024, 3, 1),
schedule="@daily",
catchup=False,
tags=["backfill", "idempotent"],
)
def backfill_idempotent_single_file():
upsert_order_by_date()
dag = backfill_idempotent_single_file()
2월 8일 데이터가 중복되지 않고 오버라이드 된것을 볼 수 있다.

'DataEngineer(DE) > Airflow 3.0 & DAG 개발 및 최적화' 카테고리의 다른 글
| Airflow 3.0과 DAG 개발 및 최적화 (6) - DAG 구조화 전략 & Dynamic Task Mapping (0) | 2026.04.11 |
|---|---|
| Airflow 3.0과 DAG 개발 및 최적화 (5) - Airflow 3.0 - XCom & Variable - AWS (0) | 2026.04.10 |
| Airflow 3.0과 DAG 개발 및 최적화 (3) - First DAG-TaskFlow API (0) | 2026.04.09 |
| Airflow 3.0과 DAG 개발 및 최적화 (2) - Airflow 3.0 Architecture & Docker (1) | 2026.04.09 |
| Airflow 3.0과 DAG 개발 및 최적화 (1) - Airflow 3.0 과정 목표 및 로드맵 (0) | 2026.04.08 |
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
링크
TAG
- DataSet
- spark
- airflow
- Data Pipeline
- catchup
- Consumer DAG
- elasticip
- Glue ETL
- Data Engineerring
- Spark structured streaming
- Glue
- iceberg
- s3
- AWS
- RDD
- Data Dngineering
- Backfill
- lake house
- de
- lakehouse
- Unity Catalog
- 데이터파이프라인
- Prodcuder DAG
- kafka
- Databricks
- Data engineering
- DAG
- docker
- AWS Glue Catalog
- Daynamic Task
| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | 4 | 5 | 6 | |
| 7 | 8 | 9 | 10 | 11 | 12 | 13 |
| 14 | 15 | 16 | 17 | 18 | 19 | 20 |
| 21 | 22 | 23 | 24 | 25 | 26 | 27 |
| 28 | 29 | 30 |
글 보관함
