KR_MySQL - somaz94/python-study GitHub Wiki

Python MySQL ๊ฐœ๋… ์ •๋ฆฌ


1๏ธโƒฃ MySQL ๊ธฐ์ดˆ

MySQL์€ ๊ฐ€์žฅ ๋„๋ฆฌ ์‚ฌ์šฉ๋˜๋Š” ์˜คํ”ˆ์†Œ์Šค ๊ด€๊ณ„ํ˜• ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์ด๋‹ค.

import mysql.connector

# ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์—ฐ๊ฒฐ
config = {
    'user': 'username',
    'password': 'password',
    'host': 'localhost',
    'database': 'mydatabase',
    'raise_on_warnings': True,
    'charset': 'utf8mb4',
    'collation': 'utf8mb4_unicode_ci',
    'autocommit': False
}

conn = mysql.connector.connect(**config)
cursor = conn.cursor(dictionary=True)  # ๊ฒฐ๊ณผ๋ฅผ ๋”•์…”๋„ˆ๋ฆฌ ํ˜•ํƒœ๋กœ ๋ฐ˜ํ™˜

# ํ™˜๊ฒฝ ๋ณ€์ˆ˜๋ฅผ ํ†ตํ•œ ์—ฐ๊ฒฐ ๋ฌธ์ž์—ด ๊ด€๋ฆฌ
import os
from dotenv import load_dotenv

load_dotenv()  # .env ํŒŒ์ผ์—์„œ ํ™˜๊ฒฝ ๋ณ€์ˆ˜ ๋กœ๋“œ
db_config = {
    'user': os.getenv('MYSQL_USER'),
    'password': os.getenv('MYSQL_PASSWORD'),
    'host': os.getenv('MYSQL_HOST'),
    'database': os.getenv('MYSQL_DATABASE')
}

secure_conn = mysql.connector.connect(**db_config)

# ์—ฐ๊ฒฐ ํ™•์ธ ๋ฐ ์ข…๋ฃŒ
def check_connection():
    try:
        cursor.execute("SELECT VERSION()")
        version = cursor.fetchone()
        print(f"Database version: {version['VERSION()']}")
        return True
    except mysql.connector.Error as err:
        print(f"์—๋Ÿฌ: {err}")
        return False
    finally:
        if conn.is_connected():
            print("์—ฐ๊ฒฐ ์„ฑ๊ณต!")

# ์—ฐ๊ฒฐ ์ข…๋ฃŒ
def close_connection():
    if conn.is_connected():
        cursor.close()
        conn.close()
        print("MySQL ์—ฐ๊ฒฐ์ด ๋‹ซํ˜”์Šต๋‹ˆ๋‹ค.")

โœ… ํŠน์ง•:

  • ๊ด€๊ณ„ํ˜• ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ๊ด€๋ฆฌ ์‹œ์Šคํ…œ(RDBMS)
  • SQL ๊ธฐ๋ฐ˜ ์ฟผ๋ฆฌ ์–ธ์–ด
  • ๋‹ค์–‘ํ•œ ๋ฐ์ดํ„ฐ ํƒ€์ž… ์ง€์›
  • ํŠธ๋žœ์žญ์…˜ ๋ฐ ACID ์†์„ฑ ๋ณด์žฅ
  • ์ธ๋ฑ์‹ฑ ๋ฐ ์ตœ์ ํ™” ๊ธฐ๋Šฅ
  • ๋‹ค์ค‘ ์‚ฌ์šฉ์ž ์ ‘๊ทผ ์ง€์›
  • ๋ณด์•ˆ ๋ฐ ๊ถŒํ•œ ๊ด€๋ฆฌ
  • ํ™•์žฅ์„ฑ ๋ฐ ๋ณต์ œ ๊ธฐ๋Šฅ


2๏ธโƒฃ ๊ธฐ๋ณธ CRUD ์ž‘์—…

๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์˜ ๊ธฐ๋ณธ ์ž‘์—…์ธ ์ƒ์„ฑ(Create), ์กฐํšŒ(Read), ์ˆ˜์ •(Update), ์‚ญ์ œ(Delete) ๊ธฐ๋Šฅ์„ ๊ตฌํ˜„ํ•œ๋‹ค.

# ํ…Œ์ด๋ธ” ์ƒ์„ฑ
def create_tables():
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INT AUTO_INCREMENT PRIMARY KEY,
            name VARCHAR(100) NOT NULL,
            email VARCHAR(100) UNIQUE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            status ENUM('active', 'inactive', 'suspended') DEFAULT 'active',
            INDEX idx_email (email),
            INDEX idx_status (status)
        )
    ''')
    
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS posts (
            id INT AUTO_INCREMENT PRIMARY KEY,
            user_id INT NOT NULL,
            title VARCHAR(200) NOT NULL,
            content TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
            FULLTEXT INDEX idx_content (title, content)
        )
    ''')
    conn.commit()

# ๋ฐ์ดํ„ฐ ์‚ฝ์ž… (Create)
def insert_user(name, email, status='active'):
    try:
        cursor.execute('''
            INSERT INTO users (name, email, status)
            VALUES (%s, %s, %s)
        ''', (name, email, status))
        conn.commit()
        return cursor.lastrowid
    except mysql.connector.Error as err:
        print(f"์—๋Ÿฌ ๋ฐœ์ƒ: {err}")
        conn.rollback()
        return None

# ๋‹ค์ˆ˜ ๋ ˆ์ฝ”๋“œ ์‚ฝ์ž…
def insert_many_users(users_data):
    try:
        cursor.executemany('''
            INSERT INTO users (name, email, status)
            VALUES (%s, %s, %s)
        ''', users_data)
        conn.commit()
        return cursor.rowcount
    except mysql.connector.Error as err:
        print(f"์—๋Ÿฌ ๋ฐœ์ƒ: {err}")
        conn.rollback()
        return 0

# ๋ฐ์ดํ„ฐ ์กฐํšŒ (Read)
def get_user(user_id):
    cursor.execute('''
        SELECT * FROM users WHERE id = %s
    ''', (user_id,))
    return cursor.fetchone()

# ๋‹ค์ˆ˜ ๋ ˆ์ฝ”๋“œ ์กฐํšŒ
def get_users(limit=10, offset=0, status=None):
    query = "SELECT * FROM users"
    params = []
    
    if status:
        query += " WHERE status = %s"
        params.append(status)
    
    query += " ORDER BY created_at DESC LIMIT %s OFFSET %s"
    params.extend([limit, offset])
    
    cursor.execute(query, params)
    return cursor.fetchall()

