티스토리 뷰
DataEngineer(DE)/Airflow 3.0 & DAG 개발 및 최적화
Airflow 3.0과 DAG 개발 및 최적화 (6) - DAG 구조화 전략 & Dynamic Task Mapping
코딩하는 제리코 2026. 4. 11. 01:12Airflow를 활용한 데이터 파이프라인이 복잡해짐에 따라 이를 논리적으로 구조화하고,
데이터 규모에 맞게 동적으로 태스크를 확장하는 기술은 실무에서 매우 중요하다.
이번 포스팅에서는 TaskGroup을 통한 구조화 전략과 Dynamic Task Mapping의 개념을 정리한다.
1. DAG 구조화의 필요성과 TaskGroup 전략
1-1. 왜 DAG를 구조화해야 하는가?
DAG의 규모가 커질수록 관리상의 여러 문제점이 발생한다.
- 가독성 문제: 복잡도가 증가함에 따라 전체 워크플로우를 한눈에 파악하기 어려워진다.
- 유지보수 어려움: 오류 발생 시 원인 파악과 수정이 힘들어지며, 운영 비용이 상승한다.
- 구조적 해결책: TaskGroup을 활용하면 논리적 단위로 Task를 그룹화하여 이러한 문제들을 해결할 수 있다.
1-2. TaskGroup 개념 및 특징
TaskGroup은 Airflow에서 DAG 내 Task들을 논리적으로 묶어주는 핵심 기능이다.
- 주요 특징: 계층적 구조 표현이 가능하며, UI에서 접기/펼치기 기능을 제공한다.
- 활용 분야: ETL 파이프라인의 단계별 구조화 및 반복되는 Task 패턴을 정리하는 데 주로 쓰인다.
- 구현 방식: Python의 with TaskGroup() 구문을 사용하여 정의하며, 그룹 내 Task 간 의존성 설정도 가능하다.
1-3. TaskGroup 도입의 장점
- 운영 효율성: 오류 발생 시 특정 그룹으로 범위를 좁혀 디버깅 시간을 단축하고 문제 지점을 명확히 한다.
- 모듈화: 반복되는 Task 패턴을 공통 로직으로 분리하여 코드 중복을 줄이고 재사용성을 높인다.
- 가독성 향상: Graph View에서 복잡한 DAG를 논리적 단계별(Extract, Transform, Load 등)로 구분하여 파악할 수 있다.
- 협업 효율성: 팀별로 담당 TaskGroup을 분리하여 역할과 책임을 명확히 정의할 수 있다.
1-4. TaskGroup 구성 방식
- with TaskGroup() 구문 사용: Python의 context manager 형태로 그룹 내 Task 정의
- 그룹 내부에 Task 정의: 그룹 내에서 Task간 의존성 설정 가능
- 계층적 구조화: TaskGroup 내에 하위 TaskGroup 중첩 가능
2. ETL 구조화 예시
TaskGroup을 활용하여 ETL(Extract, Transform, Load) 파이프라인을 각 단계별로 그룹화하면 가독성과 유지보수성이 크게 향상된다.
- Extract: 원본 데이터 소스(S3, DB, API 등)에서 필요한 데이터를 식별하고 추출하는 단계다.
- Transform: 추출된 데이터를 정제하고 비즈니스 로직을 적용하며, 데이터 정규화를 통해 분석에 적합한 형태로 변환한다.
- Load: 변환된 데이터를 목적지(S3, Redshift, BigQuery 등)에 적재하고 후속 분석을 위한 준비를 마친다.
아래는 예시 코드이다.
from __future__ import annotations
from datetime import datetime
from airflow.decorators import dag, task
from airflow.utils.task_group import TaskGroup
@dag(
dag_id="clip_taskgroup_dynamic_mapping",
start_date=datetime(2025, 1, 1),
schedule=None,
catchup=False,
tags=["clip", "taskgroup", "dynamic"],
)
def clip_taskgroup_dynamic_mapping():
# ------------------------------------------------------------------
# Extract 단계
# ------------------------------------------------------------------
with TaskGroup(group_id="extract") as extract_group:
@task
def list_files() -> list[str]:
"""
처리 대상 파일 리스트 생성
(실습용으로 고정 리스트 사용)
"""
return [
"file_001.csv",
"file_002.csv",
"file_003.csv",
]
files = list_files()
# ------------------------------------------------------------------
# Transform 단계 (Dynamic Task Mapping)
# ------------------------------------------------------------------
with TaskGroup(group_id="transform") as transform_group:
@task
def process_file(file_name: str) -> str:
"""
파일 단위 처리 Task
"""
print(f"processing {file_name}")
return f"processed_{file_name}"
processed_files = process_file.expand(
file_name=files
)
# ------------------------------------------------------------------
# Load 단계
# ------------------------------------------------------------------
with TaskGroup(group_id="load") as load_group:
@task
def load_result(processed_file: str):
"""
처리 결과 적재
"""
print(f"loading {processed_file}")
load_result.expand(
processed_file=processed_files
)
# ETL 흐름 연결
extract_group >> transform_group >> load_group
dag = clip_taskgroup_dynamic_mapping()

