티스토리 뷰

들어가며

이전 단계까지 Producer가 Kafka로 이벤트를 보내는 것을 확인했다.

하지만 Kafka는 버퍼일 뿐이라, retension이 지나면 메시지가 사라진다. 

데이터를 가지고 분석을 하려면 Kafka의 이벤트를 어딘가에 영구 저장해야 한다.

 

이번 포스팅은 그 첫 영구 저장 계층인 Bronze 레이어를 만든 과정이다.

Producer → Kafka → 현재 단계: [Bronze] → Silver → Gold

Bronze의 원칙은 단 하나 - raw 그대로 저장.

변환-정제는 다음 계층(silver)의 몫이다.


1. 핵심 결정 ① - 컨슈머를 어디서 돌릴 것인가

처음엔 막연히 AWS Glue Streaming으로 적재하면 될거라고 생각했다.

다만 근본적인 문제에 부딪혔다.

클라우드의 Glue는 내 맥북의 `localhost:9092` Kafka에 접근할 수 없다.

 

지금 Kafka는 로컬 Docker에 있고, Glue는 AWS 클라우드에서 돈다. 

둘이 네트워크로 닿지 않아서 아래 세개의 선택지중에 하나를 골라야했다.

방식 문제
AWS Glue Streaming 로컬 Kafka 접근 불가. Kafka를 클라우드로 옮겨야 함
로컬 MinIO AWS를 안 쓰니 Glue Catalog - Athena 학습을 못함
로컬 Spark Structured Streaming 로컬 Kafka 읽고 S3에만 쓰기 → 선택

 

로컬 Spark가 답이었다.

로컬 Kafka를 읽고, 결과만 AWS S3, Glue Catalog에 쓰면, 

네트워크 문제가 없고, 비용도 최소다.

 

여기서 짚고갈 개념은 Glue Streaming은 Spark Structured Streaming을 관리형으로 실행해주는 서비스일뿐, 대립 기술이 아니다. 

이와 관련돼서는 따로 포스팅을 하였으니 확인 바란다.

 

즉 지금 로컬에서 쓴느 Spark 코드는 나중에 EKS/Glue로 그대로 옮길 수 있다는 의미이다.


1-2. 핵심 결정 ② - Bronze에 어떻게 저장할 것인가

큰 함정이 하나 있었다. 캍은 토픽에 스키마가 다른 두 원천이 섞여 있었다.

ad-clicks 토픽
  ├── dummy의 AdEvent       (28필드: campaign_id, banner_id, bid_price ...)
  └── criteo의 CriteoRawEvent (24필드: campaign, cost, cat1~cat9 ...)

 

이 둘의 타입 걸럼으로 파싱하려 하면 스키마가 충동한다.

그래서 선택한 방식이 `raw payload` 보존이다.

Kafka value(JSON)를 파싱하지 않고 문자열 그대로 저장한다.

 

이게 Bronze=raw 원칙에도 정확히 부합한다.

파싱, 통일은 Silver에서 `source`필드로 분기하면 된다.

 

Bronze 테이블 스키마 (4개 토픽 → 4개 테이블, 모두 통일)

컬럼 의미
value raw JSON 원본 (핵심)
key Kafka 키 (= campaign_id)
topic / kafka_partition / kafka_offset Kafka 메타 (재처리, 중복 추적)
kafka_timestamp 브로커 수신 시각
ingested_at Bronze 적재 시각 (신선도 측정)
dt / hour 파티션 (시간 윈도우 처리 최적화)

2. 구현 - 세 단계의 데이터 흐름

전체를 `docker-compose`로 묶었다.

기술 조합은 Spark 3.5.3 + Iceberg 1.11.0 + Glue Catalog + S3.

로컬 Docker                          AWS (ap-northeast-2)
┌──────────────────────┐           ┌────────────────────────┐
│ kafka                │           │ S3                     │
│ producer (dummy)     │           │  warehouse/ ← Iceberg  │
│ producer-criteo      │           │  checkpoints/ ← offset │
│ spark-bronze ────────┼────읽기──→ │ Glue Catalog: bronze   │
│  (Structured Stream) │────쓰기──→ │                        │
└──────────────────────┘           └────────────────────────┘