# ๋ฐ์ดํ„ฐ ์กฐํšŒ - JOIN ์‚ฌ์šฉ
def get_user_with_posts(user_id):
    cursor.execute('''
        SELECT u.*, p.id as post_id, p.title, p.content, p.created_at as post_created_at
        FROM users u
        LEFT JOIN posts p ON u.id = p.user_id
        WHERE u.id = %s
    ''', (user_id,))
    
    results = cursor.fetchall()
    if not results:
        return None
    
    user = {
        'id': results[0]['id'],
        'name': results[0]['name'],
        'email': results[0]['email'],
        'created_at': results[0]['created_at'],
        'status': results[0]['status'],
        'posts': []
    }
    
    for row in results:
        if row['post_id']:
            user['posts'].append({
                'id': row['post_id'],
                'title': row['title'],
                'content': row['content'],
                'created_at': row['post_created_at']
            })
    
    return user

# ๋ฐ์ดํ„ฐ ์ˆ˜์ • (Update)
def update_user(user_id, name=None, email=None, status=None):
    update_fields = []
    params = []
    
    if name:
        update_fields.append("name = %s")
        params.append(name)
    if email:
        update_fields.append("email = %s")
        params.append(email)
    if status:
        update_fields.append("status = %s")
        params.append(status)
    
    if not update_fields:
        return False
    
    query = f"UPDATE users SET {', '.join(update_fields)} WHERE id = %s"
    params.append(user_id)
    
    try:
        cursor.execute(query, params)
        conn.commit()
        return cursor.rowcount > 0
    except mysql.connector.Error as err:
        print(f"์—๋Ÿฌ ๋ฐœ์ƒ: {err}")
        conn.rollback()
        return False

# ๋ฐ์ดํ„ฐ ์‚ญ์ œ (Delete)
def delete_user(user_id):
    try:
        cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
        conn.commit()
        return cursor.rowcount > 0
    except mysql.connector.Error as err:
        print(f"์—๋Ÿฌ ๋ฐœ์ƒ: {err}")
        conn.rollback()
        return False

# ์กฐ๊ฑด๋ถ€ ์‚ญ์ œ
def delete_inactive_users(days=90):
    try:
        cursor.execute('''
            DELETE FROM users 
            WHERE status = 'inactive' 
            AND updated_at < DATE_SUB(NOW(), INTERVAL %s DAY)
        ''', (days,))
        conn.commit()
        return cursor.rowcount
    except mysql.connector.Error as err:
        print(f"์—๋Ÿฌ ๋ฐœ์ƒ: {err}")
        conn.rollback()
        return 0

# ์‚ฌ์šฉ ์˜ˆ์‹œ
def crud_example():
    # ํ…Œ์ด๋ธ” ์ƒ์„ฑ
    create_tables()
    
    # ์‚ฌ์šฉ์ž ์ถ”๊ฐ€
    user_id = insert_user('ํ™๊ธธ๋™', '[email protected]')
    print(f"์ถ”๊ฐ€๋œ ์‚ฌ์šฉ์ž ID: {user_id}")
    
    # ์—ฌ๋Ÿฌ ์‚ฌ์šฉ์ž ์ถ”๊ฐ€
    users = [
        ('๊น€์ฒ ์ˆ˜', '[email protected]', 'active'),
        ('์ด์˜ํฌ', '[email protected]', 'active'),
        ('๋ฐ•์ง€์„ฑ', '[email protected]', 'inactive')
    ]
    inserted = insert_many_users(users)
    print(f"{inserted}๋ช…์˜ ์‚ฌ์šฉ์ž๊ฐ€ ์ถ”๊ฐ€๋˜์—ˆ์Šต๋‹ˆ๋‹ค.")
    
    # ์‚ฌ์šฉ์ž ์กฐํšŒ
    user = get_user(user_id)
    print(f"์กฐํšŒ๋œ ์‚ฌ์šฉ์ž: {user['name']} ({user['email']})")
    
    # ์‚ฌ์šฉ์ž ์ •๋ณด ์ˆ˜์ •
    update_user(user_id, name='ํ™๊ธธ์ˆœ', status='active')
    
    # ์‚ฌ์šฉ์ž ์‚ญ์ œ
    deleted = delete_inactive_users(30)
    print(f"{deleted}๋ช…์˜ ๋น„ํ™œ์„ฑ ์‚ฌ์šฉ์ž๊ฐ€ ์‚ญ์ œ๋˜์—ˆ์Šต๋‹ˆ๋‹ค.")

โœ… ํŠน์ง•:

  • CRUD ์—ฐ์‚ฐ์˜ ์™„์ „ํ•œ ๊ตฌํ˜„
  • ํŒŒ๋ผ๋ฏธํ„ฐํ™”๋œ ์ฟผ๋ฆฌ๋กœ SQL ์ธ์ ์…˜ ๋ฐฉ์ง€
  • ๋‹ค์ค‘ ๋ ˆ์ฝ”๋“œ ์ฒ˜๋ฆฌ ๊ธฐ๋Šฅ
  • ์กฐ์ธ์„ ํ†ตํ•œ ๊ด€๊ณ„ํ˜• ๋ฐ์ดํ„ฐ ์กฐํšŒ
  • ๋™์  ์ฟผ๋ฆฌ ์ƒ์„ฑ
  • ํŠธ๋žœ์žญ์…˜ ๊ด€๋ฆฌ ๋ฐ ๋กค๋ฐฑ
  • ์—๋Ÿฌ ์ฒ˜๋ฆฌ ๋ฐ ์˜ˆ์™ธ ๊ด€๋ฆฌ
  • ์ธ๋ฑ์Šค ์„ค์ • ๋ฐ ํ™œ์šฉ


3๏ธโƒฃ ๊ณ ๊ธ‰ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์ž‘์—…

๋ณต์žกํ•œ ํŠธ๋žœ์žญ์…˜ ์ฒ˜๋ฆฌ, ์Šคํ† ์–ด๋“œ ํ”„๋กœ์‹œ์ €, ๊ณ ๊ธ‰ ์ฟผ๋ฆฌ ๊ธฐ๋ฒ• ๋“ฑ์„ ํ™œ์šฉํ•œ๋‹ค.

# ํŠธ๋žœ์žญ์…˜ ์ฒ˜๋ฆฌ
def transfer_money(from_account, to_account, amount):
    try:
        cursor.execute('START TRANSACTION')
        
        # ์ถœ๊ธˆ ๊ณ„์ขŒ ์กฐํšŒ
        cursor.execute('''
            SELECT balance FROM accounts WHERE id = %s FOR UPDATE
        ''', (from_account,))
        from_balance = cursor.fetchone()
        
        if not from_balance or from_balance['balance'] < amount:
            raise Exception("์ž”์•ก ๋ถ€์กฑ ๋˜๋Š” ๊ณ„์ขŒ ์—†์Œ")
        
        # ์ถœ๊ธˆ
        cursor.execute('''
            UPDATE accounts 
            SET balance = balance - %s, 
                updated_at = NOW(),
                transaction_count = transaction_count + 1
            WHERE id = %s
        ''', (amount, from_account))
        
        # ์ž…๊ธˆ ๊ณ„์ขŒ ํ™•์ธ
        cursor.execute('''
            SELECT id FROM accounts WHERE id = %s FOR UPDATE
        ''', (to_account,))
        if not cursor.fetchone():
            raise Exception("๋Œ€์ƒ ๊ณ„์ขŒ ์—†์Œ")
        
        # ์ž…๊ธˆ
        cursor.execute('''
            UPDATE accounts 
            SET balance = balance + %s,
                updated_at = NOW(),
                transaction_count = transaction_count + 1
            WHERE id = %s
        ''', (amount, to_account))
        
        # ๊ฑฐ๋ž˜ ๋‚ด์—ญ ๊ธฐ๋ก
        cursor.execute('''
            INSERT INTO transactions
            (from_account_id, to_account_id, amount, transaction_type, description)
            VALUES (%s, %s, %s, 'transfer', '๊ณ„์ขŒ ์ด์ฒด')
        ''', (from_account, to_account, amount))
        
        cursor.execute('COMMIT')
        return True
    except Exception as e:
        cursor.execute('ROLLBACK')
        print(f"๊ฑฐ๋ž˜ ์‹คํŒจ: {e}")
        return False

