티스토리 뷰

들어가며

이전 포스팅에서는 로컬 Kafka를 띄우고 Producer가 메시지를 보내는 것 까지 검증했다.

하지만 그 상태에는 두 가지 한계가 있었다.

 

  1. Producer를 사람이 직접 실행한다 - 터미널에서 `python dummy_generator.py`를 쳐야하고, 터미널을 닫으면 죽는다.
  2. 토픽을 사람이 직접 생성한다 - `./kafka-topics.sh`를 수동으로 실행해야 한다.

실시간으로 데이터가 계속 흘러야 하는 시스템에서 사람이 매번 명령을 친다는 건 운영 관점에서 약점이다.

이번 포스팅에서는 이 둘을 컨테이너로 만들어 `docker compose up`한 번으로 전체 파이프라인 입구가 자동으로 뜨게 만든다.

 

목표 구성:
docker compose up -d
→ Kafka 기동
→ (healthy 대기) → kafka-init 토픽 생성 후 종료
→ (생성 완료 대기) → producer 상시 실행

 

물론 로컬에서 실행하지 않고 컨테이너화한 Producer에서만 이벤트를 발생시킨다면 단순히 해결된다.

다만 코드를 한 줄 고칠때마다 이미지를 다시 빌드하는건 느리고,

호스트 listener를 열어두면 빌드 없이 터미널에서 즉시 테스트 할 수 있기 때문에 아래와 같이 수정하였다.


고민 1 - Producer를 컨테이너로 띄워야 하는가

지금까지는 개발 검증 단계라 쉘에서 직접 실행했다.

하지만 운영 단계에서 실시간으로 데이터를 계속 발생시키려면 Producer가 상시 떠 있어야 한다.

단계별로 보면 이렇다.

로컬 검증 (이전) : python dummy_generator.py ← 터미널 종속

로컬 컨테이너화 docker compose에 producer 추가 ← 이번 단계

EKS 운영 Kubernetes Deployment ← 자동 재시작, 스케일링

컨테이너로 만들면 얻는것

  • 터미널 종속성 제거 - 백그라운드 상시 실행
  • 죽으면 자동 재시작 (`restart: always`)
  • Kafka와 동일한 네트워크에서 관리

참고로 이 프로젝트의 Producer는 실제 광고 시스템을 흉내 내는 시뮬레이터이다.

실무에서는 광고 SDK나 광고 서버가 Producer 역할을 하지만, 여기서는 직접 띄워서 데이터 흐름을 만든다.


고민 2 - 토픽을 Producer 안에서 만들면 안되나?

처음에는 Producer 코드 안에서 토픽도 같이 생성하면 수동 실행 문제가 한 번에 해결되지 않나? 라고 생각했다.

하지만 이건 책임 분리 위반이라 안티패턴으로 본다고 한다.

이유1 - 책임이 섞인다.

토픽 생성 = 인프라/관리 책임 (파티션 수, 복제 인수, 보존 기간 결정)
메시지 전송 = 애플리케이션 책임 (데이터를 보내는 것)

Producer가 '이 토픽은 파티션 3개, 보존 30일'을 결정하는 일종의 월권이다.

토픽설계는 인프라 영역이기 때문이다.

 

이유 2 - 코드가 중복된다.

이 프로젝트의 Producer는 둘(`dummy_generator.py`, `criteo_producer`)이고 같은 토픽을 공유한다.

토픽 생성 로직을 Producer에 넣으면 두 곳에 같은 코드가 중복되고, 토픽 설계를 바꿀 때마다 Producer 코드를 건드려야한다.

 

이유 3 - 권한이 위험해진다.

운영에서 Producer 계정에는 보통 쓰기 권한만 준다.

토픽 생성 권한(`CREATE`)까지 주면 Producer가 실수로 엉뚱한 토픽을 만들 수 있다.

 

해법 - init 컨테이너로 분리

