티스토리 뷰

1. 프로젝트 소개

지금까지 공부한 AWS Service를 바탕으로 아래와 같은 데이터 파이프라인을 구축해보고자 한다.

1-1. 파이프라인 아키텍쳐

구축할 데이터 파이프라인

보다시피 자동화된 파이프라인은 아니다.

따라서 추후에 Apache 와 Spark를 공부해서 자동화 파이프라인도 구축해볼 예정이다.

1-2. 결과 대시보드

 


2. 부동산 API 확인

국토부에서 제공하는 공공 데이터를 사용한다.

2-1. 공공 데이터 포탈 작업

1. 공공 데이터 포탈(https://www.data.go.kr/)을 방문하여 회원가입후 API Key를 받아야한다.

 

2. "국토교통부_아파트 매매 실거래가 자료"를 검색해서 활용 신청을 한다.

 

3. 활용신청된 탭으로 들어가서 일반 인증키를 별도로 저장해둔다. (절대 외부 노출 X)

 

Postman으로 데이터 요청 해본 예시


3. 데이터 수집 코드 작성

데이터에 대한 별도의 전환 없이 있는 그대로 데이터 레이크인 S3에 저장하는 방식으로 한다.

3-1. 작업 전 S3 버킷 생성

우선 S3에 smj-realestate-test 이라는 버킷을 만들고

그안에 아파트 실거래 값을 그대로 넣는다는 의미로 apt-trade-raw 라는 폴더를 만들었다.

이제 이 폴더 안에 api를 통해 받아온 값을 넣는 작업을 해보려고 한다.

 

3-2. API를 통해서 데이터 가져오기

아래 파이썬 코드를 통해서 데이터를 가져올 수 있다.

기본적인 파이썬 문법만 안다면 이해할 수 있는 코드라 설명은 생략하도록 하겠다.

def get_apr_trade_from_api(service_key:str, lawd_cd:str, deal_ymd:str):

  import requests

  num_of_rows = 50
  page_no =1
  end_point_url = ("https://apis.data.go.kr/1613000/RTMSDataSvcAptTrade/getRTMSDataSvcAptTrade?"
                   f"serviceKey={service_key}&LAWD_DE={lawd_cd}&DEAL_YMD={deal_ymd}"
                   f"&numOfRows={num_of_rows}&pageNo={page_no}")
  
  response = requests.get(end_point_url)
  print(response)

def main():
  from dotenv import load_dotenv
  import os

  # 변수 설정
  load_dotenv()

  service_key = os.getenv("SERVICE_KEY")
  lawd_cd = "11110" # 종로구
  deal_ymd = "202502"

  # 1. API를 통해서 데이터 가져오기
  get_apr_trade_from_api(service_key, lawd_cd, deal_ymd)

if __name__ == "__main__":
  main()

 

3-3. 호출한 데이터 가공하기

호출한 데이터의 형태를 보면 수많은 item객체 안에 데이터들이 담겨져 있는것을 볼 수 있다. (XML형태)

<?xml version="1.0" encoding="utf-8" standalone="yes"?>
<response>
    <header>
        <resultCode>000</resultCode>
        <resultMsg>OK</resultMsg>
    </header>
    <body>
        <items>
            <item>
            데이터들
            </item>
            <item>
            데이터들2
            </item>
            ...생략

 

따라서 이러한 데이터들을 JSON 형태로 가공해서 S3에 저장하도록 하는 작업을 하려고 한다.

  •  get_apt_trade_from_api 를 통해서 가져온 데이터를 가공하는 processing_datas라는 메서드를 만든다.
  • 해당 메서드에서는 list안에 데이터별로 객체로 저장하는 형태로 가공할 예정이다.
    • 결국 return 되는건 list 하나
  • item들을 돌면서 list_temp 안에 dict_item들을 넣으면서 list_temp를 완성한다.
def processing_datas(root):
  list_temp = list()

  # [{거래 #1}, {거래 #2}] 이러한 형태로 만들것
  for item in root.iter("item"):
    dict_item = {
            "sggCd": item.find("sggCd").text,
            "umdNm": item.find("umdNm").text,
            "aptNm": item.find("aptNm").text,
            "jibun": item.find("jibun").text,
            "excluUseAr": item.find("excluUseAr").text,
            "dealYear": item.find("dealYear").text,
            "dealMonth": item.find("dealMonth").text,
            "dealDay": item.find("dealDay").text,
            "dealAmount": item.find("dealAmount").text,
            "floor": item.find("floor").text,
            "buildYear": item.find("buildYear").text,
            "cdealType": item.find("cdealType").text,
            "cdealDay": item.find("cdealDay").text,
            "dealingGbn": item.find("dealingGbn").text,
            "estateAgentSggNm": item.find("estateAgentSggNm").text,
            "rgstDate": item.find("rgstDate").text,
            "aptDong": item.find("aptDong").text,
            "slerGbn": item.find("slerGbn").text,
            "buyerGbn": item.find("buyerGbn").text,
            "landLeaseholdGbn": item.find("landLeaseholdGbn").text
      
    }
    list_temp.append(dict_item)

    return list_temp


def get_apt_trade_from_api(service_key:str, lawd_cd:str, deal_ymd:str):

  import requests
  import xml.etree.ElementTree as ET

  num_of_rows = 50
  page_no =1
  list_result = list()

  end_point_url = ("https://apis.data.go.kr/1613000/RTMSDataSvcAptTrade/getRTMSDataSvcAptTrade?"
                   f"serviceKey={service_key}&LAWD_CD={lawd_cd}&DEAL_YMD={deal_ymd}"
                   f"&numOfRows={num_of_rows}&pageNo={page_no}")
  
  response = requests.get(end_point_url)
  root = ET.fromstring(response.text)

  # 2. 데이터 가공하기
  list_result = processing_datas(root)
  return list_result

 

3-4. 데이터를 파일로 저장하고 S3에 저장하기

파일로 저장하기

이제 가공한 데이터를 별도의 파일로 저장하고 이를 S3에 업로드하려고 한다.

  • 현재 파일상태는 리스트형[{거래 #1}, {거래 #2}]인데 파일에 저장할때는 읽기 편하게 {거래 #1} {거래 #2} 와 같은 형태로 저장해야한다.
  • 따라서 jsonlines 패키지를 통해서 만약 file_type이 json이라면 올바른 형태로 전환하였다. (save_file 메서드)
def save_file(content, file_name, file_type):
  import jsonlines

  # 파일 형태가 현재는 [{거래 #1}, {거래 #2}] 로 되어 있는데
  # 저장할때는 리스트가 아니라 {거래 #1} {거래 #2}로 저장해야함
  if file_type == "json":
    with jsonlines.open(file_name, "w") as f:
      f.write_all(content) 


def main():
  from dotenv import load_dotenv
  import os

  # 변수 설정
  load_dotenv()

  service_key = os.getenv("SERVICE_KEY")
  lawd_cd = "11110" # 종로구
  deal_ymd = "202502"

  # file_name은 파티셔닝에 유리한 구조로 만든다.  
  file_name = "f{deal_ymd}_{lawd_cd}_result.json"
  file_type = "json"

  # 1. API를 통해서 데이터 가져오기
  trade_result = get_apt_trade_from_api(service_key, lawd_cd, deal_ymd)

  # 3. 해당 데이터를 파일로 저장하기
  save_file(trade_result, file_name, file_type)

 

S3에 업로드하기

아래 코드를 통해 S3 버킷에 파일을 업로드할 수 있다.

  • S3에 올릴때 필요한 oject_name 파라미터는 현재 S3 구조와 맞춰주는데
    • 현재 우리는 deal_ymd 와 lawd_cd 를 이용한 이중 파티션 구조기 때문에 정확하게 맞춰줘야한다.
def upload_to_s3(file_name, bucket_name, object_name):
  from botocore.exceptions import NoCredentialsError
  import boto3

  s3_client = boto3.client("s3")

  try:
    s3_client.upload_file(file_name, bucket_name, object_name)
  except NoCredentialsError:
    print("AWS 자격 증명을 찾을 수 없습니다.")

def main():
  from dotenv import load_dotenv
  import os

  # 변수 설정
  load_dotenv()

  service_key = os.getenv("SERVICE_KEY")
  lawd_cd = "11110" # 종로구
  deal_ymd = "202502"

  # file_name은 파티셔닝에 유리한 구조로 만든다.  
  file_name = f"{deal_ymd}_{lawd_cd}_result.json"
  file_type = "json"

  bucket_name = "smj-realestate-test"
  # 이중 파티션으로 설정
  object_name = f"apt-trade-raw/deal_ymd={deal_ymd}/lawd_cd={lawd_cd}/result.json"

  # 1. API를 통해서 데이터 가져오기
  trade_result = get_apt_trade_from_api(service_key, lawd_cd, deal_ymd)

  # 3. 해당 데이터를 파일로 저장하기
  save_file(trade_result, file_name, file_type)

  # 4. S3에 업로드하기
  upload_to_s3(file_name, bucket_name, object_name)

 

최종 코드

def processing_datas(root):
  list_temp = list()

  # [{거래 #1}, {거래 #2}] 이러한 형태로 만들것
  for item in root.iter("item"):
    dict_item = {
            "sggCd": item.find("sggCd").text,
            "umdNm": item.find("umdNm").text,
            "aptNm": item.find("aptNm").text,
            "jibun": item.find("jibun").text,
            "excluUseAr": item.find("excluUseAr").text,
            "dealYear": item.find("dealYear").text,
            "dealMonth": item.find("dealMonth").text,
            "dealDay": item.find("dealDay").text,
            "dealAmount": item.find("dealAmount").text,
            "floor": item.find("floor").text,
            "buildYear": item.find("buildYear").text,
            "cdealType": item.find("cdealType").text,
            "cdealDay": item.find("cdealDay").text,
            "dealingGbn": item.find("dealingGbn").text,
            "estateAgentSggNm": item.find("estateAgentSggNm").text,
            "rgstDate": item.find("rgstDate").text,
            "aptDong": item.find("aptDong").text,
            "slerGbn": item.find("slerGbn").text,
            "buyerGbn": item.find("buyerGbn").text,
            "landLeaseholdGbn": item.find("landLeaseholdGbn").text
      
    }
    list_temp.append(dict_item)

  return list_temp


def get_apt_trade_from_api(service_key:str, lawd_cd:str, deal_ymd:str):

  import requests
  import xml.etree.ElementTree as ET

  num_of_rows = 50
  page_no =1
  list_result = list()

  end_point_url = ("https://apis.data.go.kr/1613000/RTMSDataSvcAptTrade/getRTMSDataSvcAptTrade?"
                   f"serviceKey={service_key}&LAWD_CD={lawd_cd}&DEAL_YMD={deal_ymd}"
                   f"&numOfRows={num_of_rows}&pageNo={page_no}")
  
  response = requests.get(end_point_url)
  root = ET.fromstring(response.text)

  # 2. 데이터 가공하기
  list_result = processing_datas(root)
  return list_result

def save_file(content, file_name, file_type):
  import jsonlines

  # 파일 형태가 현재는 [{거래 #1}, {거래 #2}] 로 되어 있는데
  # 저장할때는 리스트가 아니라 {거래 #1} {거래 #2}로 저장해야함
  if file_type == "json":
    with jsonlines.open(file_name, "w") as f:
      f.write_all(content) 


def upload_to_s3(file_name, bucket_name, object_name):
  from botocore.exceptions import NoCredentialsError
  import boto3

  s3_client = boto3.client("s3")

  try:
    s3_client.upload_file(file_name, bucket_name, object_name)
  except NoCredentialsError:
    print("AWS 자격 증명을 찾을 수 없습니다.")

def main():
  from dotenv import load_dotenv
  import os

  # 변수 설정
  load_dotenv()

  service_key = os.getenv("SERVICE_KEY")
  lawd_cd = "11110" # 종로구
  deal_ymd = "202502"

  # file_name은 파티셔닝에 유리한 구조로 만든다.  
  file_name = f"{deal_ymd}_{lawd_cd}_result.json"
  file_type = "json"

  bucket_name = "smj-realestate-test"
  # 이중 파티션으로 설정
  object_name = f"apt-trade-raw/deal_ymd={deal_ymd}/lawd_cd={lawd_cd}/result.json"

  # 1. API를 통해서 데이터 가져오기
  trade_result = get_apt_trade_from_api(service_key, lawd_cd, deal_ymd)

  # 3. 해당 데이터를 파일로 저장하기
  save_file(trade_result, file_name, file_type)

  # 4. S3에 업로드하기
  upload_to_s3(file_name, bucket_name, object_name)

if __name__ == "__main__":
  main()

 

결과물

  • local에 가공된 데이터가 생성

가동된 local 데이터

  • S3 버킷에 업로드 된 파일
    • 올바른 이중 파티션 구조를 유지한채 저장된것을 확인할 수 있음

이중 파티션을 유지한채 저장된 S3 버킷 상태


4. EC2 인스턴스 생성 및 코드 실행

이전 단계에서 만든 파이썬 코드를 통해 공공 데이터를 가져오고 이를 S3에 업로드하는 단계를 진행하였다.

다만 24시간 동안 개인 PC를 통해서 위 작업을 할 수 없기 때문에
EC2 인스턴스를 통해 해당 인스턴스에서 우리가 만든 코드를 실행하는 작업을 해보겠다.

또한 현재는 lawd_cd가 11110밖에 없지만 
EC2를 통해서는 다른 lawd_cd 로 불러 올 수 있도록 해보겠다.

 

EC2의 인스턴스 생성 및 코드 이동 작업

 

smj-real-estate-test 이름으로 EC2 인스턴스 하나를 생성하였고,

터미널로 해당 인스턴스에 접속까지 해두었다.

 

이제 우리가 전에 작업해둔 파이썬 코드들을 해당 인스턴스로 이동시키는 작업이 필요한데,

이때는 리눅스의 scp명령어를 통해 할 수 있다.

 

따라서 하나의 터미널에는 작업한 코드들이 있는 저장소를 열고

하나의 터미널에는 EC2 우분투 서버 안을 열어두고 실습을 했다.

 

아래 scp 명령어를 통해 main.py 파일과 requirments.txt (node_modules의 의존성 같은 파일)을 EC2 인스턴스로 옮겨두었다.

scp -i test-keypair.pem requirements.txt ubuntu@43.201.62.20:/home/ubuntu/

scp -i test-keypair.pem main.py ubuntu@43.201.62.20:/home/ubuntu/

 

저장후 EC2 인스턴스를 보면 아래와 같이 정상적으로 잘 들어온것을 확인할 수 있다.

ubuntu@ip-172-31-13-88:~$ ls -rtl
total 8
-rw-r--r-- 1 ubuntu ubuntu 3357 Apr  7 11:03 main.py
-rw-r--r-- 1 ubuntu ubuntu  251 Apr  7 11:07 requirements.txt

 

그 다음 아래 명령어를 통해 pip 의존성들을 추가해주면 된다.

pip install -r requirements.txt

 

잘 설치된 후 파이썬 코드를 실행시키면

python3 main.py

 

S3에 정상적으로 파일들이 저장된 것을 확인할 수 있다.

 

 

만약 마지막 파이썬 파일 실행할때 AWS 권한 오류가 발생한다면

.aws/credentials 파일을 만들어서 service Key를 등록해주면 된다.


5-1. Lambda를 이용한 데이터 변환

사실 이번 실습의 데이터 양이 그렇게 크지 않아서
Lambda로만 해도 충분히 데이터 변환이 가능하지만,

한번 Glue ETL로도 데이터 변환을 해볼 예정이다. (5-2에 있음)

 

시작전에 lambda를 통해 변환한 데이터를 저장할 폴더를 S3안에 apt-trade-processed-lambda 이라는 이름으로 만들어뒀다.

 

또한 참고로 lambda의 이벤트 감지후 데이터를 변환하는 작업을 할예정이다. 

(raw 데이터가 들어오면 이를 감지해서 변환하는 작업)

 

아래는 전체적인 Lambda의 코드이다.

핵심적인 부분은 주석으로 설명을 남겨두었다.

 

전체적으로 3단계로 해당 코드는 짜여져있다.

  1. 이벤트가 들어온 S3 정보를 가져오는 부분
  2. S3 정보를 이용해서 실제 들어온 데이터를 가져오고 가공하는 부분
  3. 가공한 데이터를 다시 S3에 저장하는 부분
import json
import urllib.parse
import boto3

s3 = boto3.client('s3')

def lambda_handler(event, context):
    # TODO 
    print(f"EVENT: {event}")
    print(f"CONTEXT: {context}")

    # 1. 이벤트가 들어온 S3 정보를 가져오는 부분
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')

    print(f"BUCKET: {bucket}")
    print(f"KEY: {key}")

    # 2. S3 정보를 이용해서 실제 들어온 데이터를 가져오고 가공하는 부분
    try:
        response = s3.get_object(Bucket=bucket, Key=key)

        raw_data = response['Body'].read().decode('utf-8')

        list_result = []
        for line in raw_data.strip().split("\n"):
            dict_temp = dict()
            if line.strip():
                line_json = json.loads(line)
                
                dict_temp["sgg_cd"] = line_json["sggCd"]
                dict_temp["sgg_nm"] = dict_sgg_nm.get(line_json["sggCd"], "") 
                dict_temp["umd_nm"] = line_json["umdNm"]
                dict_temp["apt_nm"] = line_json["aptNm"]
                dict_temp["jibun"] = line_json["jibun"]
                dict_temp["exclu_use_ar"] = float(line_json["excluUseAr"])
                dict_temp["deal_year"] = int(line_json["dealYear"])
                dict_temp["deal_month"] = int(line_json["dealMonth"])
                dict_temp["deal_day"] = int(line_json["dealDay"])
                dict_temp["deal_amount"] = int(line_json["dealAmount"].replace(",", ""))
                dict_temp["floor"] = int(line_json["floor"])
                dict_temp["build_year"] = int(line_json["buildYear"])
                dict_temp["cdeal_type"] = None if line_json["cdealType"] == " " else line_json["cdealType"]
                dict_temp["cdeal_day"] = None if line_json["cdealDay"] == " " else line_json["cdealDay"]
                dict_temp["dealing_gbn"] = None if line_json["dealingGbn"] == " " else line_json["dealingGbn"]
                dict_temp["estate_agent_sgg_nm"] = None if line_json["estateAgentSggNm"] == " " else line_json[
                    "estateAgentSggNm"]
                dict_temp["rgst_date"] = None if line_json["rgstDate"] == " " else line_json["rgstDate"]
                dict_temp["apt_dong"] = None if line_json["aptDong"] == " " else line_json["aptDong"]
                dict_temp["sler_gbn"] = None if line_json["slerGbn"] == " " else line_json["slerGbn"]
                dict_temp["buyer_gbn"] = None if line_json["buyerGbn"] == " " else line_json["buyerGbn"]
                dict_temp["land_leasehold_gbn"] = None if line_json["landLeaseholdGbn"] == " " else line_json[
                    "landLeaseholdGbn"]
                list_result.append(dict_temp)

    # 3. 가공한 데이터를 다시 S3에 저장
    # list 형태의 값들을 string 값으로 전환하는 작업
    processed_data = "\n".join(json.dumps(result, ensure_ascii=False) for result in list_result)
    # 그 다음 s3에 해당 값을 넣음
    s3.put_object(Bucket=bucket, Key=f"apt-trade-processed-lambda/{key}", Body=processed_data)

    # 에러 컨트롤
    except Exception as e: 
        print(f"ERROR: {e}")

    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

 

 

트리거도 아래와 같이 추가했다.

정상 동작 확인 하기

Lambda > 모니터링 > CloudWatch 로그를 켜둔상태로

EC2에서 다시 한번 파이썬 코드를 실행시켜 Lambda를 트리거 시킨뒤 확인해본다.

트리거 된 모습

 

또한 Lambda를 통해 변환된 데이터가 S3에 정상적으로 담긴 모습도 볼 수 있다.

 

단점

Lambda를 통한 데이터 변환은 특정 이벤트가 발생해야만 트리거된다는 문제가 있어서,

기존 데이터에 대한 변환이 힘들다. (+ 대용량 데이터 처리도 힘들다)

따라서 아래 5-2의 Glue ETL을 사용하는 버전도 해봤다.


5-2. Glue ETL을 이용한 데이터 변환

1) 데이터 테이블을 위한 Crawlers 만들기

  • real_estate_crawler 이라는 Crawler를 만든다.
  • run crawler를 통해서 데이터에 대한 테이블을 생성한다.

2) Visual ETL을 통한 ETL 작업

Data preview

아래 SQL을 통해서 데이터를 가공해준다.

select sggCd as sgg_cd
     , case when sggCd = '11110' then '종로구'
            when sggCd = '11140' then '중구'
            when sggCd = '11170' then '용산구'
            when sggCd = '11200' then '성동구'
            when sggCd = '11215' then '광진구'
            when sggCd = '11230' then '동대문구'
            when sggCd = '11260' then '중랑구'
            when sggCd = '11290' then '성북구'
            when sggCd = '11305' then '강북구'
            when sggCd = '11320' then '도봉구'
            when sggCd = '11350' then '노원구'
            when sggCd = '11380' then '은평구'
            when sggCd = '11410' then '서대문구'
            when sggCd = '11440' then '마포구'
            when sggCd = '11470' then '양천구'
            when sggCd = '11500' then '강서구'
            when sggCd = '11530' then '구로구'
            when sggCd = '11545' then '금천구'
            when sggCd = '11560' then '영등포구'
            when sggCd = '11590' then '동작구'
            when sggCd = '11620' then '관악구'
            when sggCd = '11650' then '서초구'
            when sggCd = '11680' then '강남구'
            when sggCd = '11710' then '송파구'
            when sggCd = '11740' then '강동구'
            else ''
       end as sgg_nm
     , umdNm as umd_nm
     , aptNm as apt_nm
     , jibun
     , cast(excluUseAr as float) as exclu_use_ar
     , cast(dealYear as int) as deal_year
     , cast(dealMonth as int) as deal_month
     , cast(dealDay as int) as deal_day
     , cast(replace(dealAmount, ',', '') as int) as deal_amount
     , cast(floor as int) as floor
     , cast(buildYear as int) as build_year
     , case when cdealType == ' ' then null
            else cdealType
       end as cdeal_type
     , case when cdealDay == ' ' then null
            else cdealDay
       end as cdeal_day
     , case when dealingGbn == ' ' then null
            else dealingGbn
       end as dealing_gbn
     , case when estateAgentSggNm == ' ' then null
            else estateAgentSggNm
       end as estate_agent_sgg_nm
     , case when rgstDate == ' ' then null
            else rgstDate
       end as rgst_date
     , case when aptDong == ' ' then null
            else aptDong
       end as apt_dong
     , case when slerGbn == ' ' then null
            else slerGbn
       end as sler_gbn
     , case when buyerGbn == ' ' then null
            else buyerGbn
       end as buyer_gbn
     , case when landLeaseholdGbn == ' ' then null
            else landLeaseholdGbn
       end as land_leasehold_gbn
     , deal_ymd
     , lawd_cd
  from apt_trade_raw

 

3) node를 추가해서 ETL의 목적지를 설정한다.