1단계 - Kafka 버퍼에 모음

Producer 2개가 같은 4개 토픽을 발행한다. `event_type`별로 토픽이 갈린다.

producer.produce(
    topic=to_topic(event),                 # ad-{event_type}s
    key=str(event.campaign_id).encode(),   # 파티션 키
    value=event.to_json_bytes(),           # JSON 바이트
)

Kafka는 campaign_id 해시로 3개 파티션에 분배하고, 각 메시지에 offset을 부여해 순차 저장한다.

Producer 속도와 무관하게 받는 만큼 쌓아두는 버퍼다.

2단계 - Spark가 제 속도로 가져감

raw = (spark.readStream.format("kafka")
    .option("subscribe", "ad-requests,ad-impressions,ad-clicks,ad-conversions")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", MAX_OFFSETS)   # 배치당 상한
    .load())

여기서 말하는 "제 속도"가 두 가지로 정해진다.

`trigger(processingTime="60 seconds") → 60초마다 한 번
`maxOffsetsPerTrigger = 15000 → 한 번에 최대 15만건
→ 분당 15만 건 페이스로 소비

Kafka에 7천만 건이 쌓여 있어도 자기가 감당할 만큼만 떼어간다. 한번에 다 읽으면 메모리가 터지니까.

이 부분에 대해서는 실제로 내가 겪은 내용을 뒤에도 적어두었다.

3단계 - S3에 Iceberg로 저장

가공은 최소한 이다. value는 파싱 안하고 문자열로만, 메타/파티션 칼럼만 붙인다.

bronze = raw.select(
    col("value").cast("string"),                     # raw 유지
    col("topic"), col("partition"), col("offset"),
    col("timestamp").alias("kafka_timestamp"),
    current_timestamp().alias("ingested_at"),
    date_format(col("timestamp"), "yyyy-MM-dd").alias("dt"),
    hour(col("timestamp")).alias("hour"),
)

 

쓰기는 `foreachBatch`로 토픽별 분리 후 각 Iceberg 테이블에 append한다.

def write_batch(batch_df, epoch_id):
    batch_df.persist(StorageLevel.MEMORY_AND_DISK)
    for topic, table in TOPIC_TO_TABLE.items():
        batch_df.filter(col("topic") == topic) \
                .writeTo(f"glue.bronze.{table}").append()

 

`glue.bronze.ad_clicks`가 실제로  어디인지는 SparkSession 카탈로그 설정이 결정한다.

.config("spark.sql.catalog.glue.catalog-impl", "...glue.GlueCatalog")  # 메타 → Glue
.config("spark.sql.catalog.glue.warehouse", "s3://.../warehouse")      # 데이터 → S3
.config("spark.sql.catalog.glue.io-impl", "...s3.S3FileIO")            # S3 입출력

append 한 번에 parquet(데이터) + metadata.json(상태) + snap/manifest avro(메타)가 함께 쓰인다.


3. 실행하며 부딪힌 문제 

① `No FileSystem for scheme "s3"` 

체크포인트 저장에서 터졌다. 알고 보니 S3 접근 경로가 2개였다.