# ์Šคํ† ์–ด๋“œ ํ”„๋กœ์‹œ์ € ์ƒ์„ฑ ๋ฐ ํ˜ธ์ถœ
def create_stored_procedures():
    # ์‚ฌ์šฉ์ž ์ƒํƒœ ์—…๋ฐ์ดํŠธ ํ”„๋กœ์‹œ์ €
    cursor.execute('''
        DROP PROCEDURE IF EXISTS update_user_status
    ''')
    
    cursor.execute('''
        CREATE PROCEDURE update_user_status(
            IN user_id INT,
            IN new_status VARCHAR(20)
        )
        BEGIN
            UPDATE users
            SET status = new_status,
                updated_at = NOW()
            WHERE id = user_id;
            
            SELECT ROW_COUNT() AS affected_rows;
        END
    ''')
    
    # ๋น„ํ™œ์„ฑ ์‚ฌ์šฉ์ž ์ •๋ฆฌ ํ”„๋กœ์‹œ์ €
    cursor.execute('''
        DROP PROCEDURE IF EXISTS cleanup_inactive_users
    ''')
    
    cursor.execute('''
        CREATE PROCEDURE cleanup_inactive_users(
            IN days_threshold INT
        )
        BEGIN
            DECLARE affected INT;
            
            START TRANSACTION;
            
            DELETE FROM users
            WHERE status = 'inactive'
            AND updated_at < DATE_SUB(NOW(), INTERVAL days_threshold DAY);
            
            SET affected = ROW_COUNT();
            
            COMMIT;
            
            SELECT affected AS deleted_users;
        END
    ''')
    
    conn.commit()

# ์Šคํ† ์–ด๋“œ ํ”„๋กœ์‹œ์ € ํ˜ธ์ถœ
def call_stored_procedure(user_id, new_status):
    try:
        cursor.callproc('update_user_status', (user_id, new_status))
        
        # ๊ฒฐ๊ณผ ๊ฐ€์ ธ์˜ค๊ธฐ
        for result in cursor.stored_results():
            return result.fetchone()['affected_rows'] > 0
    except mysql.connector.Error as err:
        print(f"ํ”„๋กœ์‹œ์ € ํ˜ธ์ถœ ์—๋Ÿฌ: {err}")
        return False

# ๊ณ ๊ธ‰ ์ฟผ๋ฆฌ - ์ง‘๊ณ„ ๋ฐ ๊ทธ๋ฃนํ™”
def user_statistics():
    cursor.execute('''
        SELECT 
            status,
            COUNT(*) as user_count,
            MIN(created_at) as oldest_user,
            MAX(created_at) as newest_user,
            AVG(TIMESTAMPDIFF(DAY, created_at, NOW())) as avg_account_age_days
        FROM users
        GROUP BY status
        ORDER BY user_count DESC
    ''')
    
    return cursor.fetchall()

# ๊ณ ๊ธ‰ ์ฟผ๋ฆฌ - ์œˆ๋„์šฐ ํ•จ์ˆ˜ ์‚ฌ์šฉ (MySQL 8.0+)
def post_analytics():
    cursor.execute('''
        SELECT 
            p.*,
            COUNT(*) OVER(PARTITION BY DATE(p.created_at)) as posts_same_day,
            RANK() OVER(ORDER BY p.view_count DESC) as popularity_rank
        FROM posts p
        JOIN users u ON p.user_id = u.id
        WHERE u.status = 'active'
        ORDER BY p.created_at DESC
        LIMIT 100
    ''')
    
    return cursor.fetchall()

# ์ „์ฒดํ…์ŠคํŠธ ๊ฒ€์ƒ‰
def search_posts(query):
    cursor.execute('''
        SELECT *
        FROM posts
        WHERE MATCH(title, content) AGAINST(%s IN NATURAL LANGUAGE MODE)
        ORDER BY MATCH(title, content) AGAINST(%s IN NATURAL LANGUAGE MODE) DESC
        LIMIT 20
    ''', (query, query))
    
    return cursor.fetchall()

โœ… ํŠน์ง•:

  • ๋ณต์žกํ•œ ํŠธ๋žœ์žญ์…˜ ์ฒ˜๋ฆฌ
  • ๋™์‹œ์„ฑ ์ œ์–ด(FOR UPDATE)
  • ์Šคํ† ์–ด๋“œ ํ”„๋กœ์‹œ์ € ํ™œ์šฉ
  • ์ง‘๊ณ„ ํ•จ์ˆ˜ ๋ฐ ๊ทธ๋ฃนํ™”
  • ๊ณ ๊ธ‰ ๋ถ„์„ ์ฟผ๋ฆฌ
  • ์œˆ๋„์šฐ ํ•จ์ˆ˜ (MySQL 8.0+)
  • ์ „์ฒดํ…์ŠคํŠธ ๊ฒ€์ƒ‰ ๊ธฐ๋Šฅ
  • ์ตœ์ ํ™”๋œ ์ฟผ๋ฆฌ ์ž‘์„ฑ


4๏ธโƒฃ ์ปค๋„ฅ์…˜ ํ’€๋ง

๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์—ฐ๊ฒฐ์„ ํšจ์œจ์ ์œผ๋กœ ๊ด€๋ฆฌํ•˜๊ธฐ ์œ„ํ•œ ์ปค๋„ฅ์…˜ ํ’€๋ง ๊ธฐ๋ฒ•์ด๋‹ค.

from mysql.connector import pooling

