티스토리 뷰

데이터 엔지니어링의 'Hello World'라고 불리는 Word Count(단어 개수 세기) 실습을 통해 Spark 애플리케이션이 어떻게 구성되고 동작하는지 알아보자.

단순히코드를 짜는 것이 아니라, 수만 대의 서버에서 데이터가 어떻게 움직이는지 상상하며 작업하는것이 도움이 된다고 한다.


1. Spark 애플리케이션의 핵심 객체

Spark와 대화하기 위해서는 '통로'가 필요하다. 그 통로 역할을 하는 것이 바로 아래 두 객체다.

 

1. Spark Context 

  • 역할: Spark 클러스터와의 연결을 담당하는 가장 기본적인 객체다. (Initialize Environment)
  • 코드 설명
    • SparkConf()를 통해서 Spark Configuration 을 해준다. (초기 세팅 작업)
    • .setAppName 을 통해 앱 애플리케이션 name을 정의한다.
      • 기타 등등 최적화 옵션이 있지만 추후 최적화 작업을 하면서 알아보도록 하겠다.
    • SparkContext 객체 안에다가 만들어두 spark conf 객체를 넣으면 Spark Context를 만들 수 있다.
  • 설명: "자, 이제 Spark 작업을 시작할 거야!"라고 선언하는 것과 같다.

2. Spark Session 

  • 역할: Spark 2.0부터 도입된 객체로, Spark Context를 포함하면서도 더 사용하기 쉬운 상위 개념(Wrapper)이다.
  • 코드 예시: spark = SparkSession.builder
  • 설명: 요즘 실무에서는 SparkContext보다는 훨씬 직관적이고 기능이 많은 Spark Session을 주로 사용한다.

2. 데이터 읽기와 쓰기 (I/O)

분산 환경에서는 내 컴퓨터에 있는 파일을 읽는 것과는 조금 다른 방식이 필요하다.
  • Read Data: 로컬 파일, S3(AWS), HDFS 등에서 데이터를 가져온다.

  • Write Data: 처리된 결과를 원하는 포맷(Parquet, CSV 등)으로 저장한다. 


3. 실습: Word Count 프로세스 이해하기

단어 개수를 세는 작업이 분산 시스템(Distributed System)에서는 어떻게 일어날까?

  1. 서버 준비 
  2. 파일을 나누어 서버에 분산시켜 전송
  3. 프로그램 빌드
  4. 빌드한 프로그램을 서버에 전송
  5. 서버 별로 실행 결과 기록
  6. 모든 결과를 더해서 최종 결과 추출

  • flatMap: File에 든 문자들을 라인별로 정리
  • map: 라인별로 word 를 세서 각 word별로 카운트 한다
  • Reduce: 카운트 한 값을 Key-Value 값으로 저장

주요 코드에 대한 설명은 소제목 4에서 하겠다.

 

3-1. Spark Contextm, Spark Session 설정하기

실습 환경 구성을 위해서

아래 코드를 통해서 SparkContext 를 생성하려 하였지만, 에러가 발생하였다.

from pyspark import SparkConf, SparkContext

spark_conf = SparkConf().setAppName("spark-context-test")
spark = SparkContext(conf = spark_conf)
SparkConf().getAll()

이유는 databricks notebook에서는 이미 SparkContext가 돌아가고 있기 때문에 만들게 되면 에러가 발생하게 되는것이다.

 

 아래 sc 명령어를 통해 확인해보면 이미 Spark Context가 들어 있는 것을 볼 수 있다.

 

 

Spark Context 와는 별개로 Spark Session은 생성할 수 있는데,

이는 Spark Session은 Spark Context를 새로 생성하는 것이 아닌, 기존의 Spark Context를 감싸는 객체기 때문이다.

 

3-2. Read and Write Data 실습

아래 코드를 통해 read 실습을 하였다.

참고로 미리 Catalog 에 실습용 csv 파일을 생성해두었다.

df = spark.read.csv("/Volumes/workspace_7474649733563388/default/raw/movies.csv")
df.printSchema()

위와 같은 코드를 실행시키면 아래와 같은 헤더가 없는 값이 출력된다.

만약 헤더가 있는 값을 출력하고 싶다면, option을 부여하면 된다. (다양한 옵션이 있음)

df = spark.read.option('header', 'True').csv("/Volumes/workspace_7474649733563388/default/raw/movies.csv")
df.printSchema()

 

데이터가 잘 들어갔는지는 아래 show 함수를 사용하면 된다.

df.show(볼 raw 개수, 말줄임표를 사용할지 여부(True or False))

 

display(df)를 해도 된다. (좀더 보기 좋음)

 

 

아래는 write 실습이다.

import pyspark.sql.functions as F

df = df.where(F.col('movieId') > 200)
df.write.mode('Overwrite').csv('/Volumes/workspace_7474649733563388/default/raw/output.csv')