테이블 데이터 → Iceberg S3FileIO (s3://, AWS SDK v2) ← 이건 됐음
체크포인트 → Hadoop FileSystem (s3a://, AWS SDK v1) ← 이게 빠짐 

Spark의 체크포인트는 Iceberg와 무관하게 Hadoop FileSystem을 쓴다.

Hadoop은 `s3://`를 모르고 `s3a://`만 안다.

→ `hadoop-aws` jar 추가 + 체크 포인트 경로를 `s3a://`로 설정했다.

SDK 버전이 v1/v2로 달라 credentials 환경변수도 둘 다 설정했다.

② OutOfMemoryError

`startOffsets=earliest`로 첫 배치가 그동안 쌓인 수십만 건을 통째로 읽다 메모리 폭발

→ `maxOffsetsPerTrigger`로 배치 크기 제한 + driver 메모리 2g + `MEMORY_AND_DISK`


4. 검증 - 두 스키마의 공존

S3 Parquet를 내려받아 직접 분석했다.

ad_click 테이블 샘플 (218건)
  criteo: 217건 → value 키: campaign, cost, cat1~cat9 (24필드)
  dummy: 1건 → value 키: campaign_id, banner_id, bid_price (28필드)

스키마가 완전히 다른 두 원천이 raw JSON 그대로 한 테이블에 충돌 없이 공존했다.

raw payload 보존 설계가 정확히 의도대로 동작함을 증명했다.

 

Kafka 파티션 분산도 확인:

`kafka_partition: {0: 78, 1: 65, 2: 75}` → 3파티션 고르게 분산


5. 운영의 현실 - 고민..

여기서부터가 진짜다..

파이프라인은 "돌아가게 만드는 것"보다 "운영중 무슨 일이 생기는지 아는 것"이 더 어려운것 같다.

고민 ① 생산 vs 소비 rate 균형

criteo를 최대 속도로 돌렸더니 16.5M행 x 91이벤트가 쏟아져 Kafka 백로그가 7천만 건 까지 갔다.

이건 "용량 부하"가 아니라 "생산속도> 소비속도" 불균형이었다.

  • Kafka는 7천만 건을 문제없이 버퍼링했다. 제 역할을 했다.
  • 진짜 위험은 소비가 못 따라가면 retention이 미소비 데이터를 삭제 → 결국은 삭제된 데이터는 소실되게 되는 것이다.

해결 - rate 두 레버를 인프라(compose env)로 외부화. (코드 수정 없이 튜닝)

CRITEO_REPLAY_INTERVAL    (생산측 throttle)
MAX_OFFSETS_PER_TRIGGER   (소비측 처리량)

 

생산속도를 늦출 수 있는데 왜 소비측 강화를 선택했냐면

실무에서 생산 속도는 실제 트래픽이라 줄이기 힘들기 때문에 소비를 키우는게 정석인것 같았다.

10만 → 15만으로 올려 소비가 생산을 앞지르게 했다.

 

생산량과 소비량을 측정하면서 한 실수가 있었다.

kafka-get-offsets = latest offset = 누적 생산량(high water mark)
                  ≠ 안 읽은 백로그(lag)

처음에 이 값을 "백로그"로 착각했다.

이 값은 producer가 생산하는 한 소비와 무관하게 항상 증가한다.

올바른 판단은 생산 rate vs 소비 rate 비교, 그리고 단일 토픽이 아닌 4토픽 합산으로 봐야한다.

고민 ② 그럼 동적 소비는 안되나

늘어나는 트래픽에 비례하도록 소비하는 양도 늘리면 되지 않을까 고민했고

`maxOffsetsPerTrigger` 상한을 매번 손으로 바꾸는게 마음에 걸렸다.

  • 상한을 빼면 입력에 비례해 유동적으로 가져간다. 단, 로컬은 일꾼이 단일 컨테이너라 큰 배치 = OOM. 상한이 메모리 안전벨트다.
  • 진짜 유동적 소비는 "배치 크기 조절"이 아니라 "consumer(executor) 수를 트래픽에 맞춰 늘리는 것"이다.
  • → EKS + Kafka lag 기반 오토스케일(KEDA)의 과제.
  • 근데 또 로컬에서 여러개의 executor를 띄우는 작업도 해볼 예정

고민 ③ Kafka 볼륨 유실 - 실제로 데이터를 잃었다..

Docker 재시작 과정에서 Kafka 메시지가 리셋됐다. (4,100만 → 25만)

그런데 체크포인트는 S3에 살아남아 불일치가 발생한것이다. (아래가 실제 그 에러 메시지)

Partition ad-clicks-1's offset was changed from 115239 to 102,
some data may have been missed

체크포인트: "115239 까지 읽었다"
Kafka 현실: "나 102까지밖에 없는데?"
-> Spark 크래시 (failOnDataLoss = true)

 

이게 바로 고민 ①에서 우려한 "미소비 데이터 손실"이 실제로 일어나게 된 부분이다.

Spark가 미처 못 읽은 백로그(~7천만)가 Kafka 볼륨과 함께 영구 손실됐다.

 

단, 이미 Bronze(S3)에 적재된 데이터는 안전했다. Parquet 334개 그대로 남아있었다.

이것이 바로 Bronze를 영구 저장소로 두는 이유 그 자체인것이다!

 

복구(?) 절차:

docker compose stop spark-bronze
aws s3 rm s3://<bucket>/checkpoints/bronze/ --recursive   # 불일치 체크포인트 삭제
docker compose up -d spark-bronze                          # 현재 Kafka earliest부터 재개

재시작 후 Kafka 이벤트는 새 uuid라 기존 Bronze와 중복되지 않는다.

`failOnDataLose`는 true 유지 - 손실을 숨기느니 크래시로 알려주는게 안전하다.


6. 깊이 파고든 것들

6-1) 컨테이너 재시작 시 데이터는 안전한가?

정상 재시작은 손실 0, 중복 0이다. 

아래 세개의 개념들이 멈춤 / 재시작의 틈을 채우기 때문이다.

 

  1. Kafka = 버퍼 (안 읽어도 retention 동안 보존)
  2. 체크포인트 = 위치 (어디까지 읽고 썼나 S3에 기록)
  3. Iceberg = 원자성 (commit 전 죽은 배치는 미 반영 → 재실행, 중복 X)

그런데 exactly-once 가 깨지는 순간

체크포인트가 "1001부터 일거"라는데, 그 1001이 retention 만료로 이미 삭제됐다면?

offset out of range
  ├ failOnDataLoss=true  → 크래시 (손실 + 알림)
  └ failOnDataLoss=false → 스킵 (손실 + 침묵)

핵심 - 손실 자체는 못 막는다. retention 만료로 지워진 순간 그 데이터는 이미 사라진것이다.

`failOnDataLoss`는 손실에 어떻게 반응할지만 정한다. 진짜 방어는 예방뿐이다.

 

그래서 추후 고도화 작업시, 이 failOnDataLoss 가 발생한 경우에 담당 엔지니어에게 알림이 가거나 전화가 가는 로직으로 개발해볼 예정이다.

consumer lag < retention
이 부등식이 깨지는 순간 그 구간은 죽는것이다.

 

이게 고민 ①(rate) - ②(동적소비)가 단순 성능이 아니라 데이터 손실 방지 문제인 근본 이유이다.

6-2. 파티션 dt/hour의 타임존

실제 S3 에 저장된 파티션을 보면 아래와 같다.

`hour=5`는 UTC 05시 = KST 14시다.

Spark가 UTC 기준으로 계산하기 때문이다.

UTC로 파티셔닝하면 KST 자정~오전 9시 데이터가 전날 UTC 파티션에 들어가 "KST 일별 집계"가 두 파티션에 걸친다.

Bronze는 raw라 UTC 그대로 두고, KST 변환은 Silver/Glod에서 결정한다.


7. 정리

로컬 Spark Structured Streaming이 로컬 Kafka 4개 토픽을 읽어,
변환 없이 raw JSON 그대로 Iceberg(Glue Catalog 등록, dt/hour 파티션, s3a 체크포인트)에 적재하는 Bronze 파이프라인을 구성했다.


8. 다음 포스팅 예고

  • 우선 전체 파이프라인 구성을 위해 Silver 레이어를 구성해볼 예정이다.
    • value를 source 기준으로 파싱 → 스키마 통일 → event_id 중복 제거(MERGE INTO). 멱등성 처리
  • S3에 Iceberg로 적재하다 보니 정말 작은 용량의 parquet파일이 계속해서 쌓이는것을 발견했다.(Small File Problem)
    • 이걸 Iceberg Compaction으로 수정하는 과정을 해볼 예정이다.
  • 가능하다면 로컬에서 여러개의 executor를 띄워서 트래픽 조절을 해볼 예정이다.
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2026/06   »
1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30
글 보관함