SQL을 통해 변환된 데이터들을 저장할 목적지를 정해야하는데, 미리 만들어둔 S3 저장소에 저장한다.

4) Glue ETL을 run 후 결과물 확인

아래와 같이 결과물이 정상적으로 저장된 것을 확인할 수 있다.


6. Athena를 이용한 데이터 Ad-hoc 분석

앞에 Glue ETL을 통해서 생성한 테이블에다가 Athena를 연결해서
데이터 Ad-hoc분석을 해보고자 한다.

 

아래 쿼리를 통해서 각각의 데이터들의 추이를 확인할 수 있고 분석할 수 있다.

-- 총 건수
SELECT COUNT(*) AS cnt
  FROM test.apt_trade_processed
;

-- 년월별 거래량 추이
SELECT deal_ymd, COUNT(*) AS cnt
  FROM test.apt_trade_processed
 GROUP BY deal_ymd
 ORDER BY deal_ymd
;

-- 구별 거래량 추이
SELECT sgg_nm, COUNT(*) AS cnt
  FROM test.apt_trade_processed
 WHERE deal_year=2025
 GROUP BY sgg_nm
 ORDER BY cnt DESC
;

-- 구별 최소, 최대, 평균 거래 가격
SELECT sgg_nm
     , MIN(deal_amount) AS min_price
     , MAX(deal_amount) AS max_price
     , CEIL(AVG(deal_amount)) AS avg_price
  FROM apt_trade_processed
 WHERE deal_year=2025
 GROUP BY sgg_nm
 ORDER BY avg_price DESC