class DatabasePool:
    _pool = None
    
    @classmethod
    def init_pool(cls, pool_name, pool_size=5, **kwargs):
        dbconfig = {
            "pool_name": pool_name,
            "pool_size": pool_size,
            "pool_reset_session": True,  # ์—ฐ๊ฒฐ ๋ฐ˜ํ™˜ ์‹œ ์„ธ์…˜ ์ดˆ๊ธฐํ™”
            **kwargs
        }
        cls._pool = mysql.connector.pooling.MySQLConnectionPool(**dbconfig)
        print(f"{pool_name} ํ’€์ด {pool_size}๊ฐœ์˜ ์—ฐ๊ฒฐ๋กœ ์ดˆ๊ธฐํ™”๋˜์—ˆ์Šต๋‹ˆ๋‹ค.")
    
    @classmethod
    def get_connection(cls):
        if cls._pool is None:
            raise Exception("ํ’€์ด ์ดˆ๊ธฐํ™”๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค.")
        return cls._pool.get_connection()
    
    @classmethod
    def execute_query(cls, query, params=None, commit=False, dictionary=True):
        """ํ’€์—์„œ ์—ฐ๊ฒฐ์„ ๊ฐ€์ ธ์™€ ์ฟผ๋ฆฌ๋ฅผ ์‹คํ–‰ํ•˜๊ณ  ๊ฒฐ๊ณผ๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค"""
        conn = None
        cursor = None
        try:
            conn = cls.get_connection()
            cursor = conn.cursor(dictionary=dictionary)
            cursor.execute(query, params or ())
            
            result = None
            if query.strip().upper().startswith("SELECT"):
                result = cursor.fetchall()
            else:
                if commit:
                    conn.commit()
                result = {"affected_rows": cursor.rowcount, "last_id": cursor.lastrowid}
            
            return result
        except Exception as e:
            if conn and commit:
                conn.rollback()
            raise e
        finally:
            if cursor:
                cursor.close()
            if conn:
                conn.close()  # ํ’€์— ๋ฐ˜ํ™˜

# ์ปค๋„ฅ์…˜ ํ’€ ์‚ฌ์šฉ ์˜ˆ์ œ
def connection_pool_example():
    # ํ’€ ์ดˆ๊ธฐํ™”
    DatabasePool.init_pool(
        "mypool", 
        pool_size=10, 
        user=os.getenv('MYSQL_USER'), 
        password=os.getenv('MYSQL_PASSWORD'),
        host=os.getenv('MYSQL_HOST'),
        database=os.getenv('MYSQL_DATABASE')
    )
    
    # ๋™์‹œ ์ฟผ๋ฆฌ ์‹œ๋ฎฌ๋ ˆ์ด์…˜
    def worker(user_id):
        try:
            result = DatabasePool.execute_query(
                "SELECT * FROM users WHERE id = %s", (user_id,)
            )
            return result[0] if result else None
        except Exception as e:
            print(f"์ฟผ๋ฆฌ ์‹คํ–‰ ์ค‘ ์˜ค๋ฅ˜: {e}")
            return None
    
    # ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ ์˜ˆ์‹œ
    import concurrent.futures
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        user_ids = [1, 2, 3, 4, 5]
        results = list(executor.map(worker, user_ids))
        
        for user_id, result in zip(user_ids, results):
            if result:
                print(f"์‚ฌ์šฉ์ž {user_id}: {result['name']}")
            else:
                print(f"์‚ฌ์šฉ์ž {user_id}๋ฅผ ์ฐพ์„ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค.")

# ๋Œ€๊ทœ๋ชจ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์„ ์œ„ํ•œ ํ–ฅ์ƒ๋œ ํ’€๋ง
import queue
import threading
import time

class AdvancedConnectionPool:
    def __init__(self, config, min_connections=5, max_connections=20, timeout=30):
        self.config = config
        self.min_conn = min_connections
        self.max_conn = max_connections
        self.timeout = timeout
        self.connections = queue.Queue(maxsize=max_connections)
        self.active_connections = 0
        self.lock = threading.RLock()
        self.initialize()
    
    def initialize(self):
        for _ in range(self.min_conn):
            self._add_connection()
    
    def _add_connection(self):
        with self.lock:
            if self.active_connections >= self.max_conn:
                return False
            
            try:
                connection = mysql.connector.connect(**self.config)
                self.connections.put(connection)
                self.active_connections += 1
                return True
            except mysql.connector.Error as err:
                print(f"์—ฐ๊ฒฐ ์ƒ์„ฑ ์˜ค๋ฅ˜: {err}")
                return False
    
    def get_connection(self, timeout=None):
        timeout = timeout or self.timeout
        try:
            # ๊ฐ€๋Šฅํ•œ ์—ฐ๊ฒฐ ํ™•์ธ
            connection = self.connections.get(block=True, timeout=timeout)
            
            # ์—ฐ๊ฒฐ ์œ ํšจ์„ฑ ๊ฒ€์‚ฌ
            if not connection.is_connected():
                connection.reconnect()
            
            return connection
        except queue.Empty:
            # ํ’€์— ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•œ ์—ฐ๊ฒฐ์ด ์—†์œผ๋ฉด ์ƒˆ๋กœ ์ƒ์„ฑ ์‹œ๋„
            with self.lock:
                if self._add_connection():
                    return self.connections.get(block=False)
            
            raise Exception("์—ฐ๊ฒฐ ํƒ€์ž„์•„์›ƒ: ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•œ ์—ฐ๊ฒฐ ์—†์Œ")
    
    def release_connection(self, connection):
        if connection.is_connected():
            # ์—ด๋ฆฐ ํŠธ๋žœ์žญ์…˜์ด ์žˆ์œผ๋ฉด ๋กค๋ฐฑ
            connection.rollback()
            self.connections.put(connection)
        else:
            # ๋Š์–ด์ง„ ์—ฐ๊ฒฐ์€ ์ƒˆ๋กœ์šด ์—ฐ๊ฒฐ๋กœ ๋Œ€์ฒด
            with self.lock:
                self.active_connections -= 1
                self._add_connection()
    
    def close_all(self):
        with self.lock:
            while not self.connections.empty():
                try:
                    conn = self.connections.get(block=False)
                    if conn.is_connected():
                        conn.close()
                except queue.Empty:
                    break
            
            self.active_connections = 0

โœ… ํŠน์ง•:

  • ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์—ฐ๊ฒฐ ์žฌ์‚ฌ์šฉ
  • ์—ฐ๊ฒฐ ์ƒ์„ฑ ๋น„์šฉ ์ ˆ๊ฐ
  • ์—ฐ๊ฒฐ ์ˆ˜ ์ œํ•œ์œผ๋กœ ๋ฆฌ์†Œ์Šค ๊ด€๋ฆฌ
  • ๋™์‹œ์„ฑ ์ฒ˜๋ฆฌ ์ตœ์ ํ™”
  • ์—ฐ๊ฒฐ ์œ ํšจ์„ฑ ๊ฒ€์‚ฌ ๋ฐ ๋ณต๊ตฌ
  • ํƒ€์ž„์•„์›ƒ ์„ค์ •
  • ํŠธ๋žœ์žญ์…˜ ์ƒํƒœ ๊ด€๋ฆฌ
  • ์Šค๋ ˆ๋“œ ์•ˆ์ „ ๊ตฌํ˜„


