티스토리 뷰

이번에는 Airflow의 데이터 전달 메커니즘과 환경 설정, AWS 연계 방법에 대해 학습한다.

 

0. 목차

1. XCom: Task 간 데이터 전달
2. Anti-pattern: XCom 사용 시 주의점
3. 실습: XCom, Variable 활용
4. AWS 연결: S3/Redshift 연동

 

 


1. Xcom: Task간 데이터 전달

● Xcom(eXchage Communication)
이름 그대로 태스크 간에 데이터를 교환하는 메커니즘

1-1. XCom 사용 기준 & Anti-pattern

  • 기본 개념
    • XCom(eXchange Communication)은 Airflow에서 Task 간 데이터를 전달하는 기본 메커니즘이다.
    • 주로 작은 크기의 데이터(문자열, 숫자, JSON 등)를 전달하는 데 최적화되어 있으며, Airflow의 메타데이터 데이터베이스에 저장된다.
  • 주요 특징
    • Task 간 데이터 공유 가능
    • 메타데이터 DB에 저장
  • 사용 목적
    • Task 실행 결과 공유
    • 상태 정보 전달

1-2. XCom 동작 방식

1. 생성: Task에서 값을 return한다.

2. 저장: 자동으로 XCom에 저장된다(암시적 push).
  메타데이터 DB(SQLite/MySQL/PostgreSQL)에 저장 및 관리되며 필요시 정리가 필요하다.

3. 조회: 다음 Task의 파라미터로 자동 전달된다(자동 xcom_pull).

4. 활용: 조회한 데이터를 기반으로 다음 Task 실행 로직에 활용한다.

 

1-3. XCom 주의점 (Anti-pattern)

  • 금지 사항: 대용량을 XCom으로 전달하면 메타DB에 부하가 생겨 성능 저하를 일으킨다.
    파일, 데이터프레임, ML 모델은 절대 전달하지 마라.
  • 성능 이슈: 동일한 XCom을 반복 저장하면 DB에 불필요한 부하가 발생한다. 필요한 경우에만 저장하고 오래된 XCom은 정리하라.
  • 대안: 임시 파일 저장소 사용, 외부 캐시 활용, 데이터베이스를 중간 저장소로 활용하라.
  • 권장 사례: XCom은 작은 메타데이터만 전달해야 한다. 대용량은 S3 등 외부 저장소에 저장하고 경로만 전달하라.

2. Variable / Connection 설계 전략

2-1. Variable 개념

  • 정의: 환경 설정값을 저장하는 Airflow 기능이며, DAG 코드와 설정값을 분리하여 관리한다.
  • 목적: 코드 내 하드코딩 제거 및 환경별 설정값 분리(개발/운영)를 위해 사용한다.
  • 저장: Airflow 메타데이터 DB에 저장되며 UI 또는 API를 통해 관리 가능하다.
  • 활용: JSON 형태로 복잡한 설정 저장이 가능하며 deserialize_json=True로 객체 변환이 가능하다.

2-2. Variable vs Connection

  • Variable
    • 환경 설정값 저장
    • 개발/운영 환경 분리
    • JSON 형태 지원
    • 단순 키-값 구조
  • Connection
    • 외부 시스템 연결 정보
    • 호스트, 포트, 계정 정보
    • 암호화된 비밀번호 저장
    • Hook 클래스에서 활용