;

-- 특정 구의 평균 거래 가격 추이
SELECT deal_ymd
     , sgg_nm
     , CEIL(AVG(deal_amount)) AS avg_price
  FROM apt_trade_processed
 WHERE sgg_cd = '11710'
 GROUP BY deal_ymd
        , sgg_nm
 ORDER BY deal_ymd
;

7. Redshift를 이용한 데이터 웨어하우스 분석

현재 ETL을 통해서 변환된 데이터가 S3에 저장되어 있는데,
이 S3 데이터를 Redshift에 넣는 작업을 해보려고 한다.

 

우선, 만들어둔 Redshift 클러스터와 DBeaver를 연결시킨다.

 

7-1. COPY 명령어를 통한 방법

우선 COPY 명령어를 통해 S3에 있는 데이터를 가져오는 방식으로 해보겠다.

 

테이블 생성

우선 S3에 있는 데이터를 가져오기 전에 테이블을 생성해야한다.

-- COPY 명령어를 이용해서 진행해보기
-- S3의 파티션 키까지 가져올 수 없음.
-- 파티션 키까지 가져오려면 파티션 키를 포함하여 UNLOAD를 다시 하고 COPY해야 함.
CREATE TABLE metacode.real_estate_apt_trade (
    sgg_cd varchar(10),
    sgg_nm varchar(30),
    umd_nm varchar(20),
    apt_nm varchar(100),
    jibun varchar(10),
    exclu_use_ar float,
    deal_year int,
    deal_month int,
    deal_day int,
    deal_amount int,
    floor int,
    build_year int,
    cdeal_type varchar(10),
    cdeal_day varchar(10),
    dealing_gbn varchar(20),
    estate_agent_sgg_nm varchar(500),
    rgst_date varchar(50),
    apt_dong varchar(50),
    sler_gbn varchar(50),
    buyer_gbn varchar(50),
    land_leasehold_gbn varchar(5)
);

 