5๏ธโƒฃ ๋ฐ์ดํ„ฐ ๋งˆ์ด๊ทธ๋ ˆ์ด์…˜

๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์Šคํ‚ค๋งˆ ๋ณ€๊ฒฝ์„ ๊ด€๋ฆฌํ•˜๋Š” ๋งˆ์ด๊ทธ๋ ˆ์ด์…˜ ์‹œ์Šคํ…œ์ด๋‹ค.

class DatabaseMigration:
    def __init__(self, connection):
        self.conn = connection
        self.cursor = connection.cursor(dictionary=True)
    
    def create_migrations_table(self):
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS migrations (
                id INT AUTO_INCREMENT PRIMARY KEY,
                migration_name VARCHAR(255) NOT NULL UNIQUE,
                batch INT NOT NULL,
                executed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        self.conn.commit()
    
    def get_executed_migrations(self):
        self.cursor.execute("SELECT migration_name FROM migrations")
        return [row['migration_name'] for row in self.cursor.fetchall()]
    
    def record_migration(self, migration_name, batch):
        self.cursor.execute(
            "INSERT INTO migrations (migration_name, batch) VALUES (%s, %s)",
            (migration_name, batch)
        )
        self.conn.commit()
    
    def migrate(self, migrations_directory):
        self.create_migrations_table()
        executed = self.get_executed_migrations()
        
        # ๋งˆ์ด๊ทธ๋ ˆ์ด์…˜ ํŒŒ์ผ ์ฐพ๊ธฐ
        import os
        import importlib.util
        import re
        
        migration_files = [f for f in os.listdir(migrations_directory) 
                          if f.endswith('.py') and re.match(r'\d+_\w+\.py', f)]
        migration_files.sort()  # ๋‚ ์งœ์ˆœ/์ด๋ฆ„์ˆœ ์ •๋ ฌ
        
        # ๊ฐ€์žฅ ์ตœ๊ทผ ๋ฑƒ์น˜ ๋ฒˆํ˜ธ ์ฐพ๊ธฐ
        self.cursor.execute("SELECT MAX(batch) as max_batch FROM migrations")
        result = self.cursor.fetchone()
        current_batch = (result['max_batch'] or 0) + 1
        
        executed_count = 0
        for filename in migration_files:
            migration_name = filename[:-3]  # .py ์ œ๊ฑฐ
            
            if migration_name in executed:
                continue  # ์ด๋ฏธ ์‹คํ–‰๋œ ๋งˆ์ด๊ทธ๋ ˆ์ด์…˜ ๊ฑด๋„ˆ๋›ฐ๊ธฐ
            
            # ๋งˆ์ด๊ทธ๋ ˆ์ด์…˜ ๋ชจ๋“ˆ ๋™์  ๋กœ๋“œ ๋ฐ ์‹คํ–‰
            file_path = os.path.join(migrations_directory, filename)
            spec = importlib.util.spec_from_file_location(migration_name, file_path)
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            
            print(f"๋งˆ์ด๊ทธ๋ ˆ์ด์…˜ ์‹คํ–‰: {migration_name}")
            
            # ๋งˆ์ด๊ทธ๋ ˆ์ด์…˜ ์‹คํ–‰
            try:
                if hasattr(module, 'up'):
                    module.up(self.cursor)
                    self.conn.commit()
                    self.record_migration(migration_name, current_batch)
                    executed_count += 1
                else:
                    print(f"๊ฒฝ๊ณ : {migration_name}์— up() ํ•จ์ˆ˜๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค.")
            except Exception as e:
                self.conn.rollback()
                print(f"๋งˆ์ด๊ทธ๋ ˆ์ด์…˜ ์˜ค๋ฅ˜ {migration_name}: {e}")
                break
        
        return executed_count
    
    def rollback(self, steps=1):
        self.cursor.execute(
            "SELECT migration_name FROM migrations WHERE batch = "
            "(SELECT MAX(batch) FROM migrations) ORDER BY id DESC"
        )
        
        last_batch_migrations = self.cursor.fetchall()
        if not last_batch_migrations:
            print("๋กค๋ฐฑํ•  ๋งˆ์ด๊ทธ๋ ˆ์ด์…˜์ด ์—†์Šต๋‹ˆ๋‹ค.")
            return 0
        
        rollback_count = 0
        for row in last_batch_migrations:
            migration_name = row['migration_name']
            
            # ๋งˆ์ด๊ทธ๋ ˆ์ด์…˜ ๋ชจ๋“ˆ ๋กœ๋“œ
            import os
            import importlib.util
            
            file_path = os.path.join('migrations', f"{migration_name}.py")
            if not os.path.exists(file_path):
                print(f"๊ฒฝ๊ณ : {file_path}๋ฅผ ์ฐพ์„ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค.")
                continue
            
            spec = importlib.util.spec_from_file_location(migration_name, file_path)
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            
            print(f"๋งˆ์ด๊ทธ๋ ˆ์ด์…˜ ๋กค๋ฐฑ: {migration_name}")
            
            # ๋กค๋ฐฑ ์‹คํ–‰
            try:
                if hasattr(module, 'down'):
                    module.down(self.cursor)
                    self.conn.commit()
                    
                    # ๋งˆ์ด๊ทธ๋ ˆ์ด์…˜ ๊ธฐ๋ก ์‚ญ์ œ
                    self.cursor.execute(
                        "DELETE FROM migrations WHERE migration_name = %s",
                        (migration_name,)
                    )
                    self.conn.commit()
                    
                    rollback_count += 1
                else:
                    print(f"๊ฒฝ๊ณ : {migration_name}์— down() ํ•จ์ˆ˜๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค.")
            except Exception as e:
                self.conn.rollback()
                print(f"๋กค๋ฐฑ ์˜ค๋ฅ˜ {migration_name}: {e}")
                break
            
            # ์ง€์ •๋œ ๋‹จ๊ณ„๋งŒํผ ๋กค๋ฐฑ
            if rollback_count >= steps:
                break
        
        return rollback_count

# ๋งˆ์ด๊ทธ๋ ˆ์ด์…˜ ํŒŒ์ผ ์˜ˆ์‹œ (migrations/20210101_create_users_table.py)
"""
def up(cursor):
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INT AUTO_INCREMENT PRIMARY KEY,
            name VARCHAR(100) NOT NULL,
            email VARCHAR(100) UNIQUE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')

def down(cursor):
    cursor.execute('DROP TABLE IF EXISTS users')
"""

โœ… ํŠน์ง•:

  • ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์Šคํ‚ค๋งˆ ๋ฒ„์ „ ๊ด€๋ฆฌ
  • ์ ์ง„์  ์Šคํ‚ค๋งˆ ๋ณ€๊ฒฝ ์ถ”์ 
  • ๋กค๋ฐฑ ๊ธฐ๋Šฅ ์ œ๊ณต
  • ์‹คํ–‰๋œ ๋งˆ์ด๊ทธ๋ ˆ์ด์…˜ ๊ธฐ๋ก
  • ํŒŒ์ผ ๊ธฐ๋ฐ˜ ๋งˆ์ด๊ทธ๋ ˆ์ด์…˜
  • ๋ฐฐ์น˜ ์ฒ˜๋ฆฌ ๋ฐฉ์‹
  • ๋™์  ๋ชจ๋“ˆ ๋กœ๋”ฉ
  • ์—๋Ÿฌ ์ฒ˜๋ฆฌ ๋ฐ ํŠธ๋žœ์žญ์…˜ ์•ˆ์ „์„ฑ


6๏ธโƒฃ ๋ณด์•ˆ ๋ฐ ์ตœ์ ํ™”

๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์—ฐ๊ฒฐ๊ณผ ์ฟผ๋ฆฌ์˜ ๋ณด์•ˆ ๋ฐ ์„ฑ๋Šฅ์„ ์ตœ์ ํ™”ํ•˜๋Š” ๊ธฐ๋ฒ•์ด๋‹ค.

# ๋ณด์•ˆ - SQL ์ธ์ ์…˜ ๋ฐฉ์ง€
def safe_query_example():
    # ์•ˆ์ „ํ•˜์ง€ ์•Š์€ ๋ฐฉ์‹ (์ ˆ๋Œ€ ์‚ฌ์šฉํ•˜์ง€ ๋ง ๊ฒƒ)
    def unsafe_search(username):
        # ์œ„ํ—˜: ์‚ฌ์šฉ์ž ์ž…๋ ฅ์„ ์ง์ ‘ ์ฟผ๋ฆฌ์— ์‚ฝ์ž…
        cursor.execute(f"SELECT * FROM users WHERE username = '{username}'")
        return cursor.fetchall()
    
    # ์•ˆ์ „ํ•œ ๋ฐฉ์‹ - ํŒŒ๋ผ๋ฏธํ„ฐํ™”๋œ ์ฟผ๋ฆฌ
    def safe_search(username):
        cursor.execute("SELECT * FROM users WHERE username = %s", (username,))
        return cursor.fetchall()
    
    # ๋™์  ์ฟผ๋ฆฌ๋ฅผ ์•ˆ์ „ํ•˜๊ฒŒ ์ƒ์„ฑ
    def dynamic_safe_search(criteria):
        query = "SELECT * FROM users WHERE 1=1"
        params = []
        
        if 'username' in criteria:
            query += " AND username = %s"
            params.append(criteria['username'])
        
        if 'status' in criteria:
            query += " AND status = %s"
            params.append(criteria['status'])
        
        if 'created_after' in criteria:
            query += " AND created_at > %s"
            params.append(criteria['created_after'])
        
        cursor.execute(query, params)
        return cursor.fetchall()

# ์ตœ์ ํ™” - ์ฟผ๋ฆฌ ์‹คํ–‰ ๊ณ„ํš ๋ถ„์„
def explain_query(query, params=None):
    cursor.execute(f"EXPLAIN {query}", params or ())
    return cursor.fetchall()

# ์ตœ์ ํ™” - ์ธ๋ฑ์Šค ๊ด€๋ฆฌ
def optimize_indexes():
    # ์ธ๋ฑ์Šค ์ถ”๊ฐ€
    def add_index(table, columns, index_name=None, unique=False):
        index_type = "UNIQUE" if unique else ""
        name = index_name or f"idx_{'_'.join(columns)}"
        
        columns_str = ', '.join(columns)
        try:
            cursor.execute(f"CREATE {index_type} INDEX {name} ON {table}({columns_str})")
            conn.commit()
            return True
        except mysql.connector.Error as err:
            print(f"์ธ๋ฑ์Šค ์ƒ์„ฑ ์˜ค๋ฅ˜: {err}")
            return False
    
    # ๋ถˆํ•„์š”ํ•œ ์ธ๋ฑ์Šค ์ฐพ๊ธฐ
    def find_unused_indexes():
        cursor.execute('''
            SELECT
                t.TABLE_SCHEMA,
                t.TABLE_NAME,
                s.INDEX_NAME,
                s.COLUMN_NAME
            FROM information_schema.STATISTICS s
            JOIN information_schema.TABLES t
                ON s.TABLE_SCHEMA = t.TABLE_SCHEMA
                AND s.TABLE_NAME = t.TABLE_NAME
            LEFT JOIN performance_schema.table_io_waits_summary_by_index_usage i
                ON i.OBJECT_SCHEMA = t.TABLE_SCHEMA
                AND i.OBJECT_NAME = t.TABLE_NAME
                AND i.INDEX_NAME = s.INDEX_NAME
            WHERE t.TABLE_SCHEMA = DATABASE()
                AND i.COUNT_STAR IS NULL
                OR i.COUNT_STAR = 0
            ORDER BY t.TABLE_NAME, s.INDEX_NAME
        ''')
        
        return cursor.fetchall()
    
    # ํ…Œ์ด๋ธ” ์ตœ์ ํ™”
    def optimize_table(table):
        cursor.execute(f"OPTIMIZE TABLE {table}")
        return cursor.fetchall()

# ์„ฑ๋Šฅ - ๋А๋ฆฐ ์ฟผ๋ฆฌ ๋ถ„์„
def analyze_slow_queries():
    # ๋А๋ฆฐ ์ฟผ๋ฆฌ ๋กœ๊ทธ ํ™œ์„ฑํ™”
    cursor.execute("SET GLOBAL slow_query_log = 'ON'")
    cursor.execute("SET GLOBAL long_query_time = 1")  # 1์ดˆ ์ด์ƒ ์ฟผ๋ฆฌ ๊ธฐ๋ก
    conn.commit()
    
    # ๋А๋ฆฐ ์ฟผ๋ฆฌ ์กฐํšŒ
    cursor.execute('''
        SELECT
            start_time,
            query_time,
            sql_text
        FROM mysql.slow_log
        ORDER BY start_time DESC
        LIMIT 10
    ''')
    
    return cursor.fetchall()

# ์„ฑ๋Šฅ - ์บ์‹ฑ ๋ ˆ์ด์–ด ๊ตฌํ˜„
import functools
import hashlib
import pickle
import time

class QueryCache:
    def __init__(self, max_size=100, default_ttl=300):
        self.cache = {}  # {key: (value, timestamp, ttl)}
        self.max_size = max_size
        self.default_ttl = default_ttl
    
    def _generate_key(self, query, params):
        key_data = f"{query}:{pickle.dumps(params) if params else ''}".encode('utf-8')
        return hashlib.md5(key_data).hexdigest()
    
    def get(self, key):
        if key not in self.cache:
            return None
        
        value, timestamp, ttl = self.cache[key]
        if time.time() - timestamp > ttl:
            del self.cache[key]
            return None
        
        return value
    
    def set(self, key, value, ttl=None):
        ttl = ttl or self.default_ttl
        
        # ์บ์‹œ ํฌ๊ธฐ ๊ด€๋ฆฌ
        if len(self.cache) >= self.max_size:
            # ๊ฐ€์žฅ ์˜ค๋ž˜๋œ ํ•ญ๋ชฉ ์ œ๊ฑฐ
            oldest_key = min(self.cache, key=lambda k: self.cache[k][1])
            del self.cache[oldest_key]
        
        self.cache[key] = (value, time.time(), ttl)
    
    def clear(self):
        self.cache.clear()

# ์บ์‹œ ์‚ฌ์šฉ ์˜ˆ์‹œ
def cached_database_access():
    cache = QueryCache()
    
    def get_user_cached(user_id, ttl=60):
        query = "SELECT * FROM users WHERE id = %s"
        params = (user_id,)
        
        cache_key = cache._generate_key(query, params)
        cached_result = cache.get(cache_key)
        
        if cached_result:
            print("์บ์‹œ ํžˆํŠธ!")
            return cached_result
        
        print("์บ์‹œ ๋ฏธ์Šค, DB ์กฐํšŒ ์ค‘...")
        cursor.execute(query, params)
        result = cursor.fetchone()
        
        if result:
            cache.set(cache_key, result, ttl)
        
        return result
    
    # ์—ฌ๋Ÿฌ ๋ฒˆ ๊ฐ™์€ ์‚ฌ์šฉ์ž ์กฐํšŒ
    user = get_user_cached(1)  # ์บ์‹œ ๋ฏธ์Šค
    user = get_user_cached(1)  # ์บ์‹œ ํžˆํŠธ

โœ… ํŠน์ง•:

  • SQL ์ธ์ ์…˜ ๋ฐฉ์ง€
  • ํŒŒ๋ผ๋ฏธํ„ฐํ™”๋œ ์ฟผ๋ฆฌ ์‚ฌ์šฉ
  • ๋™์  ์ฟผ๋ฆฌ ์•ˆ์ „ ๊ตฌํ˜„
  • ์ฟผ๋ฆฌ ์‹คํ–‰ ๊ณ„ํš ๋ถ„์„
  • ํšจ์œจ์ ์ธ ์ธ๋ฑ์Šค ๊ด€๋ฆฌ
  • ๋А๋ฆฐ ์ฟผ๋ฆฌ ์‹๋ณ„
  • ๋ฉ”๋ชจ๋ฆฌ ์บ์‹ฑ ๊ตฌํ˜„
  • ๋ฆฌ์†Œ์Šค ์‚ฌ์šฉ ์ตœ์ ํ™”


7๏ธโƒฃ ORM ํ†ตํ•ฉ

MySQL๊ณผ ๊ฐ์ฒด ๊ด€๊ณ„ ๋งคํ•‘(ORM) ํ”„๋ ˆ์ž„์›Œํฌ๋ฅผ ํ†ตํ•ฉํ•˜๋Š” ๋ฐฉ๋ฒ•์ด๋‹ค.

# SQLAlchemy ORM ์˜ˆ์‹œ
from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey, Text, func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
import datetime

# ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์—ฐ๊ฒฐ
connection_string = f"mysql+mysqlconnector://{os.getenv('MYSQL_USER')}:{os.getenv('MYSQL_PASSWORD')}@{os.getenv('MYSQL_HOST')}/{os.getenv('MYSQL_DATABASE')}"
engine = create_engine(connection_string, echo=False)

# ๋ชจ๋ธ ์ •์˜
Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(100), nullable=False)
    email = Column(String(100), unique=True)
    created_at = Column(DateTime, default=datetime.datetime.utcnow)
    
    # ๊ด€๊ณ„ ์ •์˜
    posts = relationship("Post", back_populates="author", cascade="all, delete-orphan")
    
    def __repr__(self):
        return f"<User(name='{self.name}', email='{self.email}')>"

