티스토리 뷰

0. 강의 목차

---이론 부분---
1. DAG 파일 구조 & 기본 규칙
: Airflow 프로젝트 구조와 DAG 파일의 기본 요소
2. TaskFlow API 개념: Python 함수 기반 DAG 작성 방법과 장점
3. Task / DAG Run 관계: 실행 단위와 개념 이해
---실습부분---
4. 첫 TaskFlow DAG 작성: 실습을 통한 TaskFlow API 활용 방법
5. DAG 실행 & Graph View 해석: 실행 결과 확인 및 View 해석
6. 실패 DAG & 로그 분석: Task 로그 확인 포인트

 

● Airflow DAG 하나정도는 스스로 만들고 실패시 트래킹 가능하도록 하는것이 오늘의 목표!


1. DAG 파일 구조 & 기본 규칙

1-1. Airflow 프로젝트 구조와 위치

  • 프로젝트 구조:
    • dags/: DAG 파일이 위치하는 곳이다.
    • plugins/: 커스텀 플러그인을 두는 곳이다.
    • logs/: 실행 로그가 저장되는 곳이다.
  • DAG 파일 위치:
    • dags/ 디렉토리 아래의 .py 파일이다.
    • Airflow가 주기적으로 스캔하며, 파일명은 자유롭게 지정할 수 있다.

1-2. DAG 파일 기본 구성 요소

  1. DAG 객체 정의: from airflow import DAG 후 DAG 인스턴스를 생성하고 ID, 설명,
    start_date, 스케줄 등을 정의한다.
  2. Task 정의: Operator를 통해 Task를 생성하고 작업 단위를 정의한다. 의존성은 >>
    연산자나 TaskFlow API로 설정한다.
  3. 실행 규칙 설정: default_args (공통 기본 설정), start_date (시작점), schedule
    (주기) 등을 지정한다.

1-3. DAG 파일 설정 상세

  • default_args: DAG 내 모든 Task에 공통으로 적용할 기본 설정값(owner, retries,
    retry_delay 등)을 정의하는 딕셔너리다.
  • start_date: DAG가 처음 실행되는 논리적 시작점이다. (실제 실행 시간이 아니며
    과거 날짜 설정도 가능하다)
    • ● 실무팁: start_date는 과거 날짜 하나를 고정해두는게 좋다.
      만약 start_date를 days_ago(0)(오늘)처럼 유동적으로 설정하면, 과거 시점의  데이터를 처리하고 싶어도 Airflow가 해당 시점을 "유효하지 않은 스케줄"로 판단하여 실행을 거부할 수 있다.
  • schedule: DAG 실행 주기를 정의한다.
    cron 표현식, preset(@daily 등)을 사용하거나 None(수동 실행만 가능)으로 설정할 수 있다.

1-4. 핵심 메시지 및 주의사항 (중요)

  • 설계도 역할: DAG 파일은 실행 코드가 아니라 '실행 규칙을 정의하는 설계도'다.
  • 즉시 실행 불가: 파이썬 코드지만 파일 자체를 실행한다고 해서 Task가 즉시
    수행되지 않는다. Airflow가 이 정의를 해석해서 실행한다.
  • 파일 위치가 중요하며 코드를 변경하면 재배포가 필요하다.
  • DAG ID는 고유해야 한다.

2. TaskFlow API 개념

2-1. 도입 배경 및 개념

  • 도입 배경: Airflow 2.0부터 도입된 새로운 방식으로, 기존 Operator 방식보다 더 직관적인 DAG 작성을 지원한다.
  • @dag: 파이썬 함수를 DAG로 변환해주는 데코레이터다.
    DAG의 ID, 설명, 시작일, 스케줄 등을 정의할 수 있다.
  • @task: 파이썬 함수를 Task로 변환해주는 데코레이터다.
    함수의 반환값을 다른 Task에 쉽게 전달할 수 있다.

2-2. 기존 방식과의 차이

