DynamoDB : data set - helloMinji/chatbot_spotify GitHub Wiki
Put data
(Put할 데이터를 가져오기 위해 spotify API 사용 위한 코드 추가)
# spotify api 사용 위한 import 추가
import requests
import base64
import json
import logging
import pymysql
# spotify api 사용 위한 정보 추가
client_id = ""
client_secret = ""
host = ""
port =
username = ""
database = ""
password = ""
# mysql Connect
try:
conn = pymysql.connect(host=host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
cursor = conn.cursor()
except:
logging.error("could not connect to RDS")
sys.exit(1)
headers = get_headers(client_id, client_secret)
def get_headers(client_id, client_secret):
endpoint = "https://accounts.spotify.com/api/token"
encoded = base64.b64encode("{}:{}".format(client_id, client_secret).encode('utf-8')).decode('ascii')
headers = {
"Authorization": "Basic {}".format(encoded)
}
payload = {
"grant_type": "client_credentials"
}
r = requests.post(endpoint, data=payload, headers=headers)
access_token = json.loads(r.text)['access_token']
headers = {
"Authorization": "Bearer {}".format(access_token)
}
return headers
1. Table define: dynamodb에서 사용할 table 지정
table = dynamodb.Table('top_tracks')
# 데이터 하나만 가져오기
cursor.execute('SELECT id FROM artists LIMIT 1')
2. API로 데이터 불러오기
for (artist_id, ) in cursor.fetchall():
URL = "https://api.spotify.com/v1/artists/{}/top-tracks".format(artist_id) # endpoint
params = {
'country': 'US'
}
r = requests.get(URL, params=params, headers=headers)
raw = json.loads(r.text)
3. Put
for track in raw['tracks']:
key 생성 (artist_id: 위의 for문에서)
data = {
'artist_id': artist_id
}
key에 불러온 트랙정보를 붙인다: data가 key + track이 된다
data.update(track)
⭐Put (single item)
table.put_item(
Item=data
)
⭐Put (using batch)
with table.batch_writer() as batch:
batch.put_item(
Item=data
)
Get data
(querying, scanning 위한 import 추가)
from boto3.dynamodb.conditions import Key, Attr
Table define: dynamodb에서 사용할 table 지정
table = dynamodb.Table('top_tracks')
1. Get
dynamo db 생성 시 설정했던 키가 모두 있어야만(partition key, sort key) 에러 없이 불러와진다.
response = table.get_item(
Key=(
'artist_id': '특정값' # partition key
'id': '특정값' # sort key
)
)
2. Querying ⭐ (key를 모르는데 불러오고 싶다면)
primary key(partition key)만 알고 있을 때
response2 = table.query(
# key로 가져올 때
KeyConditionExpression=Key('artist_id').eq('특정값'),
# 조건으로 가져올 때
FilterExpression=Attr('popularity').gt(80)
)
print(response2['Items'])
3. Scanning ⭐ (key를 모르는데 불러오고 싶다면)
key는 하나도 모르고 다른 정보만 알고 있을 때
response3 = table.scan(
FilterExpression=Attr('popularity').gt(80)
)
print(response3['Items'])
❗ scanning은 key가 없기 때문에 시간이 굉장히 오래 걸리는 작업이므로, 반드시 필요할 때만 사용할 것.