티스토리 뷰
Spark (4) - Spark의 데이터 종류 및 처리법 - DataFrame & Dataset 편
코딩하는 제리코 2026. 4. 16. 00:201. DataFrame
Distributed collection of data organized into named columns
데이터에 인터페이스가 생겼다는 의미
- RDD: ['Kim', 28], ['Lee', 32] 이걸 Spark는 이름인지 나이인지 모른다. (TS 기준으로 any[])
- DataFrame: name: 'Kim', age: 28 처럼 컬럼 이름과 타입이 정해짐 (TS 기준으로 Uset[])
Spark 데이터 처리 속도 증가를 위해 등장 (텅스텐 프로젝트)
- schema 존재
- 쿼리 최적화 (catalyst optimizer)
- 코드를 비효율적으로 짜도(예: 전체를 다 가져와서 필터링),
Spark 엔진이 "아, 이 코드는 먼저 필터링하고 필요한 컬럼만 가져오는 게 빠르겠네?"라고 실행 계획을 스스로 수정한다.
- 코드를 비효율적으로 짜도(예: 전체를 다 가져와서 필터링),

1-1. DataFrame 특성

- productive: compile-time에 type-safety을 제공
- easy to use: high-level functions
- fast and optimized: catalyst code optimizer
1-2. 언제 DataFrame을 사용할까?
- Data requires a structure: infer a schema
- high-level transformation: columnar functions, SQL queries
- type safety: Compile-time type-safety
그렇다면 RDD는 언제 사용할까?
- low-level의 transformation and actions 가 필요한 경우
- unstructured data(비정형) ex) media streams or streams of text
- functional programming 에 익숙한 경우
- don't care about imposing a schema
- don’t need optimization and performance benefits available with DataFrames and Datasets for structured and semi-structured data
1-3. 같은 기능을 각각 RDD 와 DataFrame 으로 구현
- RDD
rdd1 = sc.parallelize([“1;abcd”, “2;efgh”, “3;ijkl”])
rdd2 = rdd1.map(lambda v: v.split(";")) -> [[“1”, “abcd”], [“2”, “efgh”], ..]
rdd3 = rdd2.map(lambda v: v[0]) -> [“1”, “2”, “3”]
rdd3.collect()
- DataFrame
df1 = createDataFrame([“1;abcd”, “2;efgh”, “3;ijkl”], [“value”])
df2 = df1.select(split("value", "[;]").alias("arr")) arr| [“1”, “abcd”], …
df3 = df2.select("arr[0]")
df3.show(3, false)
- createDataFrame 코드
- 세 개의 문자열 데이터를 가진 리스트를 만들고, 컬럼 이름은 "value"로 지정한다.
RDD와 마찬가지로 다양한 API들은 아래 링크에서 확인할 수 있다.
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html
DataFrame — PySpark 4.1.1 documentation
previous pyspark.sql.MergeIntoWriter.merge
spark.apache.org
2. Dataset
RDD의 장점과 DataFrame의 장점을 합쳐 놓은 데이터 모델
- DataFrame: 데이터가 '표(Table)' 형태인 것은 알지만, 그 안의 데이터가 어떤 타입인지(String인지 Int인지)는 실행해 봐야 아는 상태 (Untyped)
- Dataset: 컴파일 단계에서 이미 데이터의 타입이 확정된 상태 (Typed)
프론트엔드 개발자였던 나에게 익숙한 TS로 비유 해보자면, 아래와 같다.
- DataFrame: any[] 혹은 Object[]. 데이터가 객체인 건 알겠는데, item.name을 썼을 때 이게 문자열인지 오타인지 체크가 안 됨.
- Dataset: interface User { name: string }이 적용된 User[]. 존재하지 않는 프로퍼티를 쓰면 바로 에러를 뱉음.

DataFrame은 Dataset의 부분 집합이고,
안에 있는 Row라는 특정 데이터모델로 이뤄진 데이터 셋이다.
2-1. 왜 Python(PySpark)에는 Dataset 이 없는가?

