티스토리 뷰

원천 데이터 구성

원천 데이터는 두 가지다.

소스 역할
Criteo Attribution Dataset HuggingFace 제공, 약 1,650만 건의 클릭 데이터. 실시간처럼 재생한다.
더미 이벤트 생성기 Python 스크립트로 OpenRTB 형식의 이벤트를 합성 생성. 파이프라인의 주요 볼륨 소스.


웹 이벤트는 처음에 후보로 올랐지만 제거했다.
실제 유저가 클릭해야 이벤트가 발생하기 때문에 대용량 파이프라인 검증에 적합하지 않다.


파일 구성

producers/
├── common/
│ ├── init.py # Python 패키지 인식용 (이 파일이 없으면 import 불가)
│ └── schema.py # 공통 이벤트 스키마 (두 Producer가 공유)
├── config.py # 모든 설정값 중앙 관리
├── dummy_generator.py # 합성 이벤트 무한 생성
├── criteo_producer.py # Criteo 실시간 재생
└── requirements.txt

 


config.py — 설정 중앙화

모든 설정값을 한 파일에서 관리한다.
Producer 코드 어디에서도 하드코딩 없이 config.변수명으로 읽는다.

KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
TOPIC_REQUESTS    = "ad-requests"
TOPIC_IMPRESSIONS = "ad-impressions"
TOPIC_CLICKS      = "ad-clicks"
TOPIC_CONVERSIONS = "ad-conversions"
DUMMY_EPS = 100                  # 더미 생성기: 초당 auction 발생 수
CRITEO_REPLAY_INTERVAL = 0.0    # Criteo 재생 속도 (0 = 최대 속도)

 

광고 퍼널 비율

`FILL_RATE = 0.80` # P(impression | request)
`CTR = 0.025` # P(click | impression)
`CVR = 0.035` # P(conversion | click)

 

Criteo 역산값

REQUESTS_PER_CLICK = round(1 / (FILL_RATE * CTR)) # = 50
IMPRESSIONS_PER_CLICK = round(1 / CTR) # = 40

CRITEO_DATASET_NAME = "criteo/criteo-attribution-dataset"

localhost:9092의 근거: 
9092는 Kafka 프로젝트에서 정해둔 기본 포트다. MySQL이 3306, PostgreSQL이 5432이듯 Kafka는 9092가 표준이다.
localhost는 로컬 Docker로 Kafka를 띄울 때의 주소다.
EKS로 전환할 때 이 값 하나만 바꾸면 나머지 코드는 손댈 필요 없다.

 

schema.py — 이벤트 스키마

두 Producer의 우너천 형식이 다르기 때문에 스키마도 두 개로 분리한다.

  dummy_generator criteo_producer
원본 형식 없음 (처음 부터 생성) Criteo CSV 컬럼
Bronze에 저장 OpenRTB 형식 그대로 Criteo 원본 필드 그대로
스키마 클래스 AdEvent CriteoRawEvent

 

banner_id를 예로 들면:
- dummy_generator: banner_id = "10000001_300_1" — 처음부터 만든 값, 파생 아님
- criteo_producer: Criteo에 banner_id 없음 → Bronze에 없음 → Silver에서 f"{campaign}_{cat1}_{cat2}" 로 파생

 

@dataclass
class CriteoRawEvent:
    # Producer 생성 메타 필드
    event_id:   str
    event_type: str   # request | impression | click | conversion
    source:     str   # "criteo"
    auction_id: str
    produced_at: str

    # Criteo 원본 필드 — 변환 없음
    campaign:   int   # Silver에서 campaign_id로 통일
    uid:        str
    cost:       float # CPC 단위. Silver에서 ×1000 → CPM bid_price
    timestamp:  int   # 상대 시간(초). Silver에서 절대 Unix epoch으로 변환
    conversion: int   # 0 | 1
    cat1: int         # Silver에서 site_cat(IAB 코드) 파생
    cat2: int         # Silver에서 banner_id 파생에 사용
    cat3: int
    cat4: int
    cat5: int
    cat6: int
    cat7: int
    cat8: int
    cat9: int

 