class Post(Base):
    __tablename__ = 'posts'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    user_id = Column(Integer, ForeignKey('users.id', ondelete='CASCADE'), nullable=False)
    title = Column(String(200), nullable=False)
    content = Column(Text)
    created_at = Column(DateTime, default=datetime.datetime.utcnow)
    
    # ๊ด€๊ณ„ ์ •์˜
    author = relationship("User", back_populates="posts")
    
    def __repr__(self):
        return f"<Post(title='{self.title}')>"

# ํ…Œ์ด๋ธ” ์ƒ์„ฑ
Base.metadata.create_all(engine)

# ์„ธ์…˜ ์ƒ์„ฑ
Session = sessionmaker(bind=engine)

def orm_example():
    session = Session()
    
    try:
        # ์‚ฌ์šฉ์ž ์ถ”๊ฐ€
        new_user = User(name='๊น€์ฒ ์ˆ˜', email='[email protected]')
        session.add(new_user)
        session.commit()
        
        # ์‚ฌ์šฉ์ž์™€ ์—ฐ๊ฒฐ๋œ ๊ฒŒ์‹œ๋ฌผ ์ถ”๊ฐ€
        new_post = Post(
            title='ORM ์‚ฌ์šฉํ•˜๊ธฐ',
            content='SQLAlchemy๋Š” ๊ฐ•๋ ฅํ•œ Python ORM ํ”„๋ ˆ์ž„์›Œํฌ์ž…๋‹ˆ๋‹ค.',
            author=new_user
        )
        session.add(new_post)
        session.commit()
        
        # ์กฐํšŒ - ๋‹จ์ผ ๋ ˆ์ฝ”๋“œ
        user = session.query(User).filter_by(email='[email protected]').first()
        print(f"์‚ฌ์šฉ์ž: {user.name}, ์ด๋ฉ”์ผ: {user.email}")
        
        # ์กฐํšŒ - JOIN
        results = session.query(User, Post).join(Post).filter(User.id == new_user.id).all()
        for user, post in results:
            print(f"์‚ฌ์šฉ์ž: {user.name}, ๊ฒŒ์‹œ๋ฌผ: {post.title}")
        
        # ์กฐํšŒ - ์ง‘๊ณ„
        post_count = session.query(func.count(Post.id)).scalar()
        print(f"์ด ๊ฒŒ์‹œ๋ฌผ ์ˆ˜: {post_count}")
        
        # ํ•„ํ„ฐ๋ง ๋ฐ ์ •๋ ฌ
        recent_posts = session.query(Post).order_by(Post.created_at.desc()).limit(5).all()
        for post in recent_posts:
            print(f"์ตœ๊ทผ ๊ฒŒ์‹œ๋ฌผ: {post.title}")
        
        # ์ˆ˜์ •
        user.name = '๊น€์ฒ ์ˆ˜ (์ˆ˜์ •๋จ)'
        session.commit()
        
        # ์‚ญ์ œ
        session.delete(new_post)
        session.commit()
        
    except Exception as e:
        session.rollback()
        print(f"ORM ์ž‘์—… ์˜ค๋ฅ˜: {e}")
    finally:
        session.close()