where 절을 사용해서 조건을 건게 포인트(뒤에서 배울거임)

 


4. 핵심 코드 설명 (중요!)

데이터 엔지니어링의 기초인 MapReduce(맵리듀스) 원리가 담겨 있다.

 

0단계: Spark Context 생성

Spark Context 를 생성해여 spark task를 사용할 수 있으므로

word-count 라는 appName으로 생성했다.

# Initialize Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("WordCount Practice").getOrCreate()

sc = spark.sparkContext
print(sc)

1단계: 데이터 준비 (Read)

# 텍스트 파일 읽기
text = ["This is the example text file for the first spark codes.",
    "Word count example can explain most of the important features in spark.",
    "You can try coding in Databricks for your self."]
text_file = sc.parallelize(text)
  • parallelize(text)
    • text 리스트를 잘게 쪼개서 클러스터에 있는 여러 개의 일꾼(Worker)들에게 나눠 주는 역할
    • 결과: text_file은 더 이상 단순한 리스트가 아니라 Spark의 기본 분산 데이터 구조인 RDD가 되게된다.

2단계: 맵핑 (Map)

# 단어 단위로 쪼개기
words = text_file.flatMap(lambda line: line.split(" "))
  • 설명: 각 단어를 (단어, 1) 형태의 튜플로 만든다.
  • 왜 하는가? 나중에 같은 단어끼리 숫자를 더하기 위해서 "이 단어가 1개 발견됨!"이라고 표시를 남기는 과정이다.

왜 map이 아니라 flatMap인가?

  • map을 썼을 때: 입력: ["Hello World", "Hi Spark"]
    • 결과: [["Hello", "World"], ["Hi", "Spark"]] → 중첩된 배열이 된다.
  • flatMap을 썼을 때:
    • 입력: ["Hello World", "Hi Spark"]
    • 결과: ["Hello", "World", "Hi", "Spark"] → 중첩을 풀어서 하나의 평평한(Flat) 리스트로 만든다.
# 실행결과 확인하기
words.take(100)

['This',
 'is',
 'the',
 'example',
 'text',
 'file',
 'for',
 'the',
 'first',
 'spark',
 'codes.',
 'Word',
 'count',
 'example',
 'can',
 'explain',
 'most',
 'of',
 'the',
 'important',
 'features',
 'in',
 'spark.',
 'You',
 'can',
 'try',
 'coding',
 'in',
 'Databricks',
 'for',
 'your',
 'self.']

 

그다음 아래 매핑을 통해서 리스트에 있는 단어별로 1을 매핑해준다. (Key - Value)

word_counts = words.map(lambda word: (word, 1))

# 아래는 매핑 후 word_counts의 결과
[('This', 1),
 ('is', 1),
 ('the', 1),
 ('example', 1),
 ('text', 1),
 ('file', 1),
 ('for', 1),
 ('the', 1),
 ('first', 1),
 ('spark', 1),
 ('codes.', 1),
 ('Word', 1),
 ('count', 1),
 ('example', 1),
 ('can', 1),
 ('explain', 1),
 ('most', 1),
 ('of', 1),
 ('the', 1),
 ('important', 1),
 ('features', 1),
 ('in', 1),
 ('spark.', 1),
 ('You', 1),
 ('can', 1),
 ('try', 1),
 ('coding', 1),
 ('in', 1),
 ('Databricks', 1),
 ('for', 1),
 ('your', 1),
 ('self.', 1)]

보면 알겠지만, 아직 같은 Key 값에 대해서는 처리가 안되어 있다. (예: the가 복수개임)

이 처리는 아래 3단계인 Reduce 단계에서 한다.

3단계: 집계 (Reduce)

word_counts = word_counts.reduceByKey(lambda a, b: a + b)


# 아래는 word_counts 를 찍어본 결과
[('text', 1),
 ('file', 1),
 ('for', 2),
 ('first', 1),
 ('codes.', 1),
 ('Word', 1),
 ('explain', 1),
 ('most', 1),
 ('of', 1),
 ('important', 1),
 ('try', 1),
 ('Databricks', 1),
 ('This', 1),
 ('is', 1),
 ('the', 3),
 ('example', 2),
 ('spark', 1),
 ('count', 1),
 ('can', 2),
 ('features', 1),
 ('in', 2),
 ('spark.', 1),
 ('You', 1),
 ('coding', 1),
 ('your', 1),
 ('self.', 1)]
  • 설명: 키(Key)가 같은 놈들끼리 모아서, 값을 줄여나간다.(Reduce)
  • 핵심 원리: reduceByKey는 분산된 서버들 사이에서 같은 단어들을 한데 모아(Shuffle) 숫자를 합산하는 마법 같은 함수다.
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2026/06   »
1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30
글 보관함