기전 Operator 방식과 달리, TaskFlow API는 파이썬 함수를 사용해 DAG를 더 직관적으로 작성할 수 있게 해준다.

데코레이터를 활용해 코드 가독성을 높이고 데이터 의존성을 명확하게 표현할 수 있다.

2-3. 부연 설명

확 와닿지 않는것 같아서 AI를 통한 부연설명을 해보겠다.

 

1. @데코레이터의 직관적인 비유

평범한 사람(함수)이 '소방관 모자(@task)'를 쓰면,
그 사람은 이제 불을 꺼야 하는 '소방관(Airflow Task)'의 역할을 갖게 되는 것과 비슷하다.

  • 그냥 함수: def hello(): → 그냥 파이썬에서 실행되는 코드
  • @task가 붙은 함수: @task def hello(): → Airflow 스케줄러가 관리하고, 로그를 남기고, 실패하면 재시도하는 '관리 대상 작업'

2. 왜 @task와 @dag를 쓸까? (TaskFlow API)

기존 방식(PythonOperator를 일일이 선언하던 방식)과 비교하면 왜 데코레이터가 더 좋은지 알 수 있다.

① 코드의 간결함 (가독성)

  • 기존 방식: PythonOperator라는 객체를 만들고, 함수를 연결하고, ID를 적어주는 등 번거로운 과정이 필요했다.
  • 데코레이터 방식: 함수 위에 @task만 딱 붙이면 끝난다.

② 데이터 전달의 편리함 (가장 핵심!)

기존 방식에서는 A 작업의 결과물을 B 작업으로 넘기려면 XCom이라는 복잡한 통신 시스템을 직접 다뤄야 했다.

하지만 @task를 쓰면 파이썬 함수에서 return한 값을 다음 함수에서 그냥 인자로 받을 수 있습니다.

 

3. 예제 코드로 비교하기

보내주신 실습 파일 내용을 바탕으로 비교해 보겠다.

 

[기존 방식: PythonOperator 사용]

def hello():
    return "Hello World"

task_1 = PythonOperator(
    task_id="hello_task",
    python_callable=hello
)

 

[새로운 방식: TaskFlow API (@task) 사용]

@task
def hello():
    return "Hello World"

# 그냥 함수를 호출하듯이 쓰면 Airflow가 알아서 태스크로 실행합니다.
result = hello() 

 

4. 정리하자면

  • @dag: "이 함수 안에 들어있는 것들은 이제부터 하나의 워크플로우(DAG)야!"라고 선언하는 모자
     
  • @task: "이 함수는 Airflow가 관리하는 하나의 작업(Task)이야!"라고 선언하는 모자

결론적으로 데코레이터는 Task를 '실행'하는 명령어가 아니라,

함수에게 "너는 이제부터 Airflow Task야"라는 '자격'을 부여하는 선언문인거다.


3. Task / DAG Run 관계

3-1. 실행 단위의 이해

이해하기 위한 포인트:
설계도(DAG)는 하나지만, 매일 아침 기계를 돌릴 때마다 새로운 '공정 기록(DAG Run)'과 '결과물(Task Instance)'이 생겨난다.
  • DAG Run: 스케줄 또는 수동 트리거에 의해 생성되는 실행 인스턴스다.
    하나의 DAG 정의에서 여러 개의 DAG Run이 생성될 수 있다.
  • Task Instance: DAG Run 내에서 각 Task의 실제 실행 인스턴스다.
  • Execution Date: DAG Run이 실행된 시점의 기준 시간이다. 과거/미래 데이터 처리 시 매우 중요한 개념이다.
    • "이 데이터는 몇 시 데이터를 처리하는 것인가?"에 대한 이름표
    • 왜 중요할까? 오늘이 2026년 4월 9일이라도, 어제 고장 나서 밀린 4월 8일 데이터를 처리할 수 있다.
      이때 실행은 오늘 하지만, 이름표(Execution Date)는 어제 날짜를 달고 돌아간다.
      그래야 데이터가 섞이지 않기 때문