2-3. Variable Best Practice

  • 네이밍: 명확한 네이밍 규칙을 사용한다. (예: s3_bucket_dev, s3_bucket_prod와 같이 환경 구분)
  • JSON: 복잡한 설정은 JSON으로 저장한다. (예: {'bucket': 'my-dev-bucket', 'path': '/raw'})
  • 보안: 민감 정보는 Secret Manager와 연동한다. (AWS Secret Manager, HashiCorp Vault 등 활용)5-3. AWS 인증 구조 (IAM Role

2-4. 기존 인증 방식 (Access Key) vs IAM Role 기반 인증 방식

  • 기존 방식: Access Key ID와 Secret Key를 코드에 직접 입력한다. 키 노출 위험이 있고 주기적 갱신이 필요하여 관리 복잡성이 증가한다.
  • IAM Role 기반 방식: EC2에 IAM Role을 부여하여 키 없이 자동으로 인증한다. boto3가 Instance Metadata를 통해 IAM Role STS를 발급받는다.
  • 결과: 보안성이 향상되고 관리 부담이 감소하며 최소 권한 원칙을 적용할 수 있다.실습 클립 구성
  • 이론 클립: XCom 사용 기준 & Anti-pattern, Variable / Connection 설계 전략, AWS 인증 구조 (IAM Role)
  • 실습 클립: XCom으로 Task 간 데이터 전달, Variable로 환경 분리, S3 / Redshift(또는 Athena) Connection 설정 

3. XCom 관련 실습

실습용 DAG 코드 (아래 천천히 설명 글 작성할 예정이다)

참고로 코드에 사용되고 있는 S3 버킷과 athena와 관련된 설정은 미리 AWS에서 해두었다.

from __future__ import annotations

from datetime import datetime
import json

from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.hooks.athena import AthenaHook


@dag(
    dag_id="clip_xcom_variable_aws",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
    tags=["clip", "xcom", "variable", "aws"],
)
def clip_xcom_variable_aws():

    @task
    def load_config() -> dict:
        """
        env(dev/prod)에 따라 clip_cfg_dev / clip_cfg_prod Variable(JSON)를 읽는다.
        """
        env = Variable.get("env", default_var="dev").strip()
        var_key = f"clip_cfg_{env}"

        # Variable이 없으면 명확하게 실패시키기 (실습에서 원인 파악 쉬움)
        raw = Variable.get(var_key)  # default_var 제거
        cfg = json.loads(raw)

        # 필수 키 검증 (실습 안정성 ↑)
        required_keys = ["s3_bucket", "s3_prefix", "athena_database", "athena_output"]
        missing = [k for k in required_keys if k not in cfg or not cfg[k]]
        if missing:
            raise ValueError(f"Missing keys in Variable '{var_key}': {missing}")

        cfg["env"] = env
        return cfg

    @task
    def task_a_make_metadata(cfg: dict) -> dict:
        row_count = 1200
        s3_path = f"s3://{cfg['s3_bucket']}/{cfg['s3_prefix']}/sample.csv"

        return {
            "env": cfg["env"],
            "row_count": row_count,
            "s3_path": s3_path,
        }

    @task
    def task_b_consume_metadata(meta: dict) -> str:
        print(f"[{meta['env']}] row_count={meta['row_count']}")
        print(f"result_path={meta['s3_path']}")
        return "SUCCESS"

    @task
    def s3_list_prefix(cfg: dict) -> int:
        hook = S3Hook(aws_conn_id="aws_default")
        keys = hook.list_keys(
            bucket_name=cfg["s3_bucket"],
            prefix=cfg["s3_prefix"]
        ) or []
        print(f"[{cfg['env']}] S3 object count = {len(keys)}")
        return len(keys)

    @task
    def athena_query(cfg: dict) -> str:
        hook = AthenaHook(aws_conn_id="aws_default")
        query = "SELECT 1 AS col"

        execution_id = hook.run_query(
            query=query,
            query_context={"Database": cfg["athena_database"]},
            result_configuration={"OutputLocation": cfg["athena_output"]},
        )
        print(f"[{cfg['env']}] Athena execution_id={execution_id}")
        return execution_id

    cfg = load_config()
    meta = task_a_make_metadata(cfg)
    status = task_b_consume_metadata(meta)
    s3_cnt = s3_list_prefix(cfg)
    athena_id = athena_query(cfg)

    status >> s3_cnt >> athena_id


dag = clip_xcom_variable_aws()

 

1) load_config: 설정값 읽어오는 @task

@task
    def load_config() -> dict:
        # Airflow 웹 화면(UI)에서 "지금 개발용(dev)인지 운영용(prod)인지" 적어둔 메모를 읽어옵니다.
        env = Variable.get("env", default_var="dev").strip()
        var_key = f"clip_cfg_{env}" # env가 dev면 'clip_cfg_dev'라는 메모를 찾습니다.

        # 실제 상세 설정(버킷 이름 등)이 적힌 JSON 텍스트를 읽어서 파이썬 딕셔너리로 바꿉니다.
        raw = Variable.get(var_key)
        cfg = json.loads(raw)
        
        return cfg # 다 읽었으면 이 정보를 'cfg'라는 이름으로 다음 일꾼에게 전달합니다. (XCom 시작!)

 

2) task_a_make_metadata: 주소표 만드는 @task

잘보면 이 태스크가 받는 cfg 인자는 load_config 태스크에서 return한 인자이다.

또한 이 태스크는 마찬가지로 env와 s3_path에 대한 딕셔너리형 데이터를 return 한다.

@task
    def task_a_make_metadata(cfg: dict) -> dict:
        # 앞에서 받은 cfg 정보를 보고, 실제 데이터가 저장될 S3 주소를 문장으로 만듭니다.
        s3_path = f"s3://{cfg['s3_bucket']}/{cfg['s3_prefix']}/sample.csv"

        return {
            "env": cfg["env"],
            "s3_path": s3_path, # 이 주소표를 다시 다음 일꾼에게 전달합니다.
        }

 

