티스토리 뷰
광고 이벤트 레이크하우스 구축기 (2) — 원천 데이터 생성 코드 구현
코딩하는 제리코 2026. 6. 10. 14:35원천 데이터 구성
원천 데이터는 두 가지다.
| 소스 | 역할 |
| Criteo Attribution Dataset | HuggingFace 제공, 약 1,650만 건의 클릭 데이터. 실시간처럼 재생한다. |
| 더미 이벤트 생성기 | Python 스크립트로 OpenRTB 형식의 이벤트를 합성 생성. 파이프라인의 주요 볼륨 소스. |
웹 이벤트는 처음에 후보로 올랐지만 제거했다.
실제 유저가 클릭해야 이벤트가 발생하기 때문에 대용량 파이프라인 검증에 적합하지 않다.
파일 구성
producers/
├── common/
│ ├── init.py # Python 패키지 인식용 (이 파일이 없으면 import 불가)
│ └── schema.py # 공통 이벤트 스키마 (두 Producer가 공유)
├── config.py # 모든 설정값 중앙 관리
├── dummy_generator.py # 합성 이벤트 무한 생성
├── criteo_producer.py # Criteo 실시간 재생
└── requirements.txt
config.py — 설정 중앙화
모든 설정값을 한 파일에서 관리한다.
Producer 코드 어디에서도 하드코딩 없이 config.변수명으로 읽는다.
KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
TOPIC_REQUESTS = "ad-requests"
TOPIC_IMPRESSIONS = "ad-impressions"
TOPIC_CLICKS = "ad-clicks"
TOPIC_CONVERSIONS = "ad-conversions"
DUMMY_EPS = 100 # 더미 생성기: 초당 auction 발생 수
CRITEO_REPLAY_INTERVAL = 0.0 # Criteo 재생 속도 (0 = 최대 속도)
광고 퍼널 비율
`FILL_RATE = 0.80` # P(impression | request)
`CTR = 0.025` # P(click | impression)
`CVR = 0.035` # P(conversion | click)
Criteo 역산값
REQUESTS_PER_CLICK = round(1 / (FILL_RATE * CTR)) # = 50
IMPRESSIONS_PER_CLICK = round(1 / CTR) # = 40
CRITEO_DATASET_NAME = "criteo/criteo-attribution-dataset"
localhost:9092의 근거:
9092는 Kafka 프로젝트에서 정해둔 기본 포트다. MySQL이 3306, PostgreSQL이 5432이듯 Kafka는 9092가 표준이다.
localhost는 로컬 Docker로 Kafka를 띄울 때의 주소다.
EKS로 전환할 때 이 값 하나만 바꾸면 나머지 코드는 손댈 필요 없다.
schema.py — 이벤트 스키마
두 Producer의 우너천 형식이 다르기 때문에 스키마도 두 개로 분리한다.
| dummy_generator | criteo_producer | |
| 원본 형식 | 없음 (처음 부터 생성) | Criteo CSV 컬럼 |
| Bronze에 저장 | OpenRTB 형식 그대로 | Criteo 원본 필드 그대로 |
| 스키마 클래스 | AdEvent | CriteoRawEvent |
banner_id를 예로 들면:
- dummy_generator: banner_id = "10000001_300_1" — 처음부터 만든 값, 파생 아님
- criteo_producer: Criteo에 banner_id 없음 → Bronze에 없음 → Silver에서 f"{campaign}_{cat1}_{cat2}" 로 파생
@dataclass
class CriteoRawEvent:
# Producer 생성 메타 필드
event_id: str
event_type: str # request | impression | click | conversion
source: str # "criteo"
auction_id: str
produced_at: str
# Criteo 원본 필드 — 변환 없음
campaign: int # Silver에서 campaign_id로 통일
uid: str
cost: float # CPC 단위. Silver에서 ×1000 → CPM bid_price
timestamp: int # 상대 시간(초). Silver에서 절대 Unix epoch으로 변환
conversion: int # 0 | 1
cat1: int # Silver에서 site_cat(IAB 코드) 파생
cat2: int # Silver에서 banner_id 파생에 사용
cat3: int
cat4: int
cat5: int
cat6: int
cat7: int
cat8: int
cat9: int
Bronze에 저장되는 Criteo 이벤트 형태:
{
"event_id": "uuid",
"event_type": "click",
"source": "criteo",
"auction_id": "uuid",
"produced_at": "2026-06-10T05:38:00+00:00",
"campaign": 999,
"uid": "12345678",
"cost": 0.00083,
"timestamp": 86412,
"conversion": 0,
"cat1": 5824233,
"cat2": 1034,
"cat3": 0
}
banner_id, bid_price, site_cat, device_type, os, country → Bronze에 없음. Silver에서 생성.
to_json_bytes()는 왜 있나?
Kafka는 bytes 타입만 받는다. Python 객체를 그대로 넣을 수 없다.
AdEvent 객체 (Python 메모리)
↓ asdict(self) → dict
↓ json.dumps(...) → JSON 문자열 (str)
↓ .encode("utf-8") → bytes ← Kafka가 요구하는 타입
이 로직을 메서드 하나에 모아두면 두 Producer가 항상 동일한 방식으로 직렬화한다.
나중에 포맷을 바꾸고 싶어도 이 파일 한 곳만 수정하면 된다.
to_topic()는 왜 있나?
이벤트 타입에 따라 어느 Kafka 토픽으로 보낼지 결정하는 라우팅 규칙이다.
event.event_type = "click" → "ad-clicks"
event.event_type = "impression" → "ad-impressions"
두 Producer가 이 함수를 공유하기 때문에 라우팅 규칙이 항상 동일하게 유지된다.
Kafka 파티션 키를 campaign_id로 설정한 이유
파티션 키는 같은 키를 가진 메시지가 항상 같은 파티션으로 가도록 결정하는 값이다.
banner_id는 campaign_id에서 파생된 하위 개념이다.
campaign_id: 10000001
├── banner: 10000001_300_1 (300×250, 위치 1)
├── banner: 10000001_728_3 (728×90, 위치 3)
└── banner: 10000001_320_1 (320×50, 위치 1)
banner_id로 파티셔닝하면 같은 캠페인의 이벤트가 다른 파티션에 흩어진다.
Spark가 캠페인 단위로 집계할 때 파티션 간 데이터 이동(Shuffle)이 발생한다.
campaign_id로 파티셔닝하면 한 캠페인의 모든 이벤트가 같은 파티션에 모인다.
Gold 테이블의 핵심이 캠페인별 일별 성과이므로 campaign_id가 자연스러운 선택이다.
dummy_generator.py — 퍼널 흐름
광고 이벤트는 단계별로 확률적으로 줄어드는 구조다.
request 항상 발생 → 광고 슬롯에 입찰 요청
impression 80% 확률 (fill rate) → 실제로 화면에 표시됨
click 2.5% 확률 (CTR) → 유저가 클릭
conversion 3.5% 확률 (CVR) → 구매 또는 전환 완료
코드에서 이 구조는 중첩 if로 표현된다.
if 안에 if가 있는 이유는 상위 이벤트가 발생해야 하위 이벤트가 가능하기 때문이다.
impression 없이 click 불가, click 없이 conversion 불가.
while True:
auction_id = str(uuid.uuid4()) # 이번 경매 묶음 ID
# request: 항상 발행
req = _make_event("request", auction_id, ...)
_emit(producer, req)
# impression: 80% 확률
if random.random() < config.FILL_RATE:
imp = _make_event("impression", auction_id, ...)
_emit(producer, imp)
# click: 2.5% 확률 (impression 발생한 경우에만)
if random.random() < config.CTR:
clk = _make_event("click", auction_id, ...)
_emit(producer, clk)
# conversion: 3.5% 확률 (click 발생한 경우에만)
if random.random() < config.CVR:
conv = _make_event("conversion", auction_id, ...)
_emit(producer, conv)
if producer:
producer.poll(0) # Kafka 비동기 전송 큐 처리
time.sleep(1.0 / config.DUMMY_EPS) # 초당 100 auction
100번 루프 기준 발생량:
request 100건 (100 × 100%)
impression 80건 (100 × 80%)
click 2건 (80 × 2.5%)
conversion ~0건 (2 × 3.5%)
auction_id를 같은 루프 안에서 생성하고 모든 이벤트에 공유하기 때문에
나중에 Silver에서 "이 클릭이 어떤 impression에서 왔는가"를 auction_id로 조인할 수 있다.
criteo_producer.py — Criteo 데이터 매핑
Criteo 데이터셋은 click 이벤트만 존재하고, 원본 컬럼이 OpenRTB 필드와 다르다.
# Criteo 원본 컬럼 (주요 항목)
timestamp # 데이터 수집 시작 기준 상대 시간(초) ← Unix epoch 아님
uid # int64 해시값
campaign # int64 고유 ID → campaign_id로 사용
conversion # 0 또는 1 → 1이면 conversion 이벤트 생성
cost # float64 → bid_price로 변환 필요
cat1 ~ cat9 # 해시 인코딩된 카테고리 정수 (5824233 같은 큰 값)
Kafka 전송 흐름 전체 정리
# 1. Producer 생성
producer = Producer({"bootstrap.servers": config.KAFKA_BOOTSTRAP_SERVERS})
# 2. 메시지 전송 (즉시 전송이 아닌 내부 큐에 적재)
producer.produce(
topic=to_topic(event), # 어느 토픽으로
key=str(event.campaign_id).encode("utf-8"), # 파티션 결정 키
value=event.to_json_bytes(), # 메시지 본문 (bytes)
on_delivery=_delivery_report, # 전송 결과 콜백
)
# 3. 루프마다 비동기 큐 처리 (콜백도 이때 실행)
producer.poll(0)
# 4. 종료 시 잔여 메시지 전부 전송
producer.flush()
produce()는 즉시 보내지 않고 내부 버퍼에 쌓는다.
poll(0)이 그 버퍼를 처리하고 on_delivery 콜백을 실행한다.
flush()는 프로세스가 끝나기 전에 버퍼에 남은 메시지를 모두 보내고 대기한다.
실행 방법
cd code/producers
python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
# Kafka 없이 이벤트 구조 검증
DRY_RUN=true python dummy_generator.py
DRY_RUN=true python criteo_producer.py
# Kafka 있을 때 실제 전송
python dummy_generator.py # 무한 실행, Ctrl+C로 종료
python criteo_producer.py # 16.5M건 처리 후 자동 종료
1. dummy_gernerator.py 생성 결과