그래서 토픽 생성은 별도의 일회성 초기화 컨테이너(`kafka-init`)가 맡는다.

토픽을 만들고 스스로 종료한다. Producer는 전송만 하는 것이다.

kafka-init → 토픽 생성 후 종료 (인프라 책임)
producer → 메시지 전송만 (애플리케이션 책임)

핵심 난관 - 접속 주소가 두 종류 필요하다

가장 까다로웠던 부분이다.

이전 설정에서 advertised listener는 `localhost:9092` 하나 뿐이었다.

이곤 호스트(맥북)에서 접속할 때만 맞다.

 

Producer를 컨테이너로 옮기면 같은 Docker 네트워크 안에서 `kafka`라는 서비스 이름으로 접속해야한다.

`localhost`는 컨테이너 자기 자신을 가리키므로 통하지 않는다.

호스트 python → localhost:9092
컨테이너 producer → kafka:29092 ← 이 리스너 필요

Kafka의 listener 동작 원리

Kafka 접속은 2단계로 이뤄진다.

1단계 클라이언트 → Kafka에 최초 접속 (부트스트랩)
2단계 Kafka → '실제로 메시지를 보낼 주소는 여기야' (advertised listener 반환)
3단계 클라이언트 → 그 주소로 실제 메시지 전송

 

문제는 2단계에서 돌려주는 주소가 하나 뿐이면, 호스트와 컨테이너 둘 중 하나는 반드시 실패하는 것이다.

  • 호스트에 맞춰 `localhost:9092`를 돌려주면 → 컨테이너가 자기자신의 localhost를 찾아 실패
  • 컨테이너에 맞춰 `kafka:29092`를 돌려주면 → 호스트가 `kafka`라는 이름을 못 찾아 실패

해결 - listener를 2개로 분리

접속 경로가 2개이므로 listener도 2개로 나눈다.

KAFKA_LISTENERS: HOST://:9092,DOCKER://:29092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9092,DOCKER://kafka:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: HOST:PLAINTEXT,DOCKER:PLAINTEXT,CONTROLLER:PLAINTEXT

여기서 `HOST`, `DOCKER`, `CONTROLLER`는 내가 임의로 붙인 이름표이다. 보안 프로토콜이 아니다.

실제 프로토콜은 `LISTENER_SECURITY_PROTOCOL_MAP`에서 각 이름표에 연결한다.

로컬이라 전부 `PLAINTEXT`(암호화 없음)다.

 

클라이언트는 자기가 접속한 listener에 해당하는 주소를 돌려받으므로, 호스트와 컨테이너 모두 정상 동작한다.

호스트 python → localhost:9092 접속 → localhost:9092 응답 → OK
컨테이너 producer → kafka:29092 접속 → kafka:29092 응답 → OK

구현

config.py - 접속 주소를 환경변수로

컨테이너에서는 `kafka:29092`로, 호스트에서는 `localhost:9092`로 접속해야하므로 하드코딩된 값을 환경변수 오버라이드로 바꾼다.

import os

KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
# 기본값: localhost:9092 - 호스트에서 직접 python 실행했을때
# 컨테이너 실행 시: KAFKA_BOOTSTRAP_SERVERS=kafka:29092 로 오버라이드

 

코드는 그대로 두고, 실행 환경에 따라 환경변수만 바꾸면 된다.

Dockerfile - Producer 이미지

FROM python:3.11-slim

WORKDIR /app

# 의존성 먼저 설치 (레이어 캐시 — 코드만 바뀌면 이 단계 재사용)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 애플리케이션 코드 복사
COPY . .

CMD ["python", "dummy_generator.py"]

requirements.txt를 먼저 복사해 설치하는 건 Docker 레이어 캐시를 활용하기 위해서다. 코드만 바뀌면 의존성 설치 단계를 건너뛴다.

kafka-topics.sh - 호스트와 컨테이너 양쪽 지원