3) s3_list_prefix: AWS 저장소 찾는 @task

@task
    def s3_list_prefix(cfg: dict) -> int:
        # 'aws_default'라는 미리 약속된 통로를 통해 S3 리모컨(Hook)을 켭니다.
        hook = S3Hook(aws_conn_id="aws_default")
        # 지정된 S3 폴더 안에 파일이 몇 개 있는지 목록을 가져옵니다.
        keys = hook.list_keys(bucket_name=cfg['s3_bucket'], prefix=cfg['s3_prefix'])
        return len(keys) # 파일 개수를 알려줍니다.

 

4) XCom의 핵심: 실행부(일꾼들 일시키기)

아래와 같은 실행부가 없다면 태스크들이 실행되지 않을것이며,

당연히 태스크가 실행되지 않으면 함수가 돌아 return 도 하지 않기 때문에 XCom에 값들이 저장되지도 않게 된다.

    # 1. 설정을 먼저 읽고 'cfg' 바구니에 담습니다.
    cfg = load_config()
    
    # 2. 'cfg' 바구니를 'task_a'에게 줍니다. task_a는 주소표를 만들어 'meta' 바구니에 담습니다.
    meta = task_a_make_metadata(cfg)
    
    # 3. 'meta' 바구니를 'task_b'에게 줍니다. (이게 바로 XCom 전달!)
    status = task_b_consume_metadata(meta)
    
    # 4. 'cfg' 정보를 보고 S3 파일 개수도 세고, 아테나 쿼리도 날립니다.
    s3_cnt = s3_list_prefix(cfg)
    athena_id = athena_query(cfg)

    # 5. 마지막으로 작업들 사이의 순서를 정해줍니다. (화살표 연결)
    status >> s3_cnt >> athena_id

 

한번더 전체적인 Flow를 설명하자면 아래와 같다.

  • load_config 가 return한 데이터(cfg)는 자동으로 Airflow의 XCom에 저장됨
  • 다음 함수인 task_a_make_metadata에 이 cfg를 넣으면,
    자동으로 Aiflow가 XCom에서 해당 값을 꺼내서 전달해줌

또 그리고 잘 보면 코드에 직접 "my-s3-bucket-name" 와 같이 적지 않고 대신 Variable.get을 썼다.

이렇게 하면 나중에 버킷 이름이 바뀌어도 코드는 수정할 필요 없이 Aiflow 웹 화면에서 값만 바꾸면 된다.


4. Variable 관련 실습

Airflow Web UI 에서 Variable 추가

Airflow Web UI를 보면 수동으로 변수를 추가 할 수 있는데,

이런 방법을 쓰게 되면 위에서 설명했다시피, 나중에 변수의 value가 바뀌어도

코드 수정 없이 웹 화면에서 수정하면 되기 때문에 유지보수가 용이하다.

관리자 > 변수 > 변수 추가

아래는 설정한 변수들의 예시이다.

Key : env
Value : dev
-------------------------

Key : clip_cfg_dev
value:
{
  "s3_bucket": "airflow-clip-dev-maynam1212",
  "s3_prefix": "airflow/dev/clip",
  "athena_database": "default",
  "athena_output": "s3://airflow-clip-dev-maynam1212/athena/results/"
}
-------------------------
Key : clip_cfg_prod
value:
{
  "s3_bucket": "airflow-clip-prod-maynam1212",
  "s3_prefix": "airflow/prod/clip",
  "athena_database": "default",
  "athena_output": "s3://airflow-clip-prod-maynam1212/athena/results/"
}

5. AWS Connection 생성 실습

위 실습 코드를 보면 아래와 같은 코드가 있다. (s3_list_prefix 태스크)

hook = S3Hook(aws_conn_id="aws_default")

 

여기서 Hook을 사용하는 이유는

복잡한 인증 로직을 직접 짜지 않고, Airflow가 미리 만들어둔 AWS 전용 리모콘(Hook)을 가져다 쓰는것이다.

Web UI 를 통한 Connection 연결

Variable 설정과 마찬가지로 Connection도 웹 UI로 할 수 있다. 

커넥션 ID는 위 코드에서 aws_conn_id로 설정한 aws_default 로 해두어야 한다.

커넥션 유형은 우리가 AWS의 S3와 같은 서비스를 가져갈거기 때문에 Amazon Web Services 를 선택해준다.

관리자 > 커넥션 > 커넥션 추가
현재 DAG 내 task들의 구조 (출처: 메타코드)

 

공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2026/06   »
1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30
글 보관함