Bronze에 저장되는 Criteo 이벤트 형태:

{
  "event_id": "uuid",
  "event_type": "click",
  "source": "criteo",
  "auction_id": "uuid",
  "produced_at": "2026-06-10T05:38:00+00:00",
  "campaign": 999,
  "uid": "12345678",
  "cost": 0.00083,
  "timestamp": 86412,
  "conversion": 0,
  "cat1": 5824233,
  "cat2": 1034,
  "cat3": 0
}
banner_id, bid_price, site_cat, device_type, os, country → Bronze에 없음. Silver에서 생성.

 

to_json_bytes()는 왜 있나?
Kafka는 bytes 타입만 받는다. Python 객체를 그대로 넣을 수 없다.

AdEvent 객체 (Python 메모리)
    ↓ asdict(self)       → dict
    ↓ json.dumps(...)    → JSON 문자열 (str)
    ↓ .encode("utf-8")   → bytes  ← Kafka가 요구하는 타입

이 로직을 메서드 하나에 모아두면 두 Producer가 항상 동일한 방식으로 직렬화한다.
나중에 포맷을 바꾸고 싶어도 이 파일 한 곳만 수정하면 된다.

to_topic()는 왜 있나?
이벤트 타입에 따라 어느 Kafka 토픽으로 보낼지 결정하는 라우팅 규칙이다.

 

event.event_type = "click"      → "ad-clicks"
event.event_type = "impression" → "ad-impressions"

두 Producer가 이 함수를 공유하기 때문에 라우팅 규칙이 항상 동일하게 유지된다.

Kafka 파티션 키를 campaign_id로 설정한 이유
파티션 키는 같은 키를 가진 메시지가 항상 같은 파티션으로 가도록 결정하는 값이다.

banner_id는 campaign_id에서 파생된 하위 개념이다.

 

campaign_id: 10000001
  ├── banner: 10000001_300_1  (300×250, 위치 1)
  ├── banner: 10000001_728_3  (728×90, 위치 3)
  └── banner: 10000001_320_1  (320×50, 위치 1)

banner_id로 파티셔닝하면 같은 캠페인의 이벤트가 다른 파티션에 흩어진다.
Spark가 캠페인 단위로 집계할 때 파티션 간 데이터 이동(Shuffle)이 발생한다.

campaign_id로 파티셔닝하면 한 캠페인의 모든 이벤트가 같은 파티션에 모인다.
Gold 테이블의 핵심이 캠페인별 일별 성과이므로 campaign_id가 자연스러운 선택이다.

 

dummy_generator.py — 퍼널 흐름

광고 이벤트는 단계별로 확률적으로 줄어드는 구조다.

request    항상 발생          → 광고 슬롯에 입찰 요청
impression 80% 확률 (fill rate) → 실제로 화면에 표시됨
click      2.5% 확률 (CTR)     → 유저가 클릭
conversion 3.5% 확률 (CVR)     → 구매 또는 전환 완료

코드에서 이 구조는 중첩 if로 표현된다.
if 안에 if가 있는 이유는 상위 이벤트가 발생해야 하위 이벤트가 가능하기 때문이다.
impression 없이 click 불가, click 없이 conversion 불가.

 

while True:
    auction_id = str(uuid.uuid4())  # 이번 경매 묶음 ID

    # request: 항상 발행
    req = _make_event("request", auction_id, ...)
    _emit(producer, req)

    # impression: 80% 확률
    if random.random() < config.FILL_RATE:
        imp = _make_event("impression", auction_id, ...)
        _emit(producer, imp)

        # click: 2.5% 확률 (impression 발생한 경우에만)
        if random.random() < config.CTR:
            clk = _make_event("click", auction_id, ...)
            _emit(producer, clk)

            # conversion: 3.5% 확률 (click 발생한 경우에만)
            if random.random() < config.CVR:
                conv = _make_event("conversion", auction_id, ...)
                _emit(producer, conv)

    if producer:
        producer.poll(0)  # Kafka 비동기 전송 큐 처리

    time.sleep(1.0 / config.DUMMY_EPS)  # 초당 100 auction

 