기존 스크립트는 전부 `docker exec kafka ...` 형태였다.

이건 호스트에서만 동작한다. init 컨테이너 안에는 docker가 없으므로 CLI를 직접 호출해야한다.

두 컨텍스트를 모두 지원하도록 파라미터화 한다.

# 실행 컨텍스트 설정 (환경변수로 오버라이드)
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"
KAFKA_BIN="${KAFKA_BIN:-docker exec kafka /opt/kafka/bin/kafka-topics.sh}"
  • 호스트 실행 → 기본값 그대로 (`docker exec`로 호출)
  • init 컨테이너 → `BOOTSTRAP=kafka:29092`, `KAFKA_BIN=/opt/kafka/bin/kafka-topics.sh`로 직접 호출

이후 스크립트 안의 모든 호출을 `$KAFKA_BIN`으로 통일한다.

 

docker-compose.yaml - 세 컨테이너 연결

핵심은 `depends_on`의 조건이다.

services:
  kafka:
    # ... listener 2개 설정 ...
    healthcheck:
      test: ["CMD-SHELL", "/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1"]

  kafka-init:
    image: apache/kafka:3.9.0
    depends_on:
      kafka:
        condition: service_healthy          # Kafka가 healthy가 된 뒤 실행
    environment:
      BOOTSTRAP: kafka:29092
      KAFKA_BIN: /opt/kafka/bin/kafka-topics.sh
    volumes:
      - ./kafka-topics.sh:/scripts/kafka-topics.sh:ro
    entrypoint: ["bash", "/scripts/kafka-topics.sh"]
    # 토픽 생성이 끝나면 종료 (일회성)

  producer:
    build:
      context: ../code/producers
    depends_on:
      kafka-init:
        condition: service_completed_successfully  # 토픽 생성 완료 후 시작
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:29092
      PYTHONUNBUFFERED: "1"
    command: ["python", "dummy_generator.py"]
    restart: always

`depends_on`의 두 조건이 실행 순서를 보장한다.

  • `service_healthy` - Kafka가 실제로 요청을 처리할 수 있을 때까지 대기
  • `service_completed_successfully` - kafka-init이 토픽을 다 만들고 정상 종료할 때까지 대기

이 덕분에 `docke compose up`한번으로 순서대로 기동된다.


트러블 슈팅

1. inter.broker.listener.name 오류

listener 이름을 `PLAINTEXT`에서 `HOST/DOCKER`로 바꾸자 Kafka가 기동에 실패했다.

java.lang.IllegalArgumentException: requirement failed:
inter.broker.listener.name must be a listener name defined in advertised.listeners.
The valid options based on currently configured listeners are HOST,DOCKER

 

이전에는 listener 이름이 `PLAINTEXT`라서 kafka가 브로커 간 통신 listener를 자동으로 추론했다.

다만 이름을 바꾸니 추론이 안돼서 이제는 명시해야한다.

KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER

브로커 간 통신은 내부 네트워크 (DOCKER)를 쓰도록 지정했다.

단일 브로커라 실제 브로커간 통신은 없지만 설정은 필수이다.

2. producer 로그가 안보임

producer 컨테이너가 메시지는 정상적으로 보내는데 `docker logs`에 아무것도 안나왔다.

Python의 stdout 버퍼링 때문이다. 

컨테이너 환경에서는 출력이 버퍼에 갇혀 보이지 않는다.

PYTHONUNBUFFERED: "1"

이 환경변수로 버퍼링을 끄면 `print`출력이 즉시 로그로 나온다.


검증

`docker compose up -d` 실행 후 기동 순서가 로그에 그대로 찍힌다.

Container kafka            Healthy
Container infra-kafka-init Started
Container infra-kafka-init Exited      ← 토픽 생성 후 종료
Container infra-producer   Started

 

kafka-init이 토픽 4개를 보존 기간 차등으로 생성했다.