โœ… ํŠน์ง•:

  • ๊ฐ์ฒด-๊ด€๊ณ„ ๋งคํ•‘(ORM)
  • SQL ์ถ”์ƒํ™”
  • ๋ชจ๋ธ ๊ธฐ๋ฐ˜ ๋ฐ์ดํ„ฐ ์ •์˜
  • ๊ด€๊ณ„ ์ž๋™ ๊ด€๋ฆฌ
  • ๋ณต์žกํ•œ ์ฟผ๋ฆฌ ๋‹จ์ˆœํ™”
  • ์„ธ์…˜ ๊ธฐ๋ฐ˜ ํŠธ๋žœ์žญ์…˜
  • ๋งˆ์ด๊ทธ๋ ˆ์ด์…˜ ์ž๋™ํ™”
  • ์œ ์ง€๋ณด์ˆ˜์„ฑ ํ–ฅ์ƒ


์ฃผ์š” ํŒ

โœ… ๋ชจ๋ฒ” ์‚ฌ๋ก€:

  • ์ปค๋„ฅ์…˜ ํ’€๋ง ์‚ฌ์šฉ
    • ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ์‹œ์ž‘ ์‹œ ํ’€ ์ดˆ๊ธฐํ™”
    • ์—ฐ๊ฒฐ ์ˆ˜๋ฅผ ์„œ๋ฒ„ ๋ฆฌ์†Œ์Šค์— ๋งž๊ฒŒ ์กฐ์ •
    • ๊ธด ํŠธ๋žœ์žญ์…˜์€ ๋ณ„๋„ ์—ฐ๊ฒฐ ์‚ฌ์šฉ ๊ณ ๋ ค
  • ํŒŒ๋ผ๋ฏธํ„ฐํ™”๋œ ์ฟผ๋ฆฌ ์‚ฌ์šฉ
    • SQL ์ธ์ ์…˜ ๋ฐฉ์ง€๋ฅผ ์œ„ํ•ด ํ•ญ์ƒ ์ ์šฉ
    • ๋™์  ์ฟผ๋ฆฌ ์ƒ์„ฑ ์‹œ ํŠนํžˆ ์ฃผ์˜
    • ORM ์‚ฌ์šฉ ์‹œ์—๋„ ์›์น™ ์œ ์ง€
  • ํŠธ๋žœ์žญ์…˜ ์ ์ ˆํžˆ ํ™œ์šฉ
    • ์›์ž์  ์ž‘์—…์€ ํŠธ๋žœ์žญ์…˜์œผ๋กœ ๋ฌถ๊ธฐ
    • ํŠธ๋žœ์žญ์…˜์€ ์ตœ๋Œ€ํ•œ ์งง๊ฒŒ ์œ ์ง€
    • ๊ฐ€๋Šฅํ•œ ๊ฒฝ์šฐ ์ž๋™ ์ปค๋ฐ‹ ํ™œ์„ฑํ™”
  • ์ธ๋ฑ์Šค ์ตœ์ ํ™”
    • ์ž์ฃผ ์‚ฌ์šฉ๋˜๋Š” ์ฟผ๋ฆฌ ํŒจํ„ด์— ๋งž๊ฒŒ ์„ค๊ณ„
    • ๋ณตํ•ฉ ์ธ๋ฑ์Šค ํ™œ์šฉ
    • ๋ถˆํ•„์š”ํ•œ ์ธ๋ฑ์Šค ์ œ๊ฑฐ
    • EXPLAIN ๋ช…๋ น์–ด๋กœ ์ฟผ๋ฆฌ ๊ณ„ํš ๊ฒ€์‚ฌ
  • ์ฃผ๊ธฐ์ ์ธ ๋ฐฑ์—…
    • ์ž๋™ํ™”๋œ ๋ฐฑ์—… ์‹œ์Šคํ…œ ๊ตฌ์ถ•
    • ์ „์ฒด ๋ฐ ์ฆ๋ถ„ ๋ฐฑ์—… ์ „๋žต ์ˆ˜๋ฆฝ
    • ๋ณต๊ตฌ ์ ˆ์ฐจ ์ •๊ธฐ์  ํ…Œ์ŠคํŠธ
  • ๋ณด์•ˆ ์„ค์ • ํ™•์ธ
    • ์ตœ์†Œ ๊ถŒํ•œ ์›์น™ ์ ์šฉ
    • ๋„คํŠธ์›Œํฌ ๋ฐฉํ™”๋ฒฝ ์„ค์ •
    • SSL/TLS ์—ฐ๊ฒฐ ์‚ฌ์šฉ
    • ๋น„๋ฐ€๋ฒˆํ˜ธ ์ •์ฑ… ๊ฐ•ํ™”
  • ์„ฑ๋Šฅ ๋ชจ๋‹ˆํ„ฐ๋ง
    • ๋А๋ฆฐ ์ฟผ๋ฆฌ ๋กœ๊ทธ ํ™œ์„ฑํ™”
    • ๋ฆฌ์†Œ์Šค ์‚ฌ์šฉ๋Ÿ‰ ์ถ”์ 
    • ๋ณ‘๋ชฉ ํ˜„์ƒ ์กฐ๊ธฐ ๋ฐœ๊ฒฌ
    • ์„ฑ๋Šฅ ์ง€ํ‘œ ์‹œ๊ฐํ™”
  • ์—๋Ÿฌ ์ฒ˜๋ฆฌ ๊ตฌํ˜„
    • ๋ชจ๋“  ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์ž‘์—…์— ์˜ˆ์™ธ ์ฒ˜๋ฆฌ
    • ์˜ค๋ฅ˜ ๋กœ๊น… ๋ฐ ์•Œ๋ฆผ ์„ค์ •
    • ์žฌ์‹œ๋„ ๋ฉ”์ปค๋‹ˆ์ฆ˜ ๊ตฌํ˜„
    • ์‚ฌ์šฉ์ž ์นœํ™”์  ์˜ค๋ฅ˜ ๋ฉ”์‹œ์ง€
  • ๊ฐ์ฒด ๊ด€๊ณ„ ๋งคํ•‘(ORM) ๊ณ ๋ ค
    • SQL ์ถ”์ƒํ™”๋กœ ์ƒ์‚ฐ์„ฑ ํ–ฅ์ƒ
    • N+1 ์ฟผ๋ฆฌ ๋ฌธ์ œ ์ธ์‹ ๋ฐ ํ•ด๊ฒฐ
    • ๋Œ€๋Ÿ‰ ์ž‘์—…์€ ๋„ค์ดํ‹ฐ๋ธŒ ์ฟผ๋ฆฌ ์‚ฌ์šฉ
    • ๋ณต์žกํ•œ ์ฟผ๋ฆฌ๋Š” ์ง์ ‘ SQL ์ž‘์„ฑ ๊ณ ๋ ค


โš ๏ธ **GitHub.com Fallback** โš ๏ธ