100번 루프 기준 발생량:

request      100건  (100 × 100%)
impression    80건  (100 × 80%)
click          2건  (80 × 2.5%)
conversion    ~0건  (2 × 3.5%)

 

auction_id를 같은 루프 안에서 생성하고 모든 이벤트에 공유하기 때문에
나중에 Silver에서 "이 클릭이 어떤 impression에서 왔는가"를 auction_id로 조인할 수 있다.

criteo_producer.py — Criteo 데이터 매핑

Criteo 데이터셋은 click 이벤트만 존재하고, 원본 컬럼이 OpenRTB 필드와 다르다.

# Criteo 원본 컬럼 (주요 항목)
timestamp            # 데이터 수집 시작 기준 상대 시간(초)  ← Unix epoch 아님
uid                  # int64 해시값
campaign             # int64 고유 ID → campaign_id로 사용
conversion           # 0 또는 1 → 1이면 conversion 이벤트 생성
cost                 # float64 → bid_price로 변환 필요
cat1 ~ cat9          # 해시 인코딩된 카테고리 정수 (5824233 같은 큰 값)

Kafka 전송 흐름 전체 정리

# 1. Producer 생성
producer = Producer({"bootstrap.servers": config.KAFKA_BOOTSTRAP_SERVERS})

# 2. 메시지 전송 (즉시 전송이 아닌 내부 큐에 적재)
producer.produce(
    topic=to_topic(event),                        # 어느 토픽으로
    key=str(event.campaign_id).encode("utf-8"),   # 파티션 결정 키
    value=event.to_json_bytes(),                  # 메시지 본문 (bytes)
    on_delivery=_delivery_report,                 # 전송 결과 콜백
)

# 3. 루프마다 비동기 큐 처리 (콜백도 이때 실행)
producer.poll(0)

# 4. 종료 시 잔여 메시지 전부 전송
producer.flush()

produce()는 즉시 보내지 않고 내부 버퍼에 쌓는다.
poll(0)이 그 버퍼를 처리하고 on_delivery 콜백을 실행한다.
flush()는 프로세스가 끝나기 전에 버퍼에 남은 메시지를 모두 보내고 대기한다.


실행 방법

cd code/producers
python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt

# Kafka 없이 이벤트 구조 검증
DRY_RUN=true python dummy_generator.py
DRY_RUN=true python criteo_producer.py

# Kafka 있을 때 실제 전송
python dummy_generator.py    # 무한 실행, Ctrl+C로 종료
python criteo_producer.py    # 16.5M건 처리 후 자동 종료

 

 

1. dummy_gernerator.py 생성 결과

  • 4개 토픽(ad-requests, ad-impressions 등등) 모두 정상 라우팅 됨.
auction_id: "6f13be3d-c443-44cf-a77f-88779391f383"
  ① ad-requests    → campaign_id: 10000018, source: dummy
  ② ad-impressions → campaign_id: 10000018, auction_id 동일
  ③ ad-clicks      → campaign_id: 10000018, auction_id 동일
  ④ ad-conversions → campaign_id: 10000018, auction_id 동일
auction_id: "d835d6d3-6ec6-4744-805f-cb91aa04e744"
  ① ad-requests    → campaign_id: 10000020
  ② ad-impressions → auction_id 동일
  ③ ad-clicks      → auction_id 동일
  (conversion 없음 — 정상, 3.5% 확률)

impression 없는 request도 여럿 존재 -> fill rate 80% 정상 동작


다음 글

원천 데이터 생성 코드는 완성됐다. 다음은 Kafka 환경 구성이다.

  1. docker-compose.yml 작성 (Kafka KRaft 모드)
  2. 토픽 4개 생성 (ad-requests, ad-impressions, ad-clicks, ad-conversions)
  3. Producer 실제 전송 테스트
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2026/06   »
1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30
글 보관함