Day29_SQL분석_73‐78_PySpark&SQL - bonniekwon0721/Dataanalytics-study GitHub Wiki
30/MAR/2024
pyspark로 데이터 살펴보기
0. 먼저 시작하기전에 설정할 것들 먼저 설정해주기
!apt-get install openjdk-8-jdk-headless # jdk 설치
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz # spark file
!tar -xf spark-3.0.0-bin-hadoop3.2.tgz # 스파크 압축풀기
!pip install findspark # 스파크 찾기
!pip install kaggle --upgrade # 캐글 데이터를 다운받기 위해 kaggle library 설치
import os # 운영체제와의 상호작용을 돕는 다양한 기능을 제공하는 모듈
import findspark
# 환경변수에 path 지정
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"
findspark.init() # spark의 경우 잘 찾지 못하는 경우가 있어 findsaprk를 이용
from pyspark.sql import SparkSession
spark = (
SparkSession
.builder
.appName("pyspark_test")
.master("local[*]") # local에서 사용하며, 모든 쓰레드를 사용하도록 지정
.getOrCreate()
)
# 지금 만들어진 spark 객체의 설정을 알아볼까요?
spark.sparkContext.getConf().getAll()
from google.colab import drive
drive.mount('/content/gdrive') # 셀을 실행시키면 뜨는 창에 연결할 구글 드라이브 계정으로 로그인
!unzip /content/gdrive/MyDrive/Colab_Notebooks/Zerobase/Class_SQL_Analystics/Chap7/data/data_parquet.zip # 압축풀기 -> 본인이 저장한 구글 드라이브 경로에 맞게 수정
df = spark.read.parquet("data_parquet")
1. 데이터 살펴보기
# show: 데이터 프레임을 보여줌
df.show()
# count: 데이터가 몇 행이 있는지
df.count()
# type 종류
# string: 문자열
# integer: 실수형
# double: 정수형
df.printSchema()
2. 결측치 처리하기
- 결측치란?
- 데이터에 해당 값이 없는 것
- 분석을 할 때, 정보가 없다면?
- 적절한 값으로 채워 넣거나
- 값이 없는 데이터를 삭제
3. pyspark sql functions 사용하기
- several built-in standard functions to work with DataFrame and SOL queries
pyspark 함수로 결측치 처리하기 1
pyspark sql functions 사용하기
- several built-in standard functions to work with DataFrame and SQL queries
from pyspark.sql import functions as F
(
df
.select("city")
).show(5)
-- 열 이름 보기
print(df.columns)
-- 결측치 찾기
(
df
.select([
F.count(F.when(F.isnull(c),c)).alias(c) for c in df.columns
])
).show(5)
# list comprehension
value_list = [1,2,3,4,5,6]
[str(value) for value in value_list]
# if, elase
[str(value) for value in value_list if value >= 3]
[str(value) if value >= 3 else str(value+2) for value in value_list]
- select
- 특정 컬럼을 선택하기 위한 함수
- count
- 데이터프레임의 행 갯수를 연산
- when
- 데이터프레임에서 if문과 같은 조건문을 만드는 함수
- F.when(조건(True), 조건에 부합할 시 반환값).otherwise(조건에 부합하지 않을 시 반환값)
- isnull
- 해당 값이 null인 경우 True를 반환
- alias
- 새롭게 연산된 컬럼의 컬럼명 설정
pyspark 함수로 결측치 처리하기 2
# filter(구문) where
df.filter(F.col("StoreLocation").isNull()).show(5)
- filter
- 구문이 True인 값을 필터링
- where 함수도 동일한 기능
-- 방법1
# filter(구문) where
df.filter(F.col("CountyNumber").isNull()).filter(F.col("County").isNotNull()).count()
# isNull, isNotNull : 각 어떤 컬럼이 null인지 아닌지
-- 방법2
(df.filter(
(F.col("CountyNumber").isNull())
& (F.col("County").isNotNull())
).count())
- 와우 CountyNumber는 Null 값이 매우 많네요.
# 우리의 목적은?
# 지역별, 어떤 아이팀이 잘 팔리는지!
# StoreLocation, CountyNumber 사용을 하지 않아도 될 것 같다.
* Drop
* 명시된 컬럼을 제외한 데이터 프레임을 반환
df = df.drop("StoreLocation", "CountyNumber")
df.printSchema()
pyspark 함수로 결측치 처리하기 3
사용할 수 없는 row 제거하기
- 참고: row == 행
- 우리의 목적은?
- 각 지역별로 주류의 판매 인사이트를 제공하는 것
- 즉, 어떤 지역인지 알 수 없으면 의미 없는 정보
# 지역 정보 중 가장 null이 없는 정보는?
# 결측치를 보기 위해 사용했던 코드 재탕
# 그러나, 이번에는 지역 정보만 선택해봅시다.
# df.columns 대신 지역 컬럼명만 처리
region_cols = ["Address", "City", "ZipCode", "County"]
df.select(
[F.count(F.when(F.isnull(c), c)).alias(c) for c in region_cols] # df.columns
).show()
근소한 차이지만, city가 가장 결측치가 적다.
city가 없는 row를 제거하고, address와 zipcode, County 정보는 사용하지 말자..!
df = df.filter(F.col("City").isNotNull())
- filter
- isNotNull
- null이 아닐 경우 True
- (자매품) isNull
- null일 경우 True
df = df.drop("Address", "Zipcode", "County")
df.printSchema()
결측치를 다른 값으로 채워주기
# 연속형 변수
# mean, medain, ..
# 범주형 변수
df.select(
[F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]
).show()
결측치가 많은 Category 와 CategoryName을 처리해주자
df.select("Category", "CategoryName").show()
Category는 CategoryName의 id
Category는 있는데, CategoryName이 없는 경우가 있을까?
(
df
.filter(F.col("Category").isNotNull()) # Category는 null이 아님
.filter(F.col("CategoryName").isNull()) # CategoryName은 null
).show()
1022200는 다 CategoryName이 없나?
(
df
.filter(F.col("Category")==1022200) # 특정 카테고리 id
.filter(F.col("CategoryName").isNotNull())
).show()
pyspark 함수로 결측치 처리하기 4
그런데 호오옥시..
같은 Categry에 Name이 다른 경우는 없을까요?
from pyspark.sql import functions as F
from pyspark.sql import Window as W
(
df
.withColumn(
"CategoryName_cnt",
F.size(
F.collect_set("CategoryName").over(W.partitionBy("Category"))
)
)
.filter(F.col("CategoryName_cnt") >= 2)
).show()
pyspark 함수로 결측치 처리하기 5
Category를 기반으로 CategoryName 결측치를 채워주기
df = (
df
.withColumn(
"CategoryName", # 원래 있는 컬럼과 동일한 이름으로 withColumn을 할 경우, 기존 컬럼에 덮어쓰기가 됩니다.
F.first(
F.col("CategoryName"), ignorenulls=True # null인 경우를 제외하고 가장 첫번째
).over(W.partitionBy(F.col("Category")).orderBy(F.col("Date").desc()))
# 다른경우가 존재하기 때문에 가장 최근 날짜 기준 이름으로 덮어쓰기
)
)
df = df.filter(F.col("Category").isNotNull()).filter(F.col("CategoryName").isNotNull())
pyspark 함수로 결측치 처리하기 6
# 덮어 썼는데도 Null이 나타나는지 보는 함수
category_cols = ["Category", "CategoryName"]
df.select(
[F.count(F.when(F.isnull(c), c)).alias(c) for c in category_cols]
).show()
덮어썼지만 아직 결측치 Null 이 나타난다.
그래서 결측치는 없에려고 합니다.
df = df.filter(F.col("Category").isNotNull()).filter(F.col("CategoryName").isNotNull())
df.count() # 아직도 2천6백만개 이상
-- 그다름 아직 결측치가 있는 다른 열에 대한것을 for문을 이용한 null값 제거
null_cols = ["VendorNumber", "VendorName", "StateBottleCost", "StateBottleRetail", "SaleDollars"]
for col_name in null_cols:
print(col_name, df.filter(F.col(col_name).isNull()).count())
df = df.filter(F.col(col_name).isNotNull())
df.select(
[F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]
).show()
데이터 형 변환하기
df.select("Date").show(5)
# Date 컬럼이 string으로 되어있으며, 월/일/년 이라는 특이한 방식으로 저장되어있음
df = df.withColumn("Date", F.to_date(F.col("Date"), 'MM/dd/yyyy'))
df.select("Date").show(5)
# 변환된 데이터 확인.
df.printSchema()
-
to_date
- string을 date 형태로 바꿔줌
- F.to_date(변형할 컬럼, format="현재 string이 어떤 형태로 써져있는지 명시")
-
시간 형태를 위한 또 다른 매서드
- to_timestamp
- string을 timestamp 형태로 변형
- F.to_timestamp(변형할 컬럼, format="현재 string이 어떤 형태로 써져있는지 명시")
- MM-dd-yyyy HH:mm:ss.sss‘
- to_timestamp
-
MM-dd-yyyy HH:mm:ss.sss‘
[pyspark 함수 복습 (feat.sql query 비교) 1](https://www.notion.so/pyspark-feat-sql-query-1-fcb1325e11684b39938ee8229cb0c3d9?pvs=21)
[pyspark 함수 복습 (feat.sql query 비교) 2](https://www.notion.so/pyspark-feat-sql-query-2-672fe9be2edb4b348a82c23193cee2f0?pvs=21)
[pyspark 함수 복습 (feat.sql query 비교) 3](https://www.notion.so/pyspark-feat-sql-query-3-22a9786fae904c3186615d976857407f?pvs=21)
Plotly로 인터랙티브하게 시각화하기 1
먼저 앞에 세팅은 다 해주고 결측치 제거 함수를 다시한번 작성해줍시다.
# 결측치 제거 및 데이터 형 변환 함수
from pyspark.sql import functions as F
from pyspark.sql import Window as W
def preprocess_df(df):
df = (
df.drop("StoreLocation", "CountyNumber", "Address", "Zipcode", "County")
.filter(F.col("City").isNotNull())
.withColumn(
"CategoryName",
F.first(
F.col("CategoryName"), ignorenulls=True
).over(W.partitionBy(F.col("Category")))
)
.filter(F.col("Category").isNotNull())
.filter(F.col("CategoryName").isNotNull())
.filter(F.col("VendorNumber").isNotNull())
.filter(F.col("StateBottleCost").isNotNull())
.withColumn("Date", F.to_date(F.col("Date"), 'MM/dd/yyyy'))
)
return df
df = preprocess_df(df)
%pip install plotly==5.11.0 # plotly 설치
sample_pd = df.filter(F.col("City")=="WAUKEE").toPandas()
- toPandas
- pyspark dataframe 객체를 pandas dataframe 객체로 변환
- pandas로 변환하는 이유?
- 시각화를 하기 위해서는 모든 데이터를 한번에 로드해야함
- 대부분의 시각화 라이브러리 지원 가능
- pyspark는 분산 처리 기반
- 즉, 시각화에는 적합하지 않음
sample_pd
WAUKEE_pd = (
sample_pd
.groupby("Date") # Date 기준으로
.sum() # 총 합
.reset_index() # groupby로 사용되어 index로 들어간 date 컬럼으로 다시 변환
)
WAUKEE_pd
import plotly.graph_objects as go
# go.Figure() 함수를 활용하여 기본 그래프를 생성
fig = go.Figure(
# Data 입력
data=[go.Line(x=WAUKEE_pd["Date"], y=WAUKEE_pd["BottlesSold"])],
# layout 입력
layout=go.Layout(
title=go.layout.Title(text="Bottles Sold amount at WAUKEE")
)
)
#show하면 내 노트북 (주피터 노트북 등)에 그래프 표시
fig.show()
import plotly.express as px
fig = px.line(x=WAUKEE_pd["Date"], y=WAUKEE_pd["BottlesSold"], title="Bottles Sold amount at WAUKEE")
# 위 그래프와 동일하지만 조금 더 간편하게 그래프 그리기 가능.
fig.show()
plotly graph_objects VS express
- graph_objects
- 직접 지정해줘야하는게 많지만, 커스터마이징이 가능
- express
- 완성형 보편적인 그래프를 그릴 수 있음
- graph_objects와 express의 관계는 범용적인 시각화 라이브러리인 matplotlib과 seaborn의 관계와 비슷함
왜 plotly를 사용하나요?
- interactive한 그래프를 그릴 수 있음 -
- python에 익숙하다면 plotly에서 개발한 dash를 사용해 웹 대시보드까지도 발전 가능
(TIP) Pandas 기본 시각화로 plotly 사용하기
- pandas는 기본적으로 matplotlib library를 바탕으로 시각화가 됨
- plotly를 사용하기 위해서는 아래와 같은 기본 설정을 추가하면 됨
import pandas as pd
pd.options.plotting.backend = "plotly"
WAUKEE_pd["BottlesSold"].hist() # 기본 라이브러리로 시각화 가능
Plotly로 인터랙티브하게 시각화하기 2
plotly의 interactive 기능 간단히 살펴보기
- Hover Label
fig = px.line(x=WAUKEE_pd["Date"], y=WAUKEE_pd["BottlesSold"],title="Bottles Sold amount at WAUKEE")
# hover label 스타일 설정
fig.update_layout(
hoverlabel_bgcolor="white",
hoverlabel_font_size=10,
hoverlabel_font_color='black',
hoverlabel_font_family="Rockwell")
fig.show()
fig = px.line(
WAUKEE_pd,
x="Date",
y="BottlesSold",
title="Bottles Sold amount at WAUKEE",
text="SaleDollars" # text로 사용할 컬럼 지정
)
# hover label 스타일 설정
fig.update_layout(
hoverlabel_bgcolor="white",
hoverlabel_font_size=15,
hoverlabel_font_color='black',
hoverlabel_font_family="Rockwell"
)
# hover label의 세부 내용 작성
# html sytle
fig.update_traces(hovertemplate='총 매출: %{text}달러 <br>'
'날짜: %{x} <br>'+
'판매량 : %{y}개')
fig.show()
- slider
fig = px.line(
WAUKEE_pd,
x="Date",
y="BottlesSold",
title="Bottles Sold amount at WAUKEE",
text="SaleDollars"
)
# hover label 스타일 설정
fig.update_layout(
hoverlabel_bgcolor="white",
hoverlabel_font_size=15,
hoverlabel_font_color='black',
hoverlabel_font_family="Rockwell"
)
# hover label
fig.update_traces(hovertemplate='총 매출: %{text}달러 <br>'
'날짜: %{x} <br>'+
'판매량 : %{y}개')
#범위 슬라이더 생성
fig.update_layout(xaxis=dict(rangeslider_visible=True)) # x축 기준 슬라이더
fig.show()
- drop down
import plotly.graph_objects as go
fig = go.Figure()
# 1번 그래프
fig.add_trace(go.Line(
name="BottlesSold",
x=WAUKEE_pd["Date"],
y=WAUKEE_pd["BottlesSold"]
))
# 2번 그래프
fig.add_trace(go.Line(
name="SaleDollars",
x=WAUKEE_pd["Date"],
y=WAUKEE_pd["SaleDollars"]
))
fig.update_layout(
updatemenus=[
dict(
type="dropdown",
direction="down",
buttons=list([
dict(label="Both",
method="update",
args=[{"visible": [True, True]},
{"title": "BottlesSold & SaleDollars"}]),
dict(label="BottlesSold",
method="update",
args=[{"visible": [True, False]},
{"title": "BottlesSold",}]),
dict(label="SaleDollars",
method="update",
args=[{"visible": [False, True]},
{"title": "SaleDollars",}]),
]),
),
]
)
fig.show()
주류시장 동향 파악하기 1 - 매출 추이와 성장률
전체 추이 살펴보기
1. 산업은 성장하고 있는가?
- 성장세를 알아보기 위해 2가지 분석 진행
- 전반적으로 매출이 증가하고 있는가?
- 전반적으로 상점 수가 증가하고 있는가?
pd1 = (
df
.withColumn("Year", F.year("Date").cast("long")) #년/월/일에서 년도 정보만 추출
.filter(F.col("Year") < 2023) # 2023년은 정보가 일부만 있으므로 제외
.groupBy("Year") # 년도별로 group화
.agg(
F.sum("SaleDollars").cast("long").alias("SaleDollars_sum"), # SaleDollars 총합
F.countDistinct(F.col("StoreNumber").cast("string")).alias("Store_cnt") # StoreNumber 고유 갯수
)
.orderBy("Year") # 년도로 정렬
).toPandas()
pd1
import plotly.express as px
fig = px.line(x=pd1["Year"], y=pd1["SaleDollars_sum"],title="매출 추이")
# 위 그래프와 동일
fig.show()
fig = px.line(x=pd1["Year"], y=pd1["Store_cnt"],title="상점 수 추이")
# 위 그래프와 동일
fig.show()
• 매출의 성장률은 어느정도일까?
(
df
.withColumn("Year", F.year("Date").cast("long")) #년/월/일에서 년도 정보만 추출
.filter(F.col("Year") < 2023) # 2023년은 정보가 일부만 있으므로 제외
.groupBy("Year") # 년도별로 group화
.agg(
F.sum("SaleDollars").cast("long").alias("SaleDollars_sum"), # SaleDollars 총합
F.countDistinct(F.col("StoreNumber").cast("string")).alias("Store_cnt") # StoreNumber 고유 갯수
)
.withColumn("SaleDollars_sum_bef", F.lag("SaleDollars_sum").over(W.orderBy("Year")))
# Year를 기준으로 이전 년도 SaleDollars_sum 값을 반환
).show()
- lag
- 이전 row의 값을 반환
- (참고) lead
- 이후 row의 값을 반환
pd2 = (
df
.withColumn("Year", F.year("Date").cast("long")) #년/월/일에서 년도 정보만 추출
.filter(F.col("Year") < 2023) # 2023년은 정보가 일부만 있으므로 제외
.groupBy("Year") # 년도별로 group화
.agg(
F.sum("SaleDollars").cast("long").alias("SaleDollars_sum"), # SaleDollars 총합
F.countDistinct(F.col("StoreNumber").cast("string")).alias("Store_cnt") # StoreNumber 고유 갯수
)
.withColumn("SaleDollars_sum_bef", F.lag("SaleDollars_sum").over(W.orderBy("Year")))
# Year를 기준으로 이전 년도 SaleDollars_sum 값을 반환
# 성장률 계산하기
.withColumn(
"SaleDollars_growth_rate",
100*(F.col("SaleDollars_sum")-F.col("SaleDollars_sum_bef"))/F.col("SaleDollars_sum_bef")
)
).toPandas()
- 성장률(= 증감률) (growth rate)이란?
- 100 * (이번결과값 - 지난번결과값) / 지난번결과값
- 즉, 과거 대비 이번에 증감한 비율
fig = px.line(x=pd2["Year"], y=pd2["SaleDollars_growth_rate"],title="매출 성장률 추이")
# 위 그래프와 동일
fig.show()
주류 시장 전체 동향 결과
- 매출을 증가하는 추세
- 그러나, 최근 성장률이 낮아짐
- 점포 수 역시 증가 추세
주류시장 동향 파악하기 2 - 고매출 지역 찾기
pd3 = (
df
.withColumn("Year", F.year("Date").cast("long")) #년/월/일에서 년도 정보만 추출
.filter(F.col("Year") < 2023) # 2023년은 정보가 일부만 있으므로 제외
.groupBy("Year", "City") # 년도별, 지역별로 group화
.agg(
F.sum("SaleDollars").cast("long").alias("SaleDollars_sum"), # SaleDollars 총합
F.countDistinct(F.col("StoreNumber").cast("string")).alias("Store_cnt") # StoreNumber 고유 갯수
)
.withColumn("SaleDollars_sum_bef", F.lag("SaleDollars_sum").over(W.partitionBy("City").orderBy("Year")))
# Year를 기준으로 이전 년도 SaleDollars_sum 값을 반환
# 성장률 계산하기
.withColumn(
"SaleDollars_growth_rate",
100*(F.col("SaleDollars_sum")-F.col("SaleDollars_sum_bef"))/F.col("SaleDollars_sum_bef")
)
.orderBy("Year") # 년도로 정렬
).toPandas()
fig = px.line(
pd3,
x = "Year", y = "SaleDollars_sum", color = 'City', hover_data = ['City'])
fig.update_layout(title_text = "지역 별 매출 추이")
fig.show()
fig = px.line(
pd3,
x = "Year", y = "SaleDollars_growth_rate", color = 'City', hover_data = ['City'])
fig.update_layout(title_text = "지역 별 매출 성장률 추이")
fig.show()
pd3[pd3.City == "WHITTEMORE"] # SaleDollars_sum이 낮아 SaleDollars_growth_rat이 쉽게 큰 폭으로 증가
- 매출 상위 10 지역만 그려보기
- 매출 상위 10은 2022년도 기준
- 매출은 어느정도 일정하기 때문
top_10_city = (
pd3
[pd3.Year == 2022]
.nlargest(n=10, columns='SaleDollars_sum')
)['City'].tolist()
top_10_city_loc = pd3.City.isin(top_10_city) # City가 top_10_city 안에 있는 경우 true 반환
fig = px.line(
pd3.loc[top_10_city_loc],
x = "Year", y = "SaleDollars_growth_rate", color = 'City', hover_data = ['City'])
fig.update_layout(title_text = "지역 별 매출 성장률 추이")
fig.show()
- 뚜렷한 추세 없음
- 매출을 우선시하면 될 듯
점포 별 평균 매출
pd4 = (
df
.withColumn("Year", F.year("Date").cast("long")) #년/월/일에서 년도 정보만 추출
.filter(F.col("Year") < 2023) # 2023년은 정보가 일부만 있으므로 제외
.groupBy("Year", "City", "StoreNumber") # 년도별, 지역별, 점포별로 group화
.agg(
F.sum("SaleDollars").cast("long").alias("SaleDollars_sum"), # SaleDollars 총합
)
.groupBy("Year", "City")
.agg(
F.avg("SaleDollars_sum").alias("Store_SaleDollars_avg"), # 점포 별 매출 평균
F.sum("SaleDollars_sum").alias("SaleDollars_sum") # 총 점포 매출
)
.orderBy("Year") # 년도로 정렬
).toPandas()
- groupBy가 2번
- year, city, storeNumber로 각 점포의 총 매출 합을 구한 후
- year, city 별로 점포 별 총 매출 평균
fig = px.line(
pd4,
x = "Year", y = "Store_SaleDollars_avg", color = 'City', hover_data = ['City'])
fig.update_layout(title_text = "지역 별 점포 평균 매출 추이")
fig.show()
- 지역 전체 매출과 점포별 매출 평균 상관 관계 분석하기
- 2022년을 기준으로
fig = px.scatter(
pd4[pd4.Year == 2022],
x = "SaleDollars_sum", y = "Store_SaleDollars_avg", color = 'City', hover_data = ['City'])
fig.update_layout(title_text = "지역 별 점포 평균 매출 & 총 매출 관계")
# 수평선 그리기
fig.add_hline(y= 400000,
line_color='red',)
# 수직선 그리기
fig.add_vline(x=3500000,
line_color='red')
fig.show()
- 높은 correlation을 보이지는 않지만, 어느정도 우상향 하는 모습은 존재
- 우리는 컨설턴트가 가게를 열기에 좋은 지역에 대한 인사이트를 주는 것
- 따라서, 일정 수준 매출이 보장되는 지역 + 가게의 수익이 좋은 상위 5개의 지역을 추천 예정
- 위 빨간 선 그래프의 2사분면 (오른쪽 위)가 가장 좋은 지역일 것
- 즉, MOUNT VERNON, DEWITT, WINDSOR HEIGHTS, CORALVILLE, DES MOINES
지역의 영업 이익 평가하기 - pyspark udf
1. 영업이익이란?
- 영업 활동을 통해 순수하게 남은 이익
- 즉, 영업이익 = 매출액 - 매출원가 - 기타 비용
- 본 데이터에서 각 점포 당 기타 비용은 모두 동일하다고 가정 후, 매출액 - 매출원가로 영업이익을 판단
- 실제 상황에서는 점포 별 월세, 관리비, 인건비, 배송비등이 모두 기타 비용에 포함될 것
df.printSchema()
- StateBottleRetail: 판매 가격
- StateBottleCost: 원가
- BottlesSold: 판매 갯수
즉, 영업이익 = BottlesSold * (StateBottleRetail - StateBottleCost)
pypsark udf 사용해보기
- udf?
- User Defined Functions
- 사용자 정의 함수
- 즉, 직접 만든 함수를 pyspark에서 사용하는 것
# 영업 이익을 계산하는 udf 만들기
from pyspark.sql.functions import udf
from pyspark.sql import types as T
def calculate_gross_profit(unit_sales, unit_cost, sales_amt):
gross_profit = sales_amt * (unit_sales - unit_cost)
return gross_profit
calculate_gross_profit_udf = udf(
calculate_gross_profit, # udf로 만들 함수
T.DoubleType() # return type
)
(
df
.withColumn(
"gross_profit",
calculate_gross_profit_udf( # udf 적용
df.StateBottleRetail,
df.StateBottleCost,
df.BottlesSold
)
)
.select("StateBottleRetail", "StateBottleCost", "BottlesSold", "gross_profit")
).show()
pd1 = (
df
.withColumn("Year", F.year("Date").cast("long")) #년/월/일에서 년도 정보만 추출
.filter(F.col("Year") < 2023) # 2023년은 정보가 일부만 있으므로 제외
.withColumn(
"gross_profit",
calculate_gross_profit_udf( # udf 적용
df.StateBottleRetail,
df.StateBottleCost,
df.BottlesSold
)
)
.groupBy("Year", "City", "StoreNumber")
.agg(
F.sum("gross_profit").alias("gross_profit")
)
.groupBy("Year", "City")
.agg(
F.avg("gross_profit").alias("gross_profit_avg")
)
.orderBy("Year")
).toPandas()
pd1
import plotly.express as px
fig = px.line(
pd1,
x = "Year", y = "gross_profit_avg", color = 'City', hover_data = ['City'])
fig.update_layout(title_text = "지역 별 점포 영업이익 추이")
fig.show()
all_cities = pd1['City'].unique().tolist()
our_cities = ["MOUNT VERNON", "DEWITT", "WINDSOR HEIGHTS", "CORALVILLE", "DES MOINES"]
color_dict = {}
for city in all_cities:
if city in our_cities:
color_dict[city] = "red"
else:
color_dict[city] = "gray"
# 우리가 뽑은 5개의 지역만 다른 색으로 보기
fig = px.line(
pd1,
x = "Year", y = "gross_profit_avg", color = 'City',hover_data = ['City'],
color_discrete_map=color_dict # 색상 지정
)
fig.update_layout(title_text = "지역 별 점포 영업이익 추이")
fig.show()
- 우리가 뽑은 5개의 지역이 영업이익 기준으로도 상위권에 위치하고 있음
- GOOD!
매출과 영업이익 상관성 보기
df.columns
pd2 = (
df
.withColumn("Year", F.year("Date").cast("long")) #년/월/일에서 년도 정보만 추출
.filter(F.col("Year") == 2022) # 2022년도를 기준으로
.withColumn(
"gross_profit",
calculate_gross_profit( # udf 적용
df.StateBottleRetail,
df.StateBottleCost,
df.BottlesSold
)
)
.groupBy("City", "StoreNumber")
.agg(
F.sum("SaleDollars").alias("SaleDollars"),
F.sum("gross_profit").alias("gross_profit")
)
.groupBy("City")
.agg(
F.avg("SaleDollars").alias("SaleDollars_avg"),
F.avg("gross_profit").alias("gross_profit_avg")
)
).toPandas()
fig = px.scatter(
pd2,
x = "SaleDollars_avg", y = "gross_profit_avg", color = 'City', hover_data = ['City'])
fig.update_layout(title_text = "지역 별 점포 평균 매출 & 평균 영업이익 관계")
fig.show()
- 굉장히 높은 상관성을 가지고 있음
- 즉, 점포의 매출이 높을 수록 영업이익이 높을 것이라고 예상 가능
Chapter8
metadata란?
부가 정보를 추가하기 위해 데이터에 따라가는 정보라고 생각하면 됩니다. 예를 들어, 파일의 저장 날짜, 종류, 태그 등
(주요하게 분석하는 테이블이 아닌 부가적 데이터)
pyspark에서 MySQL로 데이터 저장하기
전처리를 먼저 쭉 하고…
from pyspark.sql import functions as F
from pyspark.sql import Window as W
df = spark.read.parquet("data_parquet").withColumn("Date", F.to_date(F.col("Date"), 'MM/dd/yyyy')).withColumn("Year", F.year("Date"))
# 결측치 제거 및 데이터 형 변환 함수
def preprocess_df(df):
df = (
df.drop("StoreLocation", "CountyNumber", "Address", "Zipcode", "County")
.filter(F.col("City").isNotNull())
.withColumn(
"CategoryName",
F.first(
F.col("CategoryName"), ignorenulls=True
).over(W.partitionBy(F.col("Category")))
)
.filter(F.col("Category").isNotNull())
.filter(F.col("CategoryName").isNotNull())
.filter(F.col("VendorNumber").isNotNull())
.filter(F.col("StateBottleCost").isNotNull())
.withColumn("Date", F.to_date(F.col("Date"), 'MM/dd/yyyy'))
)
return df
df = preprocess_df(df)
# 상세하게 분석할 데이터만 뽑아내기
our_cities = ["MOUNT VERNON", "DEWITT", "WINDSOR HEIGHTS", "CORALVILLE", "DES MOINES"]
df = (
df
.filter(F.col("City").isin(our_cities))
.filter(F.col("Year") == 2022)
)
df1 = (
df
.select(
"Date", "City", "StoreNumber", "Category", "VendorNumber", "ItemNumber", # 필요한 column만 select
"StateBottleCost", "StateBottleRetail", "BottlesSold", "SaleDollars"
)
)
# 메타데이터 저장하기
StoreNumber_meta = df.select("StoreNumber", "StoreName").distinct()
Category_meta = df.select("Category", "CategoryName").distinct()
VendorNumber_meta = df.select("VendorNumber", "VendorName").distinct()
ItemNumber_meta = df.select("ItemNumber", "ItemDescription").distinct()
StoreNumber_meta.show()
# csv 파일로 저장하기
df1.coalesce(1).write.option("header","true").format("csv").save("city_data_csv")
StoreNumber_meta.coalesce(1).write.option("header","true").format("csv").save("StoreNumber_meta", mode='overwrite')
Category_meta.coalesce(1).write.option("header","true").format("csv").save("Category_meta", mode='overwrite')
VendorNumber_meta.coalesce(1).write.option("header","true").format("csv").save("VendorNumber_meta", mode='overwrite')
ItemNumber_meta.coalesce(1).write.option("header","true").format("csv").save("ItemNumber_meta", mode='overwrite')
- coalesce
- 분산되어 있던 데이터를 n개의 파티션으로 모아줌
- 만약 coalesce를 하지 않고 저장한다면 여러개의 csv 파일로 저장될 것
- n=1 설정으로 하나의 파이로 저장
mysql에 데이터 적재하기
모든 테이블을 신규 스키마(이름: mydb) 를 만들어(원통형을 눌러 만듦) 적재하고 (Table Data Import Wizard를 통해)
select *
from mydb.category_meta;
SQL 기초 1 - 쿼리의 종류와 동작
쿼리의 종류
(1) DDL (Data Definition Language) : 데이터를 정의할 때 사용하는 언어 테이블을 만드는 CREATE, 테이블을 제거하는 DROR 등
(2) DML(Data Manipulation Language) 데이터베이스에 데이터를 저장할 때 사용하는 언어 : 새로운 데이터를 추가하는 INSERT, 데이터를 삭제하는 DELETEL, 변경하는 UPDATE 등
(3)DCL(Data Control Language) : 데이터베이스에 대한 접근권한과 관련된 문법
(4) DQL (Data Query Language) : 정해진 스키마 내에서 쿼리할 수 있는 언어 : SELECT가 DQL에 해당된다. : DOL을 DML의 일부분으로 취급하기도
(5)TCL(Transaction Control Language) :DML을 거친 데이터의 변경사항을 수정 : COMMIT, ROLLBACK
- 우리는 DML 혹은 DQL을 주로 사용하게 된다.
-- SELECT, FROM, WHERE
SELECT * -- 모든 컬럼 선택
FROM mydb.city_data;
SELELCT Date, City -- 특정 컬럼만 선택
FROM mydb.city_data;
SELECT *
FROM mydb.city_data
WHERE Date = '2022-10-21'; -- 조건 정보를 설정
SQL 기초 2 - 유용한 기초 쿼리문
USE[schema]; — defult로 사용할 shema를 설정
-- USE
USE mydb; -- default로 사용할 스키마를 설정
SELECT Date, City -- 특정 컬럼만 선택
FROM city_data;
USE sys;
SELECT Date, City -- 특정 컬럼만 선택
FROM city_data;
SELECT Date, City -- 특정 컬럼만 선택
FROM mydb.city_data; -- [schema],[table]에서 schema를 설정
SHOW[필요한 정보]; — DB, table등의 정보를 알고 싶을 때 사용
-- SHOW
SHOW DATABASES; -- DATABASE 목록
SHOW TABLES FROM mydb; -- 특정 DB에 있는 TABLE목록
SHOW COLUMNS FROM mydb.city_data; -- 특정 TABLE의 정보
SELECT * FROM INFORMATION_SCHEMA.[table]; — DB, table에 대한 정보를 담고 있는 DB
-- INFORMATION_SCHEMA: DB들에 대한 정보를 담고 있는 논리적 테이블
SELECT *
FROM INFORMATION_SCHEMA.TALBLES
WHERE TABLE_SCHEMA = 'mydb'
SELECT *
FORM INFORMATON_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'city_data';
SQL 기초 3 - 필수 기초 쿼리문 (1)
쿼리 종류
- SELECT : 데이터 추출
- FROM : 조회 테이블 확인
- WHERE: 데이터 추출 조건 확인
- ON : 조인 조건 확인
- JOIN : 테이블 조인 (병합)
- GROUP BY : 특정 컬럼 그룹화
- HAVING : 그룹화 이후 데이터 추출 조건
- DISTINCT : 중복 제거
- ORDER BY : 데이터 순서 정렬
- LIMIT: 데이터 반환 수
-- SELECT, fROM, WHERE
USE mydb; # defalut 로 사용할 스키마를 설정
SELECT Date, City -- 특정 컬럼만 선택
fROM city_data
WHERE Date = '2022-10-21'
-- JOIN, ON
SELECT *
FROM city_data a
JOIN item_meta b -- 그냥 JOIN = INNER JOIN
ON a.ItemNumber = b.ItemNumber; -- 특정 컬럼을 기준을 테이블을 합침
SELECT *
FROM city_data a
LEFT JOIN item_meta b
ON a.ItemNumber = b.ItemNumber;
-- GROUP BY
SELECT a.City, a.ItemNumber, SUM(a.BottlesSold) bottle_sold_cnt
FROM city_data a
JOIN item_meta b
ON a.ItemNumber = b.ItemNumber
GROUP BY a.City, a.ItemNumber;
SQL 기초 4 - 필수 기초 쿼리문 (2)
HAVING: 그룹화 이후 데이터 추출 조건
-- HAVING
SELECT a.City, a.ItemNumber, SUM(a.BottlesSold) bottle_sold_cnt
FROM city_data a
JOIN item_meta b
ON a.ItemNumber = b.ItemNumber
-- WHERE a.VendorNumber = 301
GROUP BY a.City, a.ItemNumber
HAVING SUM(a.BottlesSold) > 1000;
-- DISTINCT
SELECT DISTINCT a.ItemNumber
FROM city_data a
JOIN item_meta b
ON a.ItemNumber = b.ItemNumber
GROUP BY a.City, a.ItemNumber
HAVING SUM(a.BottlesSold) > 1000;
ORDER BY : 데이터 순서 정렬
SELECT a.City, a.ItemNumber, SUM(a.BottlesSold)
FROM city_data a
JOIN item_meta b
ON a.ItemNumber = b.ItemNumber
GROUP BY a.City, a.ItemNumber
HAVING SUM(a.BottlesSold) > 1000
ORDER BY SUM(a.BottlesSold) desc;
LIMIT: 데이터 반환 수
-- LIMIT: 샘플 데이터 확인을 위해 조회할 때는 항상 써주는 것이 쿼리 성능에 좋습니다 :)
SELECT a.City, a.ItemNumber, SUM(a.BottlesSold)
FROM city_data a
JOIN item_meta b
ON a.ItemNumber = b.ItemNumber
GROUP BY a.City, a.ItemNumber
HAVING SUM(a.BottlesSold) > 1000
ORDER BY SUM(a.BottlesSold) desc
LIMIT 5;
SQL 기초 5 - 쿼리 실행 순서 이해하기
SQL 문을 작성하는 순서와 SQL 문이 실행되는 순서는 다르다.
쿼리 작성 시 주의점
SELECT DISTINCT a.City, SUM(a. BottlesSold) -- 5
FROM city_data a -- 1
JOIN item meta b. -- 2
ON a.ItemNumber = b.ItemNumber -- 2
GROUP BY a.City, a.ltemNumber -- 3
HAVING SUM(a. BottlesSold) > 1000 -- 4
ORDER BY SUM(a.BottlesSold) desc -- 6
LIMIT 5; -- 7
-- MySQL 에서는 가능합니다!
SELECT a.City, a.ItemNumber, SUM(a.BottlesSold)
FROM city_data a
JOIN item_meta b
ON a.ItemNumber = b.ItemNumber
GROUP BY 1, 2 -- select 1,2번 컬럼으로 group by 진행
HAVING SUM(a.BottlesSold) > 1000
ORDER BY SUM(a.BottlesSold) desc
LIMIT 5;
지역별 주류 판매 데이터 분석
- 아이오와 주의 주류 시장은 전반적으로 매출이 상승세를 보임
- 점포 수도 증가 추세
- 지역의 전반적인 매출과 점포의 매출은 어느정도 상관 관계가 존재
- 지역의 전반적인 매출과 점포 매출이 좋은 상위 5개의 지역을 선정
앞으로 우리가 분석해야 할 내용
- 사업을 시작하기에 가장 알맞은 지역은 어디인가?
- 어떤 주류들을 팔아야 매출이 좋을까?
- 얼마나 많은 도매업자들과 거래 해야하는가?
매출 TOP5 지역의 점포 매출 분석하기 1
USE mydb;
SELECT *
from city_data;
-- 각 지역별 매출을 다시 순위를 정해보자!
select City, ROUND(Sum(SaleDollars)) sales_sum
FROM city_data
group by city
order by sales_sum desc;
-- ROUND: 반올림
select City, ROUND(Sum(SaleDollars), 2) sales_sum -- round (필트, n) n=반올림 자리수
FROM city_data
group by city
order by sales_sum desc;
-- 지역별, 스토어별 매출 순위
select City, StoreNumber, ROUND(SUM(SaleDollars)) sales_sum
from city_data
group by City, StoreNumber
order by sales_sum desc;
-- 각 지역별 스토어 평균 매출
select
City
, COUNT(DISTINCT StoreNumber) store_cnt
, SUM(SaleDollars) / COUNT(DISTINCT StoreNumber) store_sales_avg -- 평균: 총합/갯수
from city_data
group by City
order by store_sales_avg desc;
매출 TOP5 지역의 점포 매출 분석하기 2-3
USE mydb;
-- Store의 갯수
select count(distinct StoreNumber)
from city_data;
-- 매출 상위 20위권 스토어는 어느 지역에 많을까?
select City, StoreNumber, Sum(SaleDollars) sales_sum
from city_data
group by 1, 2
order by sales_sum desc
limit 20;
-- 순위의 함수들
-- row_number : A라는 값으로 순위를 매길 때 1,1,2,3 -> 1,2,3,4
-- rank: 1,1,2,3 -> 1,1,3,4
-- dense_rank: 1,1,2,3 -> 1,1,2,3
-- row_number
select City
, StoreNumber
, Sum(SaleDollars) sales_sum
, row_number() over()
from city_data
group by 1, 2
order by sales_sum desc;
- 순위가 무작위로 들어간다
-- row_number2
select City
, StoreNumber
, Sum(SaleDollars) sales_sum
, row_number() over(order by sum(SaleDollars) desc) store_rank
from city_data
group by 1, 2
order by store_rank;
- 드디어 순서대로 나왔다.
-- row_number3 : 서브쿼리 응용 20위권만 나타내기
select *
from (select City
, StoreNumber
, Sum(SaleDollars) sales_sum
, row_number() over(order by sum(SaleDollars) desc) store_rank
from city_data
group by 1, 2
order by store_rank
) t1
where t1.store_rank <= 20;
서브쿼리를 이용해서 조건을 달아주면 순위 ** 까지만 볼 수 있는 조건을 실행시킬 수 있다.
Studied from 제로베이스 데이터스쿨