3. Dynamic Task Mapping (동적 태스크 매핑)
3-1. 개념 및 개요
Dynamic Task Mapping은 데이터를 기준으로 Task를 실행 시점에 동적으로 생성하는 기능이다.
- 개념: 실행 시점에 필요한 만큼 Task 인스턴스를 생성한다.
- 특징: 정적인 Task 수를 줄여 DAG를 간소화하고, 코드 변경 없이 작업량에 따라 자동 확장된다.
- 지원: Airflow 2.3 이후 도입된 핵심 기능으로 대부분의 Operator에서 사용 가능하다.
3-2. 동작 방식 및 활용 사례
- 동작 방식: 입력 리스트를 기반으로 .expand() 메서드를 사용하여 매핑 파라미터를 전달하면 Task가 자동으로 생성된다.
- 활용 사례:
- S3 파일 병렬 처리: 버킷 내 파일 목록을 조회한 후 각 파일마다 별도 Task를 생성하여 병렬 처리한다.
- 날짜 기반 백필: 특정 기간의 날짜 리스트를 생성하여 각 날짜별로 개별 Task를 실행한다.
- 고객별 처리: 고객 ID 목록으로 매핑하여 맞춤형 파이프라인을 실행한다.
- 코드의 형태:
@task
def get_files():
# 여기서 파일 목록을 가져옴 (예: ['a.txt', 'b.txt', 'c.txt'])
return ['file1', 'file2', 'file3']
@task
def process_file(file_name):
print(f"Processing {file_name}")
# --- 여기가 핵심 ---
files = get_files()
process_file.expand(file_name=files) # 파일이 3개면 process_file 태스크가 3개로 복제됨!
동작 방식에 대해 개인적으로 이해가 안가서 AI의 도움을 받았다.
Step 1: 리스트 분석 (파이프라인 대기)
.expand()는 인자로 들어온 데이터(여기선 files)가 '반복 가능한(Iterable)' 리스트 형태인지 먼저 확인합니다.
리스트가 ['a', 'b', 'c']라면, Airflow는 속으로 **"오케이, 이 태스크는 3인분짜리네!"**라고 메모해 둡니다.
Step 2: 태스크 복제 (Mapped Instances 생성)
실행 시점이 되면, Airflow는 원래 하나였던 process_file 태스크를 3개의 **'인스턴스(Mapped Instance)'**로 쪼개서 생성합니다.
0번 인스턴스: file_name='a.txt'를 들고 실행
1번 인스턴스: file_name='b.txt'를 들고 실행
2번 인스턴스: file_name='c.txt'를 들고 실행
이때 중요한 건, 이 3개는 서로 다른 컴퓨터(Worker)에서 동시에(병렬로) 돌아갈 수 있다는 점입니다.
100개면 100개가 동시에 도는 거죠.
Step 3: 결과 취합 (Reduce)
모든 복제된 태스크가 끝나면, Airflow는 각각의 결과값들을 다시 하나의 리스트로 예쁘게 묶어서 다음 태스크에게 넘겨줄 준비를 합니다.
3-3. Static 방식의 문제점 vs Dynamic의 해결책
| 구분 | Static 방식 (정적) | Dynamic 방식 (동적) |
| 코드 변경 | 파일 개수 변경 시 매번 코드 수정 필요 | 수정 없이 자동 확장되어 운영 부담 감소 |
| 유지 보수 | 코드 복잡도 증가 및 반복 정의로 가독성 저하 | 코드 간소화 및 유지보수 용이 |
| 병렬 처리 | 동시 실행 Task 수 제한 및 리소스 최적화 난해 | 작업량에 따라 Task 수 조절 및 성능 향상 |
3-4. 리스트를 .expend()로 넘기기 (파일/테이블 처리 - 실무 중심 실습)
아래는 실습할 DAG 코드다.
from __future__ import annotations
from datetime import datetime
from airflow.decorators import dag, task
@dag(
dag_id="dynamic_mapping_records",
start_date=datetime(2025, 1, 1),
schedule=None,
catchup=False,
tags=["tutorial", "dynamic-mapping"],
)
def dynamic_mapping_records():
@task
def list_jobs() -> list[dict]:
# 예: S3 key 목록, 날짜 배치, 고객 세그먼트 등
return [
{"job_id": "A", "path": "s3://bucket/data/dt=2026-02-10/"},
{"job_id": "B", "path": "s3://bucket/data/dt=2026-02-11/"},
{"job_id": "C", "path": "s3://bucket/data/dt=2026-02-12/"},
]
@task
def run_job(job_id: str, path: str) -> str:
print(f"[run_job] job_id={job_id} path={path}")
return f"done:{job_id}"
jobs = list_jobs()
# ✅ dict 키를 인자로 자동 매핑
run_job.expand(
job_id=[j["job_id"] for j in jobs],
path=[j["path"] for j in jobs],
)
dag = dynamic_mapping_records()
위 코드에서 다른부분들은 이해가 가지만, .expand() 부분의 코드를 중심으로 뜯어보았다.
우선 list_jobs 태스크는 {ID, 경로} 쌍으로 만들어진 dict가 3개씩 담긴 list를 반환한다.
그리고 run_job 태스크는 job_id 와 path를 따로따로 받아서 실행한다.
.expand()를 보면 리스트 컴프리헨션을 활용했는데,
이는, 내부적으로 두개의 새로운 리스트를 실시간으로 만들어서 .expand로 넘겨준다는 뜻이다.
- job_id=[j["job_id"] for j in jobs]:
- jobs 바구니를 뒤져서 job_id만 쏙쏙 뽑아냅니다.
- 결과: ['A', 'B', 'C'] 라는 깔끔한 리스트가 됩니다.
- path=[j["path"] for j in jobs]:
- 똑같이 path만 쏙쏙 뽑아냅니다.
- 결과: ['.../10/', '.../11/', '.../12/'] 라는 리스트가 됩니다.
이렇게 되면 이제 .expand에는 아래 두 개의 리스트가 전달된다.
- job_id=['A', 'B', 'C']
- path=['path_A', 'path_B', 'path_C']
여기서 이제 Airflow는 같은 순서끼리 짝짓기를 시작하고 아래와 같이 워커가 일을 하게 된다. (Zip 방식)
- 0번 Worker: job_id='A', path='path_A'
- 1번 Worker: job_id='B', path='path_B'
- 2번 일Worker: job_id='C', path='path_C'
4. DAG 설계 Best Practice & Anti-pattern
4-1. 권장 설계 원칙 (Best Practice)
뭔가 프론트엔드에서 컴포넌트 설계원칙과 상당히 흡사하다.
- 단일 책임: 하나의 Task는 하나의 명확한 역할만 수행해야 하며, 복잡한 로직은 별도 모듈로 분리한다.
- 모듈화: 공통 로직은 함수화하여 재사용하고, Python 파일 구조화를 통해 코드를 관리한다.
- 로깅: airflow.log에 적절한 로깅을 추가하여 오류 발생 시 빠른 원인 파악이 가능하게 한다.
4-2. 피해야 할 패턴 (Anti-pattern)
- 비즈니스 로직 오남용: Task 내부에 너무 복잡한 로직을 포함하면 디버깅과 테스트가 어려워진다.
- 연산자(Operator) 오용: 적합한 전용 Operator 대신 모든 기능을 PythonOperator로만 구현하지 않는다.
- 대기 로직 삽입: 외부 시스템 대기를 위해 Task 내부에 sleep() 루프를 포함하면 불필요한 리소스를 점유하므로 Sensor를 활용한다.
'DataEngineer(DE) > Airflow 3.0 & DAG 개발 및 최적화' 카테고리의 다른 글
| Airflow 3.0과 DAG 개발 및 최적화 (8) - AWS Data Pipeline & Capstone 프로젝트 (0) | 2026.04.13 |
|---|---|
| Airflow 3.0과 DAG 개발 및 최적화 (7) - Dataset 기반 파이프라인 (0) | 2026.04.11 |
| Airflow 3.0과 DAG 개발 및 최적화 (5) - Airflow 3.0 - XCom & Variable - AWS (0) | 2026.04.10 |
| Airflow 3.0과 DAG 개발 및 최적화 (4) - DAG 실행 제어 & Backfill (0) | 2026.04.09 |
| Airflow 3.0과 DAG 개발 및 최적화 (3) - First DAG-TaskFlow API (0) | 2026.04.09 |
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
링크
TAG
- elasticip
- Spark structured streaming
- Data Engineerring
- Glue ETL
- lake house
- Prodcuder DAG
- spark
- lakehouse
- AWS
- Databricks
- Unity Catalog
- DAG
- Data Pipeline
- Backfill
- docker
- 데이터파이프라인
- kafka
- RDD
- Daynamic Task
- Data engineering
- airflow
- s3
- de
- Data Dngineering
- Glue
- catchup
- DataSet
- Consumer DAG
- AWS Glue Catalog
- iceberg
| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | 4 | 5 | 6 | |
| 7 | 8 | 9 | 10 | 11 | 12 | 13 |
| 14 | 15 | 16 | 17 | 18 | 19 | 20 |
| 21 | 22 | 23 | 24 | 25 | 26 | 27 |
| 28 | 29 | 30 |
글 보관함