- 4개 토픽(ad-requests, ad-impressions 등등) 모두 정상 라우팅 됨.
auction_id: "6f13be3d-c443-44cf-a77f-88779391f383"
① ad-requests → campaign_id: 10000018, source: dummy
② ad-impressions → campaign_id: 10000018, auction_id 동일
③ ad-clicks → campaign_id: 10000018, auction_id 동일
④ ad-conversions → campaign_id: 10000018, auction_id 동일
auction_id: "d835d6d3-6ec6-4744-805f-cb91aa04e744"
① ad-requests → campaign_id: 10000020
② ad-impressions → auction_id 동일
③ ad-clicks → auction_id 동일
(conversion 없음 — 정상, 3.5% 확률)
impression 없는 request도 여럿 존재 -> fill rate 80% 정상 동작
다음 글
원천 데이터 생성 코드는 완성됐다. 다음은 Kafka 환경 구성이다.
- docker-compose.yml 작성 (Kafka KRaft 모드)
- 토픽 4개 생성 (ad-requests, ad-impressions, ad-clicks, ad-conversions)
- Producer 실제 전송 테스트
'Projects > 광고 플랫폼 Lakehouse 실전 설계 with Iceberg' 카테고리의 다른 글
| 광고 이벤트 레이크하우스 구축기 (4) — Producer 컨테이너화 와 토픽 자동 생성 (0) | 2026.06.14 |
|---|---|
| [개념] Spark Structured Streaming vs AWS Glue Streaming (0) | 2026.06.11 |
| 광고 이벤트 레이크하우스 구축기 (3) — 로컬 Kafka 환경 구성 및 Producer 연결 테스트 (0) | 2026.06.11 |
| [개념] Iceberg MERGE INTO — 늦게 오는 전환 데이터를 어떻게 처리할까 (0) | 2026.06.11 |
| 광고 이벤트 레이크하우스 구축기 (1) — 프로젝트 설계 및 기술 스택 (0) | 2026.06.10 |
- Total
- Today
- Yesterday
- catchup
- Backfill
- RDD
- Data Pipeline
- Consumer DAG
- Glue ETL
- Data Engineerring
- de
- kafka
- docker
- Data Dngineering
- elasticip
- DataSet
- spark
- airflow
- lake house
- DAG
- Spark structured streaming
- 데이터파이프라인
- Glue
- lakehouse
- Data engineering
- AWS Glue Catalog
- s3
- Daynamic Task
- Unity Catalog
- iceberg
- Databricks
- Prodcuder DAG
- AWS
| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | 4 | 5 | 6 | |
| 7 | 8 | 9 | 10 | 11 | 12 | 13 |
| 14 | 15 | 16 | 17 | 18 | 19 | 20 |
| 21 | 22 | 23 | 24 | 25 | 26 | 27 |
| 28 | 29 | 30 |