위 표를 보면 Python과 R은 DataFrame만 사용한다.
- 이유: Python은 태생적으로 동적 타이핑(Dynamic Typing) 언어이기 때문이다.
- 컴파일 타임에 타입을 체크하는 Dataset의 장점을 살릴 수 없어서, PySpark에서는 DataFrame이 Dataset의 역할까지 수행하도록 설계되었다.
- 따라서 Python 개발자라면 "DataFrame의 기능을 잘 쓰는 것이 곧 Dataset의 혜택을 누리는 것"이라고 이해하면 될 것 같다.
2-2. Dataset의 장점
- 타입 안전성 (Type-safety): 코드를 실행하기 전에 타입 에러를 잡아준다. (Scala/Java 한정)
- 직렬화 최적화: 데이터를 메모리에 저장할 때 Spark가 타입을 미리 알고 있으므로, 훨씬 더 작고 효율적인 바이너리 형태로 압축해서 저장한다.
- 높은 추상화: SQL 쿼리처럼 선언적인 코드와 RDD처럼 함수형 프로그래밍 스타일을 동시에 사용할 수 있다.
3. DataFrame 과 Dataset 실습
3-1. Class를 사용해서 데이터를 담는 방식
아래와 같이 Class 를 사용해서 데이터를 담을 수 있는데,
Class를 사용한 방식과 튜플로 묶어서 생성한 방식을 비교해보도록 하겠다.
# Class 로 데이터를 생성한 경우
class Person:
def __init__(self, name, age, job):
self.name = name
self.age = age
self.job = job
p1 = Person("foo", 30, "programmer")
p2 = Person("bar", 10, "student")
# 튜플로 생성한 경우
t1 = ("foo", 30, "programmer")
t2 = ("bar", 10, "student")
# 각각의 경우를 출력해보면
spark.createDataFrame([p1, p2]).show()
spark.createDataFrame([t1, t2]).show()
# 아래와 같이 나온다.
+---+----------+----+
|age| job|name|
+---+----------+----+
| 30|programmer| foo|
| 10| student| bar|
+---+----------+----+
+---+---+----------+
| _1| _2| _3|
+---+---+----------+
|foo| 30|programmer|
|bar| 10| student|
+---+---+----------+
- Class 로 선언한 경우
- 원리: p1 객체 안에 name, age, job이라는 변수(속성) 이름이 있다는 걸 Spark가 스스로 찾아낸다.
- 결과: 변수 이름이 그대로 컬럼 명이 된다.
- 주의점: Spark 내부적으로 컬럼명을 가나다/알파벳 순으로 정렬하는 습성이 있어서, 코드 상의 순서와 상관없이 age, job, name 순으로 출력된 것이다.
- 튜플로 생성한 경우 (Array 방식)
- 원리: 이름 정보가 없으므로, Spark는 "첫 번째 컬럼", "두 번째 컬럼"이라는 의미로 _1, _2, _3 이라는 임시 이름을 붙인다.
- 결과: 데이터의 순서는 유지되지만, 이름표가 없어서 가독성이 떨어진다.
3-2. 계층적 타입구조 설정
StructType 을 import 해서 아래와 같은 계층적 타입 구조를 설정할 수 있다.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("job", StringType(), True)
])
# parallelize 를 통해서 RDD를 만들고 이를 DataFrame으로 바꾸는 작업을 하는 부분
rowRDD = sc.parallelize([Row("foo", 7, "programmer"), Row("bar", 13, "student")])
spark.createDataFrame(rowRDD, schema).show()
# 결과
+----+---+----------+
|name|age| job|
+----+---+----------+
| foo| 7|programmer|
| bar| 13| student|
+----+---+----------+
주석에도 남겨 두었지만,
RDD를 DataFrame으로 바꾸기 위해서는 createDataFrame 함수 안에 RDD 와 지정한 스키마를 넣어주면 된다.
마찬가지로 다양한 함수들이
pyspark.sql.functions 안에 있다. (너무 방대해질것 같아서 따로 포스팅하지는 않겠다.)
내가 직접 함수를 정의하고 싶다면 user define function(UDF) 을 하면 된다. (아래와 같이)
plusone_udf = F.udf(lambda v: v + 1)'DataEngineer(DE) > Spark- 데이터 처리, 최적화' 카테고리의 다른 글
| Spark (4) - Spark의 데이터 종류 및 처리법 - RDD 편 (0) | 2026.04.15 |
|---|---|
| Spark (3) - 첫 번째 Spark 애플리케이션 : Word Count로 이해하는 분산 처리 (1) | 2026.04.14 |
| Spark (2) - Spark 구성 요소 이해하기 - Databricks & Unity Catalog & RDD (1) | 2026.04.14 |
| Spark (1) - 빅데이터와 Spark 의 이해 (1) | 2026.04.13 |
| Spark (0) - 학습 목표 설정 (0) | 2026.04.13 |
- Total
- Today
- Yesterday
- airflow
- Prodcuder DAG
- de
- DAG
- Spark structured streaming
- iceberg
- s3
- 데이터파이프라인
- RDD
- Data Dngineering
- spark
- Data Engineerring
- docker
- AWS Glue Catalog
- Backfill
- lake house
- kafka
- Unity Catalog
- Glue
- lakehouse
- Daynamic Task
- Databricks
- AWS
- elasticip
- Glue ETL
- Data Pipeline
- DataSet
- Consumer DAG
- Data engineering
- catchup
| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | 4 | 5 | 6 | |
| 7 | 8 | 9 | 10 | 11 | 12 | 13 |
| 14 | 15 | 16 | 17 | 18 | 19 | 20 |
| 21 | 22 | 23 | 24 | 25 | 26 | 27 |
| 28 | 29 | 30 |
