Datalake : s3, parquet - helloMinji/chatbot_spotify GitHub Wiki
- ํ์ํ ํจํค์ง ๋ถ๋ฌ์ค๊ธฐ
import boto3 # aws sdk
import pandas as pd # ํน์ jsonpath๋ง ๊ฐ์ ธ์ค๊ธฐ ์ํด
import jsonpath # pip3 install jsonpath --user
1. top-track API ํ์ฉ
1. ํ์ํ key ๋ฏธ๋ฆฌ ์ค์
- jsonpath ํจํค์ง๋ฅผ ํตํด ํด๋น path ์์์ key์ ๋ง๋ value๋ฅผ ๋ฐํํ๋ค.
- parquet ์๋ฌ๋ฅผ ํผํ๊ณ , ์ํ๋ ๊ฐ๋ง ๊ฐ์ ธ์จ๋ค.
- key: ๊ฐ๊ณ ์ค๊ณ ์ถ์ ๊ฐ
- value: api์์์ key, ์ฆ path (๋นจ๊ฐ์ ํ์๋ ๊ฒ๋ค, ์ด๋ฏธ์ง ์ฐธ๊ณ )
external_urls์ value๊ฐ ๋์ ๋๋ฆฌ, ๊ทธ ๋์ ๋๋ฆฌ์ key๊ฐ spotify์ฌ์ ์ด๋ฐ ํํ๋ก ๊ฐ์ ธ์จ๋ค.
top_track_keys = {
"id": "id",
"name": "name",
"popularity": "popularity",
"external_url": "external_urls.spotify"
}
2. api๋ก ๋ฐ์ดํฐ ๋ถ๋ฌ์ค๊ธฐ
top_tracks = []
for๋ฌธ์ ๋๋ฉด์ cursor์์ ๊ฐ์ ธ์จ(์ฝ๋์ 1. ๋ถ๋ถ) id๊ฐ ๋ถ์ url์ด ์ถ๊ฐ๋๋ค.
for (id, ) in cursor.fetchall():
URL = "https://api.spotify.com/v1/artists/{}/top-tracks".format(id)
params = {
'country': 'US'
}
r = requests.get(URL, params=params, headers=headers)
raw = json.loads(r.text) # api๋ก ๊ฐ์ ธ์จ ๋ฐ์ดํฐ ์ ์ฅ
3. ๋ฐ์ดํฐ ์ ๋ฐ์ดํธ
jsonpath๊ฐ ์ถ๊ฐ๋๋ฉด์ ์๊ธด for loop
for i in raw['tracks']:
- raw๋ผ๋ dictionary์ tracks๋ผ๋ key์ ํด๋นํ๋ value๊ฐ i๊ฐ์ผ๋ก ๋ค์ด๊ฐ๋ค.
(ํํฌ์ v ํ์๋ ๋ถ๋ถ ์์๋ถํฐ ๋๊น์ง๊ฐ value, ์ฆ i!) - i์ ํ์ : dictionary
id, name, popularity, external_url ๋ฐ์ดํฐ ์ ๋ฐ์ดํธ
top_track = {}
for k, v in top_track_keys.items():
top_track.update({k: jsonpath.jsonpath(i, v)}
- i๋ผ๋ dictionary์์ v๊ฐ key์ธ value๋ฅผ ๊ฐ์ ธ์๋ผ
- jsonpath์ ๊ฒฐ๊ณผ! ๊ทธ๋์ k-jsonpath์ ๊ฒฐ๊ณผ๊ฐ key-value๋ก top_track์ ์ ๋ฐ์ดํธ๋๋ค!
artist_id ๋ ์ด๋ฏธ ๊ฐ๊ณ ์๊ธฐ ๋๋ฌธ์ jsonpath ์ฌ์ฉ ์ ํจ
top_track.update({'artist_id': id})
top_tracks.append(top_track)
track_ids = [i['id'][0] for i in top_tracks]
4. DataFrame -> Parquet โญ
- s3์๋ json๋ณด๋ค parquet์ผ๋ก ์ ์ฅํ๋ ๊ฒ์ด ๋ ์ข๊ธฐ ๋๋ฌธ
top_tracks = pd.DataFrame(top_tracks)
top_tracks.to_parquet('top-tracks.parquet', engine='pyarrow', compression="snappy")
- ์์์ ํ์ํ ๋ถ๋ถ๋ง ๊ฐ์ ธ์ค๊ธฐ ๋๋ฌธ์ parquetํ ์๋ฌ ์์
- ๋ณดํต์ json์ผ๋ก ๋จผ์ ์ ์ฅํ๊ณ , ์๋ก์ด ๋ฐ์ดํฐ๊ฐ ๋ค์ด์ค๋ ๊ฒฝ์ฐ ์ด๋ฏธ ์ง์ ๋ ํํ๋ก parquetํ๋ฅผ ํ๊ณ , ๋ค๋ฅธ ๋ฒํท์ ๋ฃ๋๋ค. ์์ ์๋ฌ ์๊ธฐ ์ํด!
๐ ์ด ํ๋ก์ธ์ค๋ฅผ ๊ตฌ์ถํ๋ ๊ฒ์ด ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ ๐
5. s3์ ์ ์ฅ โญ
dt = datatime.utcnow().strftime("%Y-%m-%d")
- utcnow: unix time
๋ถ๋ฌ์จ ๋ฐ์ดํฐ๋ก ํํฐ์ ์์ฑ
s3 = boto3.resource('s3')
object = s3.Object('spotify-artists', 'top-tracks/dt={}/top-tracks.parquet'.format(dt))
- argument1: ๋ฒํท ์ด๋ฆ
- argument2: ํํฐ์
์์ฑ.
๋ ์ง๋ฅผ ๊ธฐ์ค์ผ๋ก ํํฐ์ ์ ๋ง๋ค์ด์, ๊ฐ์ฅ ์ต๊ทผ ๊ฒ์ ๊ฐ์ ธ์ค๊ฑฐ๋ ์ํ๋ ์๊ธฐ์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ฌ ์ ์๊ฒ ๋๋ค.
data = open('top-tracks.parquet','rb')
object.put(Body=data)
2. Audio features API ํ์ฉ (batch)
1. ๋ฐฐ์น ๋ง๋ค๊ธฐ
tracks_batch = [track_ids[i: i+100] for i in range(0, len(track_ids), 100)] # audio features๋ 100๊ฐ๊น์ง๋ง ๊ฐ๋ฅ
audio_features = []
2. batch ํ์ฉํ์ฌ ๋ฐ์ดํฐ ๋ถ๋ฌ์ค๊ธฐ
for i in tracks_batch:
ids = ','.join(i)
URL = "https://api.spotify.com/v1/audio-features/?ids={}".format(ids)
r = requests.get(URL, headers=headers)
raw = json.loads(r.text)
3. ๋ฐ์ดํฐ ์ ๋ฐ์ดํธ
audio_features.extend(raw['audio_features'])
4. DataFrame -> Parquet โญ
audio_features = pd.DataFrame(audio_features)
audio_features.to_parquet('audio-features.parquet', engine='pyarrow', compression='snappy')
5. s3์ ์ ์ฅ โญ
s3 = boto3.resource('s3')
object = s3.Object('spotify-artists', 'audio-features/dt={}/audio-features.parquet'.format(dt))
data = open('audio-features.parquet','rb')
object.put(Body=data)