KR_MongoDB - somaz94/python-study GitHub Wiki
MongoDB is a document-oriented NoSQL database.
from pymongo import MongoClient
# Connect to the database
client = MongoClient('mongodb://localhost:27017/')
db = client['mydatabase']
# Access collections
users = db.users
posts = db.posts
# Manage the connection string through environment variables
import os
from dotenv import load_dotenv
load_dotenv()  # Load environment variables from a .env file
connection_string = os.getenv('MONGODB_URI')
secure_client = MongoClient(connection_string)
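Before going further, it can be worth confirming that the client actually reaches the server. A minimal check, assuming a local mongod is running on the default port:

# Verify connectivity with a ping and list the available databases
try:
    client.admin.command('ping')
    print("Connected. Databases:", client.list_database_names())
except Exception as e:
    print(f"Connection failed: {e}")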
Features:
- Document-oriented database
- NoSQL structure
- Flexible schema
- BSON documents in a JSON-like format
- Support for distributed architectures
- Horizontal scalability
The basic operations for creating, reading, updating, and deleting data in MongoDB look like this.
# Create a document
def create_user(user_data):
    try:
        result = users.insert_one(user_data)
        return str(result.inserted_id)
    except Exception as e:
        print(f"Error: {e}")
        return None

# Read a document
def get_user(user_id):
    from bson.objectid import ObjectId
    return users.find_one({'_id': ObjectId(user_id)})

# Read multiple documents
def get_users(criteria=None, limit=10, skip=0):
    if criteria is None:
        criteria = {}
    cursor = users.find(criteria).limit(limit).skip(skip)
    return list(cursor)

# Update a document
def update_user(user_id, update_data):
    from bson.objectid import ObjectId
    result = users.update_one(
        {'_id': ObjectId(user_id)},
        {'$set': update_data}
    )
    return result.modified_count > 0

# Delete a document
def delete_user(user_id):
    from bson.objectid import ObjectId
    result = users.delete_one({'_id': ObjectId(user_id)})
    return result.deleted_count > 0
# Usage example
new_user = {
    'name': 'Hong Gildong',
    'email': '[email protected]',
    'age': 30,
    'interests': ['reading', 'hiking', 'programming']
}
user_id = create_user(new_user)
print(f"Created user ID: {user_id}")

# Look up the user
user = get_user(user_id)
print(f"Fetched user: {user}")

# Update the user's information
update_user(user_id, {'age': 31, 'interests': ['reading', 'hiking', 'programming', 'travel']})

# Conditional query
young_users = get_users({'age': {'$lt': 35}})
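The cursor returned by find() can also be sorted before skipping and limiting, which is the usual way to page through results. A minimal sketch (the page and page_size parameters are illustrative additions, not part of the original wiki code):

# Paged query sorted by age in descending order
def get_users_page(criteria=None, page=1, page_size=10):
    if criteria is None:
        criteria = {}
    cursor = (users.find(criteria)
                   .sort('age', -1)              # -1 = descending, 1 = ascending
                   .skip((page - 1) * page_size) # skip earlier pages
                   .limit(page_size))
    return list(cursor)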
Features:
- CRUD operations (Create, Read, Update, Delete)
- ObjectId management
- Query operators ($lt, $gt, $in, etc.)
- Update operators ($set, $inc, $push, etc.)
- Sorting, limiting, and skipping
- Exception handling and error management
MongoDB also provides advanced query features for complex data retrieval and transformation.
# Complex query
def find_active_users_with_posts():
    return users.aggregate([
        {
            '$match': {
                'status': 'active'
            }
        },
        {
            '$lookup': {
                'from': 'posts',
                'localField': '_id',
                'foreignField': 'user_id',
                'as': 'user_posts'
            }
        },
        {
            '$project': {
                'name': 1,
                'email': 1,
                'post_count': {'$size': '$user_posts'}
            }
        }
    ])

# Create indexes
def create_indexes():
    users.create_index([('email', 1)], unique=True)
    posts.create_index([('title', 'text'), ('content', 'text')])
# Grouping query
def group_users_by_age():
    return list(users.aggregate([
        {
            '$group': {
                '_id': {
                    'age_group': {
                        '$switch': {
                            'branches': [
                                {'case': {'$lt': ['$age', 20]}, 'then': 'under 20'},
                                {'case': {'$lt': ['$age', 30]}, 'then': '20s'},
                                {'case': {'$lt': ['$age', 40]}, 'then': '30s'}
                            ],
                            'default': '40s and older'
                        }
                    }
                },
                'count': {'$sum': 1},
                'avg_age': {'$avg': '$age'}
            }
        },
        {
            '$sort': {'count': -1}
        }
    ]))
# Text search
def search_posts(query):
    return list(posts.find(
        {'$text': {'$search': query}},
        {'score': {'$meta': 'textScore'}}
    ).sort([('score', {'$meta': 'textScore'})]))

# Geospatial query
def find_nearby_places(longitude, latitude, max_distance_km):
    return list(db.places.find({
        'location': {
            '$near': {
                '$geometry': {
                    'type': 'Point',
                    'coordinates': [longitude, latitude]
                },
                '$maxDistance': max_distance_km * 1000
            }
        }
    }))
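Note that $near with a GeoJSON point only works if the location field has a 2dsphere index, just as $text requires the text index created above. A minimal sketch of the geospatial index setup, assuming the same places collection used in the query:

# A 2dsphere index on 'location' is required before GeoJSON $near queries will run
def create_geo_index():
    db.places.create_index([('location', '2dsphere')])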
Features:
- Aggregation pipelines
- Join operations ($lookup)
- Field projection ($project)
- Grouping and aggregation functions
- Text search and indexing
- Geospatial queries
- Data transformation and computation
Data stored in MongoDB can also be managed through object models, as shown below.
from datetime import datetime
from bson.objectid import ObjectId

class User:
    def __init__(self, name, email):
        self.name = name
        self.email = email
        self.created_at = datetime.utcnow()

    def to_dict(self):
        return {
            'name': self.name,
            'email': self.email,
            'created_at': self.created_at
        }

    @classmethod
    def from_dict(cls, data):
        user = cls(data['name'], data['email'])
        user.created_at = data.get('created_at', datetime.utcnow())
        return user
# Modeling a reference relationship
class Post:
    def __init__(self, title, content, author_id):
        self.title = title
        self.content = content
        self.author_id = author_id  # References the User's _id
        self.created_at = datetime.utcnow()
        self.updated_at = self.created_at
        self.tags = []

    def to_dict(self):
        return {
            'title': self.title,
            'content': self.content,
            'author_id': self.author_id,
            'created_at': self.created_at,
            'updated_at': self.updated_at,
            'tags': self.tags
        }

    def add_tag(self, tag):
        if tag not in self.tags:
            self.tags.append(tag)

    @classmethod
    def from_dict(cls, data):
        post = cls(
            data['title'],
            data['content'],
            data['author_id']
        )
        post.created_at = data.get('created_at', datetime.utcnow())
        post.updated_at = data.get('updated_at', datetime.utcnow())
        post.tags = data.get('tags', [])
        return post
# Modeling embedded documents
class Product:
    def __init__(self, name, price):
        self.name = name
        self.price = price
        self.created_at = datetime.utcnow()
        self.reviews = []  # Embedded documents

    def to_dict(self):
        return {
            'name': self.name,
            'price': self.price,
            'created_at': self.created_at,
            'reviews': self.reviews
        }

    def add_review(self, user_id, rating, comment):
        review = {
            'user_id': user_id,
            'rating': rating,
            'comment': comment,
            'created_at': datetime.utcnow()
        }
        self.reviews.append(review)
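To tie these model classes to the collections, to_dict() and from_dict() can be applied at the storage boundary. A minimal sketch; save_user and load_user are illustrative helper names, not part of the wiki's API:

# Illustrative persistence helpers built on the model classes above
def save_user(user):
    result = users.insert_one(user.to_dict())
    return result.inserted_id

def load_user(user_id):
    data = users.find_one({'_id': ObjectId(user_id)})
    return User.from_dict(data) if data else None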
Features:
- Object-document mapping
- Reference relationships vs. embedded documents
- Serialization and deserialization
- Managing timestamp data
- Designing relationships between collections
- Taking advantage of schema flexibility
- Handling data integrity
The following features help process large volumes of data efficiently.
def bulk_insert(documents):
    try:
        result = users.insert_many(documents)
        return len(result.inserted_ids)
    except Exception as e:
        print(f"Bulk insert failed: {e}")
        return 0

def bulk_update(filter_criteria, update_data):
    try:
        result = users.update_many(
            filter_criteria,
            {'$set': update_data}
        )
        return result.modified_count
    except Exception as e:
        print(f"Bulk update failed: {e}")
        return 0
# Using the dedicated bulk operations API
def perform_bulk_operations():
    from pymongo import UpdateOne, InsertOne, DeleteOne
    bulk_operations = [
        InsertOne({'name': 'Kim Cheolsu', 'age': 25}),
        InsertOne({'name': 'Lee Younghee', 'age': 28}),
        UpdateOne({'name': 'Hong Gildong'}, {'$set': {'age': 40}}),
        DeleteOne({'name': 'Park Jiseong'})
    ]
    try:
        result = users.bulk_write(bulk_operations, ordered=False)
        print(f"Inserted: {result.inserted_count}, Modified: {result.modified_count}, Deleted: {result.deleted_count}")
        return True
    except Exception as e:
        print(f"Bulk operation failed: {e}")
        return False
# Processing a large dataset
def process_large_dataset(file_path, batch_size=1000):
    import csv
    total_inserted = 0
    batch = []
    with open(file_path, 'r', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            # Convert and clean fields
            if 'age' in row:
                row['age'] = int(row['age'])
            batch.append(row)
            # Insert once the batch size is reached
            if len(batch) >= batch_size:
                inserted = bulk_insert(batch)
                total_inserted += inserted
                print(f"Inserted {inserted} documents. Total: {total_inserted}")
                batch = []
    # Process the remaining batch
    if batch:
        inserted = bulk_insert(batch)
        total_inserted += inserted
        print(f"Inserted {inserted} documents. Total: {total_inserted}")
    return total_inserted
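With ordered=False, pymongo continues past individual failures and reports them all at once; catching BulkWriteError gives access to the partial results. A minimal sketch, assuming duplicate keys (such as the unique email index above) are the expected failure mode:

from pymongo.errors import BulkWriteError

def bulk_insert_tolerant(documents):
    try:
        # ordered=False lets the remaining inserts proceed even if some documents fail
        result = users.insert_many(documents, ordered=False)
        return len(result.inserted_ids)
    except BulkWriteError as bwe:
        # bwe.details includes per-operation errors and the number of successful inserts
        print(f"Some inserts failed: {bwe.details.get('writeErrors', [])}")
        return bwe.details.get('nInserted', 0)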
Features:
- Bulk data insertion
- Batch updates
- Bulk operations API
- Batch processing
- Managing large datasets
- Transaction management
- Performance optimization
MongoDB's transaction support provides atomic operations and error recovery.
def transfer_points(from_user_id, to_user_id, points):
    from bson.objectid import ObjectId
    from pymongo.errors import PyMongoError
    from datetime import datetime
    # Start a session
    with client.start_session() as session:
        try:
            # Start a transaction
            with session.start_transaction():
                # Deduct points
                from_result = users.update_one(
                    {'_id': ObjectId(from_user_id)},
                    {'$inc': {'points': -points}},
                    session=session
                )
                if from_result.modified_count == 0:
                    raise ValueError(f"User {from_user_id} not found or not enough points")
                # Add points
                to_result = users.update_one(
                    {'_id': ObjectId(to_user_id)},
                    {'$inc': {'points': points}},
                    session=session
                )
                if to_result.modified_count == 0:
                    raise ValueError(f"User {to_user_id} not found")
                # Record a transaction log entry
                db.transactions.insert_one({
                    'from_user': ObjectId(from_user_id),
                    'to_user': ObjectId(to_user_id),
                    'points': points,
                    'timestamp': datetime.utcnow()
                }, session=session)
                return True
        except (PyMongoError, ValueError) as e:
            print(f"Transaction failed: {e}")
            return False
# Retry logic
def retry_operation(operation_func, max_retries=3, *args, **kwargs):
    from pymongo.errors import ConnectionFailure, OperationFailure
    import time
    retries = 0
    while retries < max_retries:
        try:
            return operation_func(*args, **kwargs)
        except (ConnectionFailure, OperationFailure) as e:
            retries += 1
            wait_time = 0.5 * (2 ** retries)  # Exponential backoff
            print(f"Operation failed: {e}. Retrying in {wait_time}s ({retries}/{max_retries})")
            time.sleep(wait_time)
    # All retries exhausted
    raise Exception(f"Operation failed after {max_retries} retries")
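Multi-document transactions such as transfer_points only work against a replica set or sharded cluster, not a standalone mongod. The retry helper itself is generic and can wrap any call that may raise a transient ConnectionFailure or OperationFailure; a small usage sketch reusing get_users from earlier:

# Retry a read that might hit a transient connection error
recent_users = retry_operation(get_users, 3, {'age': {'$gte': 20}}, 20)
print(f"Fetched {len(recent_users)} users")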
Features:
- Session and transaction management
- ACID guarantees
- Atomic operation handling
- Error recovery and retries
- Distributed transactions
- Rollback mechanism
- Exponential backoff strategy
The following techniques improve the performance of MongoDB queries and data models.
# Query profiling
def analyze_slow_queries():
    # Set the profiling level (0=off, 1=slow queries only, 2=all queries)
    db.command({"profile": 1, "slowms": 100})
    # Inspect the slowest queries
    slow_queries = list(db.system.profile.find().sort("millis", -1).limit(10))
    for query in slow_queries:
        print(f"Slow query: {query['op']} - {query['millis']}ms")
        print(f"Query: {query['query']}")
        print(f"Namespace: {query['ns']}")
        print("-" * 50)
    return slow_queries
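Individual queries can also be inspected directly with explain(), which shows whether an index was used and which plan won. A minimal sketch:

# Inspect the query plan for a single query
def explain_query(criteria):
    plan = users.find(criteria).explain()
    winning = plan.get('queryPlanner', {}).get('winningPlan', {})
    print(f"Winning plan stage: {winning.get('stage')}")
    return plan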
# Data model optimization (e.g., index recommendations)
def suggest_indexes():
    # Check the current indexes
    current_indexes = list(users.list_indexes())
    print(f"Current indexes: {len(current_indexes)}")
    # Index recommendation logic
    pipeline = [
        {"$indexStats": {}},
        {"$sort": {"accesses.ops": -1}}
    ]
    index_stats = list(users.aggregate(pipeline))
    # Build the recommendations
    recommendations = []
    for stat in index_stats:
        if stat["accesses"]["ops"] > 1000:
            # Example of recording a heavily used index
            recommendations.append({
                "collection": stat["name"],
                "field": stat["key"].keys(),
                "ops": stat["accesses"]["ops"]
            })
    return recommendations
# Query optimization - leveraging a covering index
def optimized_users_query(min_age, max_age):
    # 1. Create the index
    users.create_index([("age", 1), ("name", 1)])
    # 2. Query that can be satisfied by the index alone (note the field projection)
    result = users.find(
        {"age": {"$gte": min_age, "$lte": max_age}},
        {"_id": 0, "name": 1, "age": 1}  # Only fields included in the index
    ).hint([("age", 1), ("name", 1)])  # Use the index explicitly
    return list(result)
# Implementing a caching layer
import functools

def cached_query(ttl_seconds=300):
    """Decorator that caches query results in memory."""
    cache = {}
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Build a cache key
            key = str(args) + str(kwargs)
            # Check the cache
            now = datetime.utcnow()
            if key in cache and (now - cache[key]["timestamp"]).total_seconds() < ttl_seconds:
                print("Cache hit!")
                return cache[key]["result"]
            # Cache miss - run the function and cache the result
            result = func(*args, **kwargs)
            cache[key] = {"result": result, "timestamp": now}
            print("Cache miss - updated cache")
            return result
        return wrapper
    return decorator

@cached_query(ttl_seconds=60)
def get_popular_posts(limit=10):
    return list(posts.find().sort("views", -1).limit(limit))
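Calling the decorated function twice within the TTL should be served from the in-memory cache on the second call; a quick usage sketch:

# First call queries MongoDB, second call (within 60 seconds) hits the cache
popular = get_popular_posts(limit=5)        # prints "Cache miss - updated cache"
popular_again = get_popular_posts(limit=5)  # prints "Cache hit!"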
Features:
- Query profiling
- Index optimization
- Query plan analysis
- Using covering indexes
- Caching strategies
- Identifying performance bottlenecks
- Data model optimization
- Managing read/write load
Best practices:
- Design and manage indexes appropriately
  - Analyze query patterns before designing indexes
  - Weigh the trade-offs between compound and single-field indexes
  - Remove unnecessary indexes to improve write performance
- Establish a sharding strategy
  - Choose shard keys carefully as data volume grows
  - Design for an even data distribution
- Configure and manage replica sets
  - Run multiple nodes for high availability
  - Use secondary nodes for read scaling (see the connection sketch after this list)
- Use transactions appropriately
  - Use transactions only when they are genuinely needed
  - Keep the transaction scope as small as possible
- Optimize the data model
  - Choose deliberately between embedded documents and references
  - Design the schema around the application's access patterns
  - Balance normalization and denormalization
- Monitor query performance
  - Enable slow-query profiling
  - Identify bottlenecks through query plan analysis
- Implement a backup strategy
  - Establish a regular backup schedule
  - Document and test the recovery procedure
- Harden security settings
  - Implement an authentication and authorization model
  - Strengthen network security
  - Consider encrypting sensitive data
- Make use of tooling
  - Manage data visually with MongoDB Compass
  - Build complex pipelines with the Aggregation Builder
  - Use the MongoDB Atlas monitoring tools
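As a small illustration of the replica-set and security items above, a client can authenticate and route reads to secondaries through standard connection-string options. A hedged sketch; the hosts, replica set name, and credentials below are placeholders to replace with real values:

# Illustrative connection to a replica set with authentication and secondary-preferred reads
from pymongo import MongoClient

replica_client = MongoClient(
    'mongodb://appUser:appPassword@host1:27017,host2:27017,host3:27017/'
    '?replicaSet=rs0&authSource=admin&readPreference=secondaryPreferred'
)
reporting_db = replica_client['mydatabase']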