COPY를 통한 데이터 가져오기

아래 COPY 명령어를 통해서 데이터를 가져올 수 있다.

COPY metacode.real_estate_apt_trade
FROM 's3://{버킷 주소}'
IAM_ROLE '{나의 IAM_ROLE}'
FORMAT AS PARQUET;

실제로 들어온 데이터 값

 

COPY 명령어를 통한 문제

S3의 파티션 키까지 가져올 수 없다.

만약 파티션 키까지 가져오려면 파티션 키를 포함하여 UNLOAD를 다시 하고 COPY 해야함. 

지금 S3에서 가져오고 있기 때문에 Spectrum 방법도 좋다.

 

7-2. Spectrum 을 통한 방법

Spectrum을 사용하면 파티션 키까지 모두 가져올 수 있다.

CREATE EXTERNAL SCHEMA metacode_external
FROM DATA CATALOG
DATABASE 'test'
IAM_ROLE ''
CREATE EXTERNAL DATABASE IF NOT EXISTS;

-- Table 확인
SELECT *
  FROM metacode_external.apt_trade_processed
 LIMIT 10;

8. Quicksight를 이용한 시각화 대시보드

데이터 세트 만들기 > Redshift 를 수동으로 연결하기

 

아래는 연도별 거래량과 가격을 그래프로 나타낸 모습이다.

 

등등 테이블과 데이터들을 바탕으로 여러 그래프들을 만들 수 있다.

만들 수 있는 다양한 시각화 그래프


9. 소감

AWS 서비스를 통해 간단한 데이터 파이프라인을 만들어 보았는데,

'어떤 기술(서비스)을 쓰는가' 보단 '왜 이런 기술을 쓰는게 효율적인가'에 집중하면서 하다 보니

기술에대한 이해도도 조금씩 쌓이는 것 같았다.

 

현재는 자동화된 데이터파이프라인이 아니라 다소 아쉬운 감이 있지만,

앞으로 공부하면서 충분히 자동화 파이프라인도 구현할 수 있겠다는 자신감이 생긴것 같다.

 

전체적인 데이터의 흐름과, 어떻게 하면 비용&성능 측면에서 더 효율적인 파이프라인을 설계할 수 있는지 고민해보면 좋을 것 같다.

공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2026/06   »
1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30
글 보관함