[OK]   ad-requests    생성 완료 (retention=259200000ms)   # 72시간
[OK]   ad-impressions 생성 완료 (retention=259200000ms)   # 72시간
[OK]   ad-clicks      생성 완료 (retention=1209600000ms)  # 14일
[OK]   ad-conversions 생성 완료 (retention=2592000000ms)  # 30일

 

producer는 실시간으로 메시지를 쌓고, 토픽별 메시지 수를 보면 광고 퍼널 비율이 정확히 재현되었다.

requests    7,965
impressions 6,429   (requests의 80.7%  ✓ FILL_RATE 0.80)
clicks        154   (impressions의 2.4% ✓ CTR 0.025)
conversions     7   (clicks의 4.5%      ✓ CVR 0.035)

여전히 남은 고민

1. 단일 브로커에 대한 고민

이 구성은 여전히 단일 브로커다.

"데이터가 100배로 늘면 한대로 버틸 수 있나?" 라는 의문이 들었다.

 

단일 브로커를 쓰면 가용성(브로커가 죽으면 전체 중단되는 문제), 컨슈머 병렬성(파티션 분산), 무중단 운영에 문제가 있다.

 

그리고 그렇다고 로컬에서 브로커를 3대로 늘려도 진짜 가용성은 생기지 않는다.

전부 같은 Docker 호스트(맥북)위에 있으므로 호스트가 죽으면 3대가 동시에 죽는다.

진짜 가용성은 노드와 AZ가 물리적으로 분리되는 EKS 단계에서야 의미가 있는 것이다.

로컬 (개발/검증) 브로커 1대 → 빠르고 단순한 게 미덕

동일한 코드/설정 EKS (운영) 브로커 3대 + 복제 3 + AZ 분산 → 진짜 가용성·병렬성

그래서 로컬은 단일 브로커를 유지하고, 다중 브로커는 EKS 단계의 과제로 남겨두었다.

 

2. 실제 광고 SDK 오류로 인한 멱등성 보장

지금은 producer를 통한 데이터를 생성하고 있지만, 실제 파이프라인에서는

외부 광고 SDK를 통해 데이터를 쌓게 될것이다.

 

만약 이러한 외부 소스에 문제가 있어, 데이터가 쌓이다가 똑같은 데이터가 또 쌓이는 상황이 왔을때 

과연 이 파이프라인에서는 멱등성 문제를 해결 할 수 있을지 의문이 들었다. 

 

다만 고민해봤을때 중복으로 스트리밍 하는건 예외라고 생각하지 않고 정상적으로 언제나 발생할 수 있는 일이라고 생각해보았다.

그렇게 생각하면 "막는다" 기보단 "받아도 걸러낸다"와 같이 접근할 수 있었다.

즉 다른 레이어 (예: Silver)에서 중복을 처리할 수 있을 것 같다.


정리

이번 단계에서 한 것:

  • Producer를 컨테이너로 만들어 상시 실행 + 자동 재시작
  • 토픽 생성을 별도 init 컨테이너로 분리
  • listener 2개로 호스트/컨테이너 양쪽 접속 지원
  • `docker compose up`한 번으로 전체 파이프라인 입구 자동 기동

설계에서 지킨 원칙은 책임 분리이다.

토픽 생성은 인프라(kafka-init)가, 메시지 전송은 애플리케이션(producer)가 맡는다.

이 경계는 EKS로 가도 그대로 유지된다 - init 컨테이너는 Kubernetes Job으로, producer는 Deployment로 바뀔 뿐이다.

 


다음 포스팅 예고

파이프라인 입구가 자동화됐다. 다음은 Bronze 레이어 구축이다.

  • S3 버킷 및 Iceberg 테이블 설계
  • AWS Glue에서 Spark Structured Streaming 으로 Kafka → S3 raw 적재
  • Athena 쿼리로 적재 결과 검증
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2026/06   »
1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30
글 보관함