Day27_SQL분석_69‐71_PowerBI & PySpark - bonniekwon0721/Dataanalytics-study GitHub Wiki
27/MAR/2024
S3_Daily AVG Sales Price = DIVIDE([S2_Daily AVG Sales Amount], [S1_Daily AVG Sales Qty])[Power BI의 방사형 계기 차트 - Power BI | Microsoft Learn](https://learn.microsoft.com/ko-kr/power-bi/visuals/power-bi-visualization-radial-gauge-charts?tabs=powerbi-desktop)
: 그래서 우리는 2배가 아닌 따로 목표를 만들어보겠음.
Sales_Aim = 10000[KPI(핵심 성과 지표) 시각적 개체 - Power BI | Microsoft Learn](https://learn.microsoft.com/ko-kr/power-bi/visuals/power-bi-visualization-kpi?tabs=powerbi-desktop)
[AVERAGEX 함수(DAX) - DAX | Microsoft Learn](https://learn.microsoft.com/ko-kr/dax/averagex-function-dax)
AVERAGEX(<table>,<expression>)기존 판매 데이터의 요일 평균을 목표치로!
매출 당일 이전! 당일은 제외
[ALL 함수(DAX) - DAX | Microsoft Learn](https://learn.microsoft.com/ko-kr/dax/all-function-dax)
[필터 함수(DAX) - DAX | Microsoft Learn](https://learn.microsoft.com/ko-kr/dax/filter-functions-dax)
- AVERGEX 함수
- ALL 함수
- FILTER 조건
Weekday Sales Goal =
CALCULATE(
AVERAGEX(values(D_Calendar[Date]), ([S1_Daily AVG Sales Qty])) -- 일평균 Sales Qty
,all(Sales[SalesDate]), all(D_Calendar[Date]) -- 어떤 필터든지 초기화시키는 ALL
,filter(D_Calendar, D_Calendar[Weeknum] = Weekday(max(Sales[SalesDate]))) -- KPI 표시 요일 (가장 최근 날짜의 요일)과 같은 요일만 가지고 와라
,filter(D_Calendar, D_Calendar[Date] < max(Sales[SalesDate])) -- KPI 표시 날짜 (가장 최근 날짜) 이전)
): 우리가 A, B, C라는 데이터들을 드릴 수 있는데, 여기서 가장 RO가 좋았던 아이템과 어떤 소비자들에게서 반응이 좋았는지 알고 싶어요. 그리고 정기적으로 위 사항들을 리포팅 받고 싶습니다. 더불어 앞으로 어떤 지역과 소비자들에게 어떤 아이템을 주력으로 팔면 좋을지 인사이트도 함께 제시해주세요~
: 우리는 아이오와주에 주류 도매 & 소매의 플랫폼 사업을 하고자 합니다.
먼저 해당 지역의 주류 판매 시장의 동향이 어떤지 알고 싶습니다.
그리고 우리가 어느 지역에 어떤 상품을 주력으로 팔아야 할 지 인사이트를 제공해 주세요.
Q: 저희는 어떤 자료를 받을 수 있나요?
A: 회사 내부 데이터가 아니다 보니 잘 만들어진 데이터를 수집하는데 한계가 있었습니다.
우선 해당 지역의 약 10년치 주류 판매 데이터 전체를 전달드릴 수 있을 것 같습니다.
범용적인 목적을 지닌 분산 클러스터 컴퓨팅 오픈소스 프레임워크
: 시스템의 전반적인 성능을 향상시키기 위해 계산 부하량을 여러 노드에서 분담하여 병렬 처리하도록 구성하는 방식
- 모든 Executor 가 병렬로 작업을 수행할 수 있도록 '파티션'이라고 불리는 청크 단위로 데이터를 분할 ... 파티션은 클러스터의 물리적 머신에 존재하는 Row 의 집합
- 테이블의 데이터 Row, Column 으로 단순하게 표현
- Dataframe의 Partition 은 실행 중에 데이터가 컴퓨터 클러스터에서 물리적으로 분산되는 방식
- Spark 의 병렬성은 파티션과 익스큐터의 갯수로 결정됨
Spark 의 핵심 데이터 구조는 불변성을 가짐
- 변경을 원할 때 변경 방법을 Spark 에게 알려주는 Transformation
- 논리적 실행 계획을 세우게 됨
- 즉, 실제 연산이 일어나는 것은 아님
Lazy Evaluation
: 특정 연산 명령이 내려진 즉시 데이터를 수정하지 않고, 원시 데이터에 적용할 트랜스포메이션만 실행
-> 액션 전까지 전체 데이터 흐름을 최적화하는 강점을 지님
Action
: 실제연산을 수행하기 위한 사용자 명령 (트랜스포메이션으로부터 결과를 계산하도록 지시)
(예시)
- 카운트
- 콘솔에서 데이터를 보는 액션
- 출력 데이터 소스에 저장하는 액션
스파크의 카탈리스트
- 트랜스포메이션을 적용할 때, 스파크 SQL은 논리 계획이 담긴 트리 그래프를 생성
- 해당 Optimizer에 의해 최적의 논리를 받아와 데이터를 반환해주기 때문에 성능이 더욱 좋음
카탈리스트의 장점 Spark에서 Partition은 부분으로 나눈다는 의미 데이터를 Shuffing하고 Partition하여 나뉜 데이터에서 연산을 처리하는 경우, 네트워크 연산(node들의 통신)이 일어나게 된다. 연산 속도는 인메모리 >>디스크 VO >> 네트워크 순으로 빠르다. 그런데 Shuffing은 네트워크 연산이기 때문에 앞에서 *데이터가 어느 정도 정리가 된 후에 해야 한다. *해당 부분을 카탈리스트가 계획적으로 자동 최적화
:PySpark는 Python 환경에서 Apache Spark를 사용할 수 있는 인터페이스
→ 즉, PySpark는 Spark용 API
Spark Core and RODs
| Spark SQL and DataFrames | Pandas API on Spark | Structured Streaming | Machine Learning |
|---|
Spark SQL and Data Frames
- 대용량 정형 데이터 처리를 위해 SQL 인터페이스를 지원하는 PySpark
- SQL 쿼리를 사용 가능
- 데이터 표현 형식은 Dataframe이며, 이는 RDBMS(관계형 데이터 베이스 관리 시스템)의 table과 유사한 2차원 구조
Pandas API on Spark
- Pandas API를 지원
- Pandas와 같은 문법을 사용할 수 있다는 장점
Structured Streaming
- Spark SQL 엔진에 구축된 스트림 처리 엔진
- 정적 데이터에 배치 계산을 하는 것과 같은 방식으로 스트리밍 계산을 표현할 수 있는 장점
Machine Learning
- Spark의 머신러닝 라이브러리
- 데이터 병렬 처리 방법론을 활용해 모델링이 가능
- classification, Regression, Clustering, Dimension Reduction, Optimization 등 다양한 활용처
!apt-get install openjdk-8-jdk-headless # jdk 설치
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz # spark file
!tar -xf spark-3.0.0-bin-hadoop3.2.tgz # 스파크 압축풀기
!pip install findspark # 스파크 찾기
!pip install kaggle --upgrade # 캐글 데이터를 다운받기 위해 kaggle library 설치import os # 운영체제와의 상호작용을 돕는 다양한 기능을 제공하는 모듈
import findspark
# 환경변수에 path 지정
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"
findspark.init() # spark의 경우 잘 찾지 못하는 경우가 있어 findsaprk를 이용from pyspark.sql import SparkSession
spark = (
SparkSession
.builder
.appName("pyspark_test")
.master("local[*]") # local에서 사용하며, 모든 쓰레드를 사용하도록 지정
.getOrCreate()
)# 지금 만들어진 spark 객체의 설정을 알아볼까요?
spark.sparkContext.getConf().getAll()# 받아온 file을 colab에 올려줍시다.
from google.colab import files
files.upload()!mkdir -p ~/.kaggle/ # kaggle 폴더 생성
!cp kaggle.json ~/.kaggle/ # json 파일 복사
!chmod 600 ~/.kaggle/kaggle.json # file 접근 권한 할당
!kaggle datasets download -d wethanielaw/iowa-liquor-sales-20230401 # data download
!unzip iowa-liquor-sales-20230401.zip # 압축풀기-
상세 설명
사용된 명령어 알아보기
- mkdir: 디렉토리(폴더) 생성 명령어
- cp: 파일 복사/이동 명령어
- chmod: 기존 파일 또는 디렉토리에 대한 접근권한 변경
- 600: 나에게만 읽기,쓰기 권한
- unzip: zip 으로 압축된 파일을 푸는 명령어
import os
# 압축 해제된 파일 크기는?
def convert_size(size_bytes):
import math
size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
i = int(math.floor(math.log(size_bytes, 1024)))
p = math.pow(1024, i)
s = round(size_bytes / p, 2)
return "%s %s" % (s, size_name[i])
file_size = os.path.getsize('./Iowa_Liquor_Sales.csv')
print('File Size:', convert_size(file_size), 'bytes')Studied from 제로베이스 데이터스쿨