3-2. 흔한 오해 정리

  • Task ≠ 함수 실행: Task는 실제 함수 자체가 아니라 함수 실행을 위한 '선언'이다.
    실제 함수 코드는 Task가 실행될 때 호출된다.
  • Task Instance: 특정 DAG Run에서 실행되는 Task의 인스턴스로, 고유한 실행
    컨텍스트와 상태를 가진다.
  • 실행 흐름: DAG는 Task 간의 의존성을 정의하지만, 실제 실행은 DAG Run과 Task
    Instance 단위로 이루어진다. 이 구분을 명확히 이해해야 한다.
1.  DAG: "매일 아침 9시에 붕어빵(Task)을 구워라" (설계)
2. DAG Run: "2026년 4월 9일 아침 9시가 되었으니 기계를 돌리자!" (사건 발생)
3. Task Instance: "4월 9일자 붕어빵 작업 시작!" (실제 작업 실행)

4. 첫 TaskFlow DAG 작성

4-1. 작성 및 의존성 설정

  • @dag 데코레이터를 사용하여 스케줄과 시작일을 정의한다.
  • @task 데코레이터를 사용하여 일반 파이썬 함수를 Airflow Task로 변환한다.
    이 과정에서 함수의 입출력은 자동으로 XCom을 통해 처리되어 데이터 흐름이 간결해진다.
  • 의존성 설정: TaskFlow API를 사용하면 파이썬 함수의 호출 구조를 통해 자연스럽게 Task 간 의존성을 설정할 수 있다.
    기존 >> 연산자보다 더 직관적인 코드 흐름을 제공한다.
    • 예: result = process(extract()) -> 코드가 extract -> process 순서로
      실행됨을 보장한다.
# 이번 실습에서 사용한 first_taskflow_dag DAG 코드

from datetime import datetime
from airflow.decorators import dag,task


@dag(
  dag_id="first_taskflow_dag",
  start_date=datetime(2023,1,1),
  schedule="@daily",
  catchup=False,
  tags=["tutorial", "taskflow"],
)
def first_taskflow_dag():
  # 여기 안에 Task들을 연결할 예정
  pass

dag = first_taskflow_dag()

4-2. 코드와 Graph View 연결

  • TaskFlow 코드가 Airflow UI의 Graph View에 화살표로 표시되어 데이터 흐름과 실행 순서를 명확하게 보여준다.
  • 의존성은 코드 순서가 아니라 'DAG 설계'로 표현된다. (코드 작성 순서와 실제 실행 순서는 별개다)

4-3. 파이프라인 구성 예시 (Task 1~4)

  1. 데이터 추출 (Extract): 외부 소스에서 원시 데이터를 가져와 준비한다.
  2. 데이터 변환 (Transform): 비즈니스 요구사항에 맞게 데이터를 가공하고 정제한다.
  3. 결과 저장 (Load): 변환된 데이터를 지정된 저장소에 안전하게 기록한다.
  4. 결과 검증: 처리된 결과의 정확성과 완전성을 검증한다.

아래가 위 4개의 Task를 정리한 DAG 코드 파일이다.

from datetime import datetime
from airflow.decorators import dag,task


@dag(
  dag_id="first_taskflow_dag",
  start_date=datetime(2023,1,1),
  schedule="@daily",
  catchup=False,
  tags=["tutorial", "taskflow"],
)
def first_taskflow_dag():
  raw = extract()
  transformed = transform(raw)
  load(transformed)

@task
def extract():
    print("[extract] 임시 데이터를 생성합니다a.")
    data = [
        {"id": 1, "value": 10},
        {"id": 2, "value": 20},
        {"id": 3, "value": 30},
    ]
    return data
@task
def transform(rows: list[dict]):
    print("[transform] value >= 15 데이터만 필터링합니다.")
    filtered = [row for row in rows if row["value"] >= 15]
    return filtered
@task
def load(rows: list[dict]):
    print("[load] 최종 레코드 개수:", len(rows))
    for row in rows:
        print("[load] row:", row)

dag = first_taskflow_dag()

 

트리거 후 Graph View


5. DAG 실행 & Graph View 해석

5-1. 실행 및 뷰 확인

  • Manual Trigger 실행: Airflow UI에서 DAG를 선택한 후 'Trigger DAG' 버튼을 클릭하여 수동으로 실행할 수 있다.
  • Graph View: Task 간의 의존성을 시각적으로 보여주며 실행 흐름을 한눈에 파악할 수 있다.
  • 실행 상태: 각 Task의 실행 상태를 색상으로 구분하여 진행 상황을 모니터링한다.

5-2. 상태 확인 및 해석

  • Grid View: DAG Run들을 시간순으로 정렬하여 보여주며 각 실행의 전체 상태를 파악하기 좋다.
  • Task 상태 색상:
    • 초록색: 성공
    • 빨간색: 실패
    • 연두/파란색: 실행 중
    • 회색: 대기 중
  • 실행 흐름: 화살표 방향으로 Task 간 의존성과 데이터 흐름을 시각적으로 확인한다.


6. 실패 DAG & 로그 분석

6-1. Task 로그 구조 및 분석

  • 로그 구조: Airflow 로그는 시간순으로 기록되며 실행 시작/종료 시간, 파라미터 정보, 표준 출력/에러, 예외 스택 트레이스 등이 포함된다.
    로그는 Task Instance 별로 구분되어 저장된다.
  • 핵심 확인 포인트:
    • 에러 메시지와 발생 위치
    • 실행 시간과 지연 여부
    • 외부 시스템 연결 상태
    • 데이터 처리량 등
    • 특히 예외 발생 직전의 로그가 문제 해결의 핵심이다.

6-2. 실제 실패 및 로그 분석 실습

위에서 작성한 first_taskflow_dag.py DAG 파일에

아래와 같은 의도적인 에러를 발생하는 @task를 추가하였다.

def first_taskflow_dag():
  raw = extract()
  transformed = transform(raw)
  load(transformed)
  broken_task() #마지막 실패 Task 추가
  
  .
  .
  .
  (다른 Task들은 그대로)
  
@task
def broken_task():
    print("[broken_task] 일부러 에러를 발생시킵니다.")
    return 1 / 0  # ZeroDivisionError

 

이렇게 되면 UI화면에서 아래와 같이 실패한 화면이 보이고, 

UI에서 에러 로그도 확인할 수 있다.

 

아래 명령어를 통해 Vscode 에서도 스캐쥴러 에러 로그를 확인하였다.

docker logs -f --since 5m --timestamps 30-airflow-airflow-scheduler-1 2>&1 \                                                                   
| egrep -i "first_taskflow_dag.*(queued|scheduled|TaskInstance|ti=)"

 

 

● 왜 스케줄러 에러를 봤냐?
스케줄러는 실제 작업을 실행 대기열에 넣고 상태를 관리하기 때문에
Task가 실행되다가 실패하면 상태 변화(Running -> Failed)를 스케줄러가 감지하고 기록하기 때문이다.

egrep의 역할:수많은 로그 중 first_taskflow_dag 와 관련된 TaskInstance 의 상태 변화만 골라내기 때문에 실패한 흐름을 추적하기 좋다.

 

● 엥 그러면 dag-processor에서도 로그를 확인할 수 있는거 아닌가?
명령어: docker logs --tail 200 30-airflow-airflow-dag-processor-1
자연스럽게 위와 같은 의문이 들게 되었는데,
dag-processor는 Task가 몇개인지, 순서는 어떻게 되는지를 담고 있는 설계도만 파싱하는 역할이라 문법에러만 잡게 된다.
다만 내가 이번에 추가한 Task는 문법적 오류라기보단 실행(Execution)중에 데이터나 로직 때문에 실패한거라 해당 로그에는 안잡히게 된다.

 

로그결과는 아래와 같다.

공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2026/06   »
1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30
글 보관함