KR_Sanic - somaz94/python-study GitHub Wiki
Sanic์ Python 3.7+ ๊ธฐ๋ฐ์ ๊ณ ์ฑ๋ฅ ๋น๋๊ธฐ ์น ํ๋ ์์ํฌ๋ก, ๋น ๋ฅธ HTTP ์๋ต ์๋๋ฅผ ์ ๊ณตํ๋ฉฐ uvloop์ httptools๋ฅผ ํ์ฉํ์ฌ Node.js ๋ฐ Go์ ๋น์ทํ ์ฑ๋ฅ์ ๋ชฉํ๋ก ํ๋ค.
from sanic import Sanic
from sanic.response import json, text, html
from sanic.request import Request
from sanic import exceptions
import logging
import asyncio
from typing import Dict, Any, List, Optional, Union
# ์ ํ๋ฆฌ์ผ์ด์
์์ฑ
app = Sanic("MyApplication")
# ๊ธฐ๋ณธ ์ค์
app.config.FALLBACK_ERROR_FORMAT = "json" # ์ค๋ฅ ์๋ต ํ์
app.config.ACCESS_LOG = True # ์ ๊ทผ ๋ก๊ทธ ํ์ฑํ
app.config.FORWARDED_SECRET = "YOUR_SECRET_KEY" # ํ๋ก์ ๋ณด์
# ๋ก๊ฑฐ ์ค์
logger = logging.getLogger('sanic')
# ๊ธฐ๋ณธ ๋ผ์ฐํธ ํธ๋ค๋ฌ
@app.route("/", methods=["GET"])
async def index(request: Request) -> json:
"""
๊ธฐ๋ณธ ์ธ๋ฑ์ค ํ์ด์ง ํธ๋ค๋ฌ
Args:
request: Sanic ์์ฒญ ๊ฐ์ฒด
Returns:
JSON ์๋ต
"""
return json({
"message": "Welcome to Sanic API",
"status": "success",
"version": "1.0.0"
})
# ๊ฒฝ๋ก ํ๋ผ๋ฏธํฐ๋ฅผ ์ฌ์ฉํ ๋ผ์ฐํธ
@app.route("/users/<user_id:int>", methods=["GET"])
async def get_user(request: Request, user_id: int) -> json:
"""
์ฌ์ฉ์ ์ ๋ณด ์กฐํ ํธ๋ค๋ฌ
Args:
request: Sanic ์์ฒญ ๊ฐ์ฒด
user_id: ์ฌ์ฉ์ ID
Returns:
์ฌ์ฉ์ ์ ๋ณด JSON ์๋ต
"""
# ๋ฐ์ดํฐ๋ฒ ์ด์ค ์กฐํ ๋ก์ง (์์)
await asyncio.sleep(0.01) # ๋น๋๊ธฐ ์์
์๋ฎฌ๋ ์ด์
# ์ฌ์ฉ์๋ฅผ ์ฐพ์ง ๋ชปํ ๊ฒฝ์ฐ 404 ์ค๋ฅ ๋ฐ์
if user_id > 1000:
raise exceptions.NotFound(f"์ฌ์ฉ์ ID {user_id}๋ฅผ ์ฐพ์ ์ ์์ต๋๋ค")
return json({
"user_id": user_id,
"name": f"User {user_id}",
"email": f"user{user_id}@example.com",
"created_at": "2023-01-01T00:00:00Z"
})
# POST ์์ฒญ ์ฒ๋ฆฌ
@app.post("/users")
async def create_user(request: Request) -> json:
"""
์ ์ฌ์ฉ์ ์์ฑ ํธ๋ค๋ฌ
Args:
request: Sanic ์์ฒญ ๊ฐ์ฒด
Returns:
์์ฑ๋ ์ฌ์ฉ์ ์ ๋ณด JSON ์๋ต
"""
try:
# ์์ฒญ ๋ณธ๋ฌธ์์ JSON ๋ฐ์ดํฐ ํ์ฑ
data = request.json
# ํ์ ํ๋ ๊ฒ์ฆ
if not data or 'name' not in data or 'email' not in data:
raise exceptions.BadRequest("์ด๋ฆ๊ณผ ์ด๋ฉ์ผ์ด ํ์ํฉ๋๋ค")
# ์ฌ์ฉ์ ์์ฑ ๋ก์ง (์์)
await asyncio.sleep(0.05) # ๋น๋๊ธฐ ์์
์๋ฎฌ๋ ์ด์
# ์์ฑ๋ ์ฌ์ฉ์ ์ ๋ณด ๋ฐํ
new_user = {
"user_id": 1001, # ์์ ID
"name": data['name'],
"email": data['email'],
"created_at": "2023-01-01T00:00:00Z"
}
logger.info(f"์ ์ฌ์ฉ์ ์์ฑ: {new_user['user_id']}")
return json({"status": "created", "user": new_user}, status=201)
except Exception as e:
logger.error(f"์ฌ์ฉ์ ์์ฑ ์ค๋ฅ: {str(e)}")
return json({"error": str(e)}, status=500)
# ์ค๋ฅ ํธ๋ค๋ฌ
@app.exception(exceptions.NotFound)
async def not_found(request: Request, exception: exceptions.NotFound) -> json:
"""
404 ์ค๋ฅ ํธ๋ค๋ฌ
Args:
request: Sanic ์์ฒญ ๊ฐ์ฒด
exception: ๋ฐ์ํ ์์ธ
Returns:
์ค๋ฅ ๋ฉ์์ง JSON ์๋ต
"""
return json({
"error": "not_found",
"message": str(exception)
}, status=404)
# ๋ค์ํ ์๋ต ํ์
@app.route("/responses")
async def response_types(request: Request):
"""๋ค์ํ ์๋ต ํ์
์์ """
response_type = request.args.get('type', 'json')
if response_type == 'text':
return text("Plain text response")
elif response_type == 'html':
return html("<h1>HTML Response</h1>")
else:
return json({"type": "json", "message": "JSON Response"})
# ์ ํ๋ฆฌ์ผ์ด์
์คํ
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8000, debug=True, auto_reload=True)
โ
ํน์ง:
- ๋น๋๊ธฐ ์ฒ๋ฆฌ๋ฅผ ํตํ ๊ณ ์ฑ๋ฅ HTTP ์๋น์ค
- ํ์ ํํ ์ ํตํ ์ฝ๋ ๊ฐ๋ ์ฑ ๋ฐ IDE ์ง์ ํฅ์
- ๋ค์ํ ๊ฒฝ๋ก ํ๋ผ๋ฏธํฐ ํ์ ์ง์(int, string, uuid, float, path)
- ๋ค์ํ ์๋ต ํ์(JSON, HTML, ํ ์คํธ, ํ์ผ ๋ฑ)
- ๋ด์ฅ๋ ์์ธ ์ฒ๋ฆฌ ๋ฉ์ปค๋์ฆ
- ์๋ ๋ฆฌ๋ก๋ ๊ธฐ๋ฅ์ ํตํ ๊ฐ๋ฐ ์์ฐ์ฑ ํฅ์
- HTTP/2 ๋ฐ ์น์์ผ ์ง์
- ๋ฏธ๋ค์จ์ด ๋ฐ ๋ฆฌ์ค๋๋ฅผ ํตํ ํ์ฅ์ฑ
- uvloop ํ์ฉ์ผ๋ก ํ์ค asyncio๋ณด๋ค 2๋ฐฐ ์ด์ ๋น ๋ฅธ ์ฑ๋ฅ
- ๊ตฌ์กฐํ๋ ๋ก๊น ์์คํ
Sanic์ ์์ฒญ ์ฒ๋ฆฌ ์ ํ์ ์ฝ๋๋ฅผ ์คํํ ์ ์๋ ๋ฏธ๋ค์จ์ด์ ์๋ฒ ์๋ช
์ฃผ๊ธฐ ์ด๋ฒคํธ๋ฅผ ์ฒ๋ฆฌํ๋ ๋ฆฌ์ค๋๋ฅผ ์ ๊ณตํ์ฌ ์ ํ๋ฆฌ์ผ์ด์
์ ํ์ฅ์ฑ๊ณผ ๊ธฐ๋ฅ์ฑ์ ๋์ธ๋ค.
from sanic import Sanic, Request, response
from sanic.response import json
import uuid
import time
import logging
import asyncio
import aioredis
from datetime import datetime
from typing import Dict, Any, Optional, Callable
app = Sanic("MiddlewareApp")
logger = logging.getLogger('sanic.middleware')
# ์์ฒญ ๋ฏธ๋ค์จ์ด ์ ์
@app.middleware('request')
async def add_request_id(request: Request):
"""
๊ฐ ์์ฒญ์ ๊ณ ์ ID ์ถ๊ฐ
Args:
request: Sanic ์์ฒญ ๊ฐ์ฒด
"""
request_id = str(uuid.uuid4())
request.ctx.request_id = request_id
logger.info(f"Request started: {request_id}")
# ์์ฒญ ์์ ์๊ฐ ๊ธฐ๋ก
request.ctx.start_time = time.time()
# ๋ค๋ฅธ ์์ฒญ ๋ฏธ๋ค์จ์ด - ์ธ์ฆ ๊ฒ์ฌ
@app.middleware('request')
async def check_auth(request: Request):
"""
์ธ์ฆ ํค๋ ๊ฒ์ฌ ๋ฏธ๋ค์จ์ด
Args:
request: Sanic ์์ฒญ ๊ฐ์ฒด
"""
# ์ธ์ฆ์ด ํ์ ์๋ ๊ฒฝ๋ก๋ ๊ฑด๋๋
if request.path in ['/login', '/register', '/health']:
return
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
# ์ด ๋ผ์ธ์ ๋ฏธ๋ค์จ์ด์์ ์๋ต์ ๋ฐํํ๋ฏ๋ก ์์ฒญ ์ฒ๋ฆฌ๊ฐ ์ค๋จ๋จ
return json({'error': 'unauthorized', 'message': '์ธ์ฆ์ด ํ์ํฉ๋๋ค'}, status=401)
# ์ค์ ๊ตฌํ์์๋ ํ ํฐ ๊ฒ์ฆ ๋ก์ง ์ถ๊ฐ
token = auth_header.split(' ')[1]
# ์: validate_token(token)
# ์ธ์ฆ๋ ์ฌ์ฉ์ ์ ๋ณด๋ฅผ ์์ฒญ ์ปจํ
์คํธ์ ์ ์ฅ
request.ctx.user = {'id': 1, 'role': 'user'} # ์์ ๋ฐ์ดํฐ
# ์๋ต ๋ฏธ๋ค์จ์ด ์ ์
@app.middleware('response')
async def add_response_headers(request: Request, response):
"""
์๋ต ํค๋ ์ถ๊ฐ ๋ฏธ๋ค์จ์ด
Args:
request: Sanic ์์ฒญ ๊ฐ์ฒด
response: Sanic ์๋ต ๊ฐ์ฒด
"""
# ์์ฒญ ID ํค๋ ์ถ๊ฐ
if hasattr(request.ctx, 'request_id'):
response.headers['X-Request-ID'] = request.ctx.request_id
# ์๋ต ์๊ฐ ๊ธฐ๋ก ๋ฐ ํค๋ ์ถ๊ฐ
if hasattr(request.ctx, 'start_time'):
duration = time.time() - request.ctx.start_time
response.headers['X-Response-Time'] = f"{duration:.6f}s"
logger.info(f"Request {request.ctx.request_id} completed in {duration:.6f}s")
# ๋ณด์ ํค๋ ์ถ๊ฐ
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'DENY'
response.headers['X-XSS-Protection'] = '1; mode=block'
# ์ปค์คํ
๋ฏธ๋ค์จ์ด ํจ์ ์์ฑ
def rate_limit_middleware(limit: int = 10, window: int = 60):
"""
์๋ ์ ํ ๋ฏธ๋ค์จ์ด ํฉํ ๋ฆฌ
Args:
limit: ์๊ฐ ์ฐฝ ๋ด ์ต๋ ์์ฒญ ์
window: ์๊ฐ ์ฐฝ (์ด)
Returns:
๋ฏธ๋ค์จ์ด ํจ์
"""
request_counts = {}
@app.middleware('request')
async def rate_limit(request: Request):
# ํด๋ผ์ด์ธํธ ์๋ณ
client_ip = request.client_ip
current_time = int(time.time())
# ํ์ฌ ํด๋ผ์ด์ธํธ์ ์์ฒญ ๊ธฐ๋ก ๊ฐ์ ธ์ค๊ธฐ
if client_ip not in request_counts:
request_counts[client_ip] = []
# ์๊ฐ ์ฐฝ ์ธ์ ์์ฒญ ์ ๊ฑฐ
request_counts[client_ip] = [timestamp for timestamp in request_counts[client_ip]
if timestamp > current_time - window]
# ์์ฒญ ์ ํ์ธ
if len(request_counts[client_ip]) >= limit:
return json({
'error': 'rate_limit_exceeded',
'message': f'์์ฒญ ํ๋ ์ด๊ณผ, {window}์ด ํ์ ๋ค์ ์๋ํ์ธ์'
}, status=429)
# ํ์ฌ ์์ฒญ ์๊ฐ ๊ธฐ๋ก
request_counts[client_ip].append(current_time)
return rate_limit
# ์๋ฒ ์์ ์ ๋ฆฌ์ค๋
@app.listener('before_server_start')
async def setup_db(app: Sanic, loop):
"""
์๋ฒ ์์ ์ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ ์ค์
Args:
app: Sanic ์ฑ ์ธ์คํด์ค
loop: ์ด๋ฒคํธ ๋ฃจํ
"""
logger.info("๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ ์ค์ ์ค...")
# ์์: PostgreSQL ์ฐ๊ฒฐ ์ค์
# import asyncpg
# app.ctx.db_pool = await asyncpg.create_pool(
# user='user', password='password',
# database='database', host='localhost'
# )
# ์์: Redis ์ฐ๊ฒฐ ์ค์
try:
app.ctx.redis = await aioredis.from_url(
"redis://localhost", encoding="utf-8", decode_responses=True
)
logger.info("Redis ์ฐ๊ฒฐ ์ฑ๊ณต")
except Exception as e:
logger.error(f"Redis ์ฐ๊ฒฐ ์คํจ: {e}")
logger.info("์๋ฒ ์์ ์ค๋น ์๋ฃ")
# ์๋ฒ ์์ ํ ๋ฆฌ์ค๋
@app.listener('after_server_start')
async def notify_server_started(app: Sanic, loop):
"""
์๋ฒ ์์ ์๋ฆผ
Args:
app: Sanic ์ฑ ์ธ์คํด์ค
loop: ์ด๋ฒคํธ ๋ฃจํ
"""
logger.info(f"์๋ฒ๊ฐ ์์๋จ! http://{app.config.HOST}:{app.config.PORT}")
# ํ์ํ ๊ฒฝ์ฐ ์๋ฒ ์์ ์๋ฆผ ๋๋ ์ด๊ธฐ ์์
์ํ
# ์: ์บ์ ์๋ฐ์
, ์ด๊ธฐ ๋ฐ์ดํฐ ๋ก๋ ๋ฑ
async def warmup_cache():
logger.info("์บ์ ์๋ฐ์
์ค...")
await asyncio.sleep(1)
# ์บ์ ์๋ฐ์
๋ก์ง
logger.info("์บ์ ์๋ฐ์
์๋ฃ")
# ๋ฐฑ๊ทธ๋ผ์ด๋ ์์
์ผ๋ก ์คํ
asyncio.create_task(warmup_cache())
# ์๋ฒ ์ค์ง ์ ๋ฆฌ์ค๋
@app.listener('before_server_stop')
async def notify_server_stopping(app: Sanic, loop):
"""
์๋ฒ ์ค์ง ์๋ฆผ
Args:
app: Sanic ์ฑ ์ธ์คํด์ค
loop: ์ด๋ฒคํธ ๋ฃจํ
"""
logger.info("์๋ฒ ์ค์ง ์ค...")
# ์งํ ์ค์ธ ์์
์๋ฃ ๋๋ ์ ๋ฆฌ
# ์๋ฒ ์ค์ง ํ ๋ฆฌ์ค๋
@app.listener('after_server_stop')
async def close_db(app: Sanic, loop):
"""
์๋ฒ ์ค์ง ํ ๋ฆฌ์์ค ์ ๋ฆฌ
Args:
app: Sanic ์ฑ ์ธ์คํด์ค
loop: ์ด๋ฒคํธ ๋ฃจํ
"""
logger.info("๋ฆฌ์์ค ์ ๋ฆฌ ์ค...")
# Redis ์ฐ๊ฒฐ ์ข
๋ฃ
if hasattr(app.ctx, 'redis'):
await app.ctx.redis.close()
logger.info("Redis ์ฐ๊ฒฐ ์ข
๋ฃ๋จ")
# ๋ฐ์ดํฐ๋ฒ ์ด์ค ํ ์ข
๋ฃ
if hasattr(app.ctx, 'db_pool'):
await app.ctx.db_pool.close()
logger.info("๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ ํ ์ข
๋ฃ๋จ")
logger.info("๋ชจ๋ ๋ฆฌ์์ค๊ฐ ์ ๋ฆฌ๋จ")
# ๋ฆฌ์ค๋๋ฅผ ํจ์๋ก ๋ฑ๋กํ๋ ๋ฐฉ๋ฒ
async def setup_something(app: Sanic, loop):
logger.info("์ถ๊ฐ ์ค์ ์คํ ์ค...")
app.ctx.start_time = datetime.now()
# ๋์ค์ ๋ฆฌ์ค๋ ๋ฑ๋ก
app.register_listener(setup_something, 'before_server_start')
# ํฌ์ค์ฒดํฌ ๊ฒฝ๋ก
@app.route('/health')
async def health_check(request: Request):
"""์๋ฒ ์ํ ํ์ธ ์๋ํฌ์ธํธ"""
uptime = None
if hasattr(app.ctx, 'start_time'):
uptime = (datetime.now() - app.ctx.start_time).total_seconds()
return json({
'status': 'healthy',
'version': '1.0.0',
'uptime_seconds': uptime
})
โ
ํน์ง:
- ์์ฒญ/์๋ต ์ฒ๋ฆฌ๋ฅผ ์ํ ๊ฐ๋ ฅํ ๋ฏธ๋ค์จ์ด ์ฒด์ธ
- ์๋ฒ ์๋ช ์ฃผ๊ธฐ ๊ด๋ฆฌ๋ฅผ ์ํ ๋ฆฌ์ค๋ ์์คํ
- ๋ฏธ๋ค์จ์ด๋ฅผ ํ์ฉํ ๊ณตํต ๊ธฐ๋ฅ ๊ตฌํ (์ธ์ฆ, ๋ก๊น , ์๋ ์ ํ ๋ฑ)
- ์์ฒญ/์๋ต ์ปจํ ์คํธ๋ฅผ ํตํ ๋ฐ์ดํฐ ๊ณต์
- ๋น๋๊ธฐ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ ๊ด๋ฆฌ
- ๋ฐฑ๊ทธ๋ผ์ด๋ ์์ ์คํ ๊ธฐ๋ฅ
- ์ ํ๋ฆฌ์ผ์ด์ ์ํ ๋ฐ ๋ฆฌ์์ค ๊ด๋ฆฌ
- ์์ฒญ๋ณ ๊ณ ์ ID ๋ฐ ์ถ์ ๊ธฐ๋ฅ
- ๋ณด์ ๊ด๋ จ ํค๋ ์๋ ์ถ๊ฐ
- ์์ฒญ ์ฒ๋ฆฌ ์ฑ๋ฅ ์ธก์ ๋ฐ ๋ชจ๋ํฐ๋ง
- ์ฐ์ํ ์ข ๋ฃ ๋ฐ ๋ฆฌ์์ค ์ ๋ฆฌ ์ง์
๋ธ๋ฃจํ๋ฆฐํธ๋ Sanic ์ ํ๋ฆฌ์ผ์ด์
์ ๋ชจ๋ํํ๊ณ ๊ตฌ์กฐํํ๋ ๊ฐ๋ ฅํ ๋ฐฉ๋ฒ์ผ๋ก, ๊ด๋ จ ๊ธฐ๋ฅ์ ๋
ผ๋ฆฌ์ ์ผ๋ก ๊ทธ๋ฃนํํ๊ณ ์ฝ๋ ์ฌ์ฌ์ฉ์ฑ์ ๋์ผ ์ ์๋ค.
from sanic import Sanic, Blueprint, Request, HTTPResponse
from sanic.response import json, text, html, file, redirect
from sanic import exceptions
import logging
from typing import Dict, Any, List, Optional, Tuple
# ๋ฉ์ธ ์ ํ๋ฆฌ์ผ์ด์
์ธ์คํด์ค ์์ฑ
app = Sanic("BlueprintApp")
logger = logging.getLogger('sanic.blueprints')
# ๊ธฐ๋ณธ API ๋ธ๋ฃจํ๋ฆฐํธ ์์ฑ
api_v1 = Blueprint('api_v1', url_prefix='/api/v1')
api_v2 = Blueprint('api_v2', url_prefix='/api/v2')
# ๊ด๋ฆฌ์ ์์ญ ๋ธ๋ฃจํ๋ฆฐํธ ์์ฑ
admin = Blueprint('admin', url_prefix='/admin')
# ์ฌ์ฉ์ ๋ธ๋ฃจํ๋ฆฐํธ (v1)
users_v1 = Blueprint('users_v1', url_prefix='/users')
api_v1.blueprint(users_v1) # api_v1์ ์ฌ์ฉ์ ๋ธ๋ฃจํ๋ฆฐํธ ์ค์ฒฉ
# ์ฌ์ฉ์ ๋ธ๋ฃจํ๋ฆฐํธ (v2)
users_v2 = Blueprint('users_v2', url_prefix='/users')
api_v2.blueprint(users_v2) # api_v2์ ์ฌ์ฉ์ ๋ธ๋ฃจํ๋ฆฐํธ ์ค์ฒฉ
# ์ธ์ฆ ๋ธ๋ฃจํ๋ฆฐํธ
auth = Blueprint('auth', url_prefix='/auth')
#################################################
# API v1 ๋ผ์ฐํธ ์ ์
#################################################
@api_v1.route('/')
async def api_index(request: Request) -> HTTPResponse:
"""API v1 ์ธ๋ฑ์ค ํ์ด์ง"""
return json({
'version': 'v1',
'endpoints': ['/users', '/items', '/auth']
})
@users_v1.route('/')
async def list_users_v1(request: Request) -> HTTPResponse:
"""์ฌ์ฉ์ ๋ชฉ๋ก ์กฐํ (v1)"""
users = [
{'id': 1, 'name': 'User 1'},
{'id': 2, 'name': 'User 2'}
]
return json({'users': users})
@users_v1.route('/<user_id:int>')
async def get_user_v1(request: Request, user_id: int) -> HTTPResponse:
"""๋จ์ผ ์ฌ์ฉ์ ์กฐํ (v1)"""
return json({'id': user_id, 'name': f'User {user_id}'})
@users_v1.post('/')
async def create_user_v1(request: Request) -> HTTPResponse:
"""์ฌ์ฉ์ ์์ฑ (v1)"""
user_data = request.json
# ์ฌ์ฉ์ ์์ฑ ๋ก์ง
return json({'status': 'created', 'user': user_data}, status=201)
# ์์ดํ
๋ผ์ฐํธ - ๋ชจ๋ HTTP ๋ฉ์๋ ์ฒ๋ฆฌ
@api_v1.route('/items/<item_id:int>', methods=['GET', 'PUT', 'DELETE'])
async def handle_item_v1(request: Request, item_id: int) -> HTTPResponse:
"""์์ดํ
์ฒ๋ฆฌ (v1) - ์ฌ๋ฌ HTTP ๋ฉ์๋ ์ฒ๋ฆฌ"""
if request.method == 'GET':
return json({'item_id': item_id, 'name': f'Item {item_id}'})
elif request.method == 'PUT':
item_data = request.json
# ์์ดํ
์
๋ฐ์ดํธ ๋ก์ง
return json({'status': 'updated', 'item': item_data})
elif request.method == 'DELETE':
# ์์ดํ
์ญ์ ๋ก์ง
return json({'status': 'deleted'})
#################################################
# API v2 ๋ผ์ฐํธ ์ ์ (์ ๊ธฐ๋ฅ ์ถ๊ฐ)
#################################################
@api_v2.route('/')
async def api_index_v2(request: Request) -> HTTPResponse:
"""API v2 ์ธ๋ฑ์ค ํ์ด์ง"""
return json({
'version': 'v2',
'endpoints': ['/users', '/items', '/auth', '/stats']
})
@users_v2.route('/')
async def list_users_v2(request: Request) -> HTTPResponse:
"""์ฌ์ฉ์ ๋ชฉ๋ก ์กฐํ (v2) - ํฅ์๋ ๊ธฐ๋ฅ"""
# ํ์ด์ง๋ค์ด์
์ง์
page = int(request.args.get('page', 1))
per_page = int(request.args.get('per_page', 10))
# ์ํ ๋ฐ์ดํฐ
users = [
{'id': i, 'name': f'User {i}', 'email': f'user{i}@example.com'}
for i in range((page-1)*per_page + 1, page*per_page + 1)
]
return json({
'users': users,
'pagination': {
'page': page,
'per_page': per_page,
'total': 100, # ์ํ ์ด ์ฌ์ฉ์ ์
'pages': 10
}
})
@users_v2.route('/<user_id:int>')
async def get_user_v2(request: Request, user_id: int) -> HTTPResponse:
"""๋จ์ผ ์ฌ์ฉ์ ์กฐํ (v2) - ํฅ์๋ ์ ๋ณด"""
# ํ์ฅ๋ ์ฌ์ฉ์ ์ ๋ณด
return json({
'id': user_id,
'name': f'User {user_id}',
'email': f'user{user_id}@example.com',
'profile': {
'bio': 'User bio text',
'avatar_url': f'https://example.com/avatars/{user_id}.jpg'
},
'created_at': '2023-01-01T00:00:00Z'
})
@api_v2.route('/stats')
async def get_stats(request: Request) -> HTTPResponse:
"""ํต๊ณ API (v2์๋ง ์๋ ์ ๊ธฐ๋ฅ)"""
return json({
'users_count': 100,
'items_count': 500,
'active_users': 42
})
#################################################
# ์ธ์ฆ ๋ธ๋ฃจํ๋ฆฐํธ
#################################################
@auth.route('/login', methods=['POST'])
async def login(request: Request) -> HTTPResponse:
"""์ฌ์ฉ์ ๋ก๊ทธ์ธ ์ฒ๋ฆฌ"""
data = request.json
username = data.get('username')
password = data.get('password')
# ๋ก๊ทธ์ธ ๋ก์ง (์์)
if username == 'admin' and password == 'password':
return json({
'token': 'sample-jwt-token',
'user_id': 1,
'expires_in': 3600
})
else:
return json({'error': 'invalid_credentials'}, status=401)
@auth.route('/logout', methods=['POST'])
async def logout(request: Request) -> HTTPResponse:
"""์ฌ์ฉ์ ๋ก๊ทธ์์ ์ฒ๋ฆฌ"""
# ๋ก๊ทธ์์ ๋ก์ง
return json({'status': 'logged_out'})
#################################################
# ๊ด๋ฆฌ์ ๋ธ๋ฃจํ๋ฆฐํธ
#################################################
@admin.route('/')
async def admin_index(request: Request) -> HTTPResponse:
"""๊ด๋ฆฌ์ ๋์๋ณด๋ ์ธ๋ฑ์ค"""
return html('<h1>Admin Dashboard</h1>')
@admin.route('/users')
async def admin_users(request: Request) -> HTTPResponse:
"""๊ด๋ฆฌ์์ฉ ์ฌ์ฉ์ ๊ด๋ฆฌ ํ์ด์ง"""
return html('<h1>User Management</h1>')
#################################################
# ์ ์ ํ์ผ ๋ฐ ํน์ ๋ผ์ฐํธ
#################################################
# ์ ์ ํ์ผ ์ ๊ณต (app ๋ ๋ฒจ)
app.static('/static', './static')
app.static('/favicon.ico', './static/favicon.ico')
# ๋ฆฌ๋๋ ์
๋ผ์ฐํธ
@app.route('/old-path')
async def old_path_redirect(request: Request) -> HTTPResponse:
"""์ด์ ๊ฒฝ๋ก์์ ์ ๊ฒฝ๋ก๋ก ๋ฆฌ๋๋ ์
"""
return redirect('/api/v2')
# ํ์ผ ๋ค์ด๋ก๋ ๋ผ์ฐํธ
@app.route('/download/<filename:path>')
async def download_file(request: Request, filename: str) -> HTTPResponse:
"""ํ์ผ ๋ค์ด๋ก๋ ์ฒ๋ฆฌ"""
return await file(f'./downloads/{filename}')
# ์๋ฌ ์ฒ๋ฆฌ ๋ธ๋ฃจํ๋ฆฐํธ ์์ค
@api_v1.exception(exceptions.NotFound)
async def api_v1_not_found(request: Request, exception: exceptions.NotFound) -> HTTPResponse:
"""API v1 404 ์ค๋ฅ ํธ๋ค๋ฌ"""
return json({
'error': 'not_found',
'message': str(exception),
'api_version': 'v1'
}, status=404)
# URL ํ๋ผ๋ฏธํฐ ํ์
์์
@app.route('/params/<name:str>/<value:float>/<uuid:uuid>')
async def params_example(request: Request, name: str, value: float, uuid: str) -> HTTPResponse:
"""๋ค์ํ URL ํ๋ผ๋ฏธํฐ ํ์
์์ """
return json({
'name': name,
'value': value,
'uuid': uuid
})
# ๋ธ๋ฃจํ๋ฆฐํธ ๋ฏธ๋ค์จ์ด ์์
@api_v1.middleware('request')
async def api_v1_middleware(request: Request):
"""API v1์๋ง ์ ์ฉ๋๋ ๋ฏธ๋ค์จ์ด"""
request.ctx.api_version = 'v1'
logger.info(f"API v1 ์์ฒญ: {request.path}")
# ๋ธ๋ฃจํ๋ฆฐํธ๋ฅผ ์ ํ๋ฆฌ์ผ์ด์
์ ๋ฑ๋ก
app.blueprint(api_v1)
app.blueprint(api_v2)
app.blueprint(admin)
app.blueprint(auth)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8000, debug=True)
โ
ํน์ง:
- ๊ธฐ๋ฅ๋ณ ๋ชจ๋ํ๋ฅผ ํตํ ์ฝ๋ ๊ตฌ์กฐํ
- URL ์ ๋์ฌ๋ฅผ ํตํ API ๊ฒฝ๋ก ์กฐ์งํ
- ์ค์ฒฉ ๋ธ๋ฃจํ๋ฆฐํธ๋ก ๋ณต์กํ ๋ผ์ฐํ ๊ณ์ธต ๊ตฌ์ฑ
- HTTP ๋ฉ์๋๋ณ ๋ผ์ฐํธ ์ฒ๋ฆฌ (GET, POST, PUT, DELETE ๋ฑ)
- API ๋ฒ์ ๊ด๋ฆฌ๋ฅผ ์ํ ํจ๊ณผ์ ์ธ ๊ตฌ์กฐ
- ๋ธ๋ฃจํ๋ฆฐํธ๋ณ ๋ฏธ๋ค์จ์ด ์ ์ ๊ฐ๋ฅ
- ๋ธ๋ฃจํ๋ฆฐํธ๋ณ ์์ธ ์ฒ๋ฆฌ ๊ธฐ๋ฅ
- ๋ค์ํ ์๋ต ํ์ ์ง์ (JSON, HTML, ํ์ผ, ๋ฆฌ๋๋ ์ )
- ์ ์ ํ์ผ ์๋น ๊ธฐ๋ฅ
- URL ํ๋ผ๋ฏธํฐ์ ํ์ ๊ฒ์ฆ ๋ฐ ๋ณํ
- ์์ฒญ ์ปจํ ์คํธ๋ฅผ ํตํ ๋ฐ์ดํฐ ์ ์ฅ ๋ฐ ๊ณต์
- RESTful API ์ค๊ณ ํจํด ์ง์
Sanic์์๋ ๋ฏธ๋ค์จ์ด, ๋ฐ์ฝ๋ ์ดํฐ, ๊ทธ๋ฆฌ๊ณ JWT์ ๊ฐ์ ๋๊ตฌ๋ฅผ ํ์ฉํ์ฌ ๊ฒฌ๊ณ ํ ์ธ์ฆ ์์คํ
๊ณผ ๋ณด์ ๋ฉ์ปค๋์ฆ์ ๊ตฌํํ ์ ์๋ค.
from sanic import Sanic, Blueprint, Request, HTTPResponse
from sanic.response import json
from sanic.exceptions import Unauthorized, Forbidden
from functools import wraps
import jwt
import time
import os
import logging
import uuid
import hashlib
import secrets
from typing import Dict, Any, Callable, Optional
# ์ ํ๋ฆฌ์ผ์ด์
์ค์
app = Sanic("SecureApp")
app.config.SECRET_KEY = os.environ.get('SECRET_KEY', 'your-secret-key-here') # ์ค์ ๋ก ํ๊ฒฝ ๋ณ์ ์ฌ์ฉ
# ๋ก๊ฑฐ ์ค์
logger = logging.getLogger('sanic.security')
# ์ฌ์ฉ์ ์์ ๋ฐ์ดํฐ (์ค์ ๋ก๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค์์ ๊ด๋ฆฌ)
USERS = {
1: {"id": 1, "username": "admin", "password_hash": hashlib.sha256("admin_password".encode()).hexdigest(), "role": "admin"},
2: {"id": 2, "username": "user", "password_hash": hashlib.sha256("user_password".encode()).hexdigest(), "role": "user"}
}
# ์ฐจ๋จ๋ ํ ํฐ ์ถ์ (Redis ๋ฑ์ผ๋ก ๋์ฒด ๊ฐ๋ฅ)
BLACKLISTED_TOKENS = set()
####################################################
# ํ ํฐ ์์ฑ ๋ฐ ๊ด๋ฆฌ ํจ์
####################################################
def generate_token(user_id: int, role: str = "user", expiry: int = 3600) -> str:
"""
JWT ํ ํฐ ์์ฑ
Args:
user_id: ์ฌ์ฉ์ ID
role: ์ฌ์ฉ์ ์ญํ
expiry: ๋ง๋ฃ ์๊ฐ(์ด)
Returns:
str: ์์ฑ๋ JWT ํ ํฐ
"""
payload = {
'sub': user_id,
'role': role,
'iat': int(time.time()),
'exp': int(time.time()) + expiry,
'jti': str(uuid.uuid4())
}
token = jwt.encode(
payload,
app.config.SECRET_KEY,
algorithm='HS256'
)
logger.info(f"์ฌ์ฉ์ ID {user_id}์ ํ ํฐ ์์ฑ๋จ")
return token
def verify_token(token: str) -> Dict[str, Any]:
"""
JWT ํ ํฐ ๊ฒ์ฆ
Args:
token: JWT ํ ํฐ
Returns:
Dict: ํ ํฐ ํ์ด๋ก๋
Raises:
jwt.InvalidTokenError: ์ ํจํ์ง ์์ ํ ํฐ
jwt.ExpiredSignatureError: ๋ง๋ฃ๋ ํ ํฐ
"""
# ์ฐจ๋จ ๋ชฉ๋ก ํ์ธ
if token in BLACKLISTED_TOKENS:
raise jwt.InvalidTokenError("Blacklisted token")
# ํ ํฐ ๊ฒ์ฆ
payload = jwt.decode(
token,
app.config.SECRET_KEY,
algorithms=['HS256']
)
return payload
####################################################
# ์ธ์ฆ ๋ฐ์ฝ๋ ์ดํฐ
####################################################
def protected(roles: Optional[list] = None) -> Callable:
"""
๋ณดํธ๋ ๊ฒฝ๋ก๋ฅผ ์ํ ๋ฐ์ฝ๋ ์ดํฐ
Args:
roles: ํ์ฉ๋ ์ญํ ๋ชฉ๋ก (None์ด๋ฉด ๋ชจ๋ ์ธ์ฆ๋ ์ฌ์ฉ์ ํ์ฉ)
Returns:
Callable: ๋ฐ์ฝ๋ ์ดํฐ ํจ์
"""
def decorator(f):
@wraps(f)
async def decorated_function(request: Request, *args, **kwargs):
# ์ธ์ฆ ํค๋ ํ์ธ
auth_header = request.headers.get('Authorization')
if not auth_header:
logger.warning(f"์ธ์ฆ ํค๋ ๋๋ฝ: {request.path}")
raise Unauthorized("์ธ์ฆ ํ ํฐ์ด ํ์ํฉ๋๋ค")
# ํค๋ ํ์ ํ์ธ
parts = auth_header.split()
if parts[0].lower() != 'bearer' or len(parts) != 2:
logger.warning(f"์๋ชป๋ ์ธ์ฆ ํค๋ ํ์: {auth_header}")
raise Unauthorized("์๋ชป๋ ์ธ์ฆ ํค๋ ํ์")
token = parts[1]
try:
# ํ ํฐ ๊ฒ์ฆ
payload = verify_token(token)
# ์ฌ์ฉ์ ์ ๋ณด ์์ฒญ ์ปจํ
์คํธ์ ์ ์ฅ
request.ctx.user_id = payload['sub']
request.ctx.role = payload['role']
# ์ญํ ๊ธฐ๋ฐ ์ก์ธ์ค ์ ์ด ํ์ธ
if roles and payload['role'] not in roles:
logger.warning(f"๊ถํ ๋ถ์กฑ - ์ฌ์ฉ์ {request.ctx.user_id}, ์ญํ : {request.ctx.role}, ํ์: {roles}")
raise Forbidden("์ด ์์
์ ์ํํ ๊ถํ์ด ์์ต๋๋ค")
# ์๋ ํธ๋ค๋ฌ ํธ์ถ
return await f(request, *args, **kwargs)
except jwt.ExpiredSignatureError:
logger.warning(f"๋ง๋ฃ๋ ํ ํฐ: {token[:10]}...")
raise Unauthorized("ํ ํฐ์ด ๋ง๋ฃ๋์์ต๋๋ค")
except jwt.InvalidTokenError as e:
logger.warning(f"์ ํจํ์ง ์์ ํ ํฐ: {token[:10]}..., ์ค๋ฅ: {str(e)}")
raise Unauthorized("์ ํจํ์ง ์์ ํ ํฐ์
๋๋ค")
except Exception as e:
logger.error(f"์ธ์ฆ ์ค ์ค๋ฅ: {str(e)}")
raise Unauthorized("์ธ์ฆ ์ฒ๋ฆฌ ์ค ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค")
return decorated_function
return decorator
####################################################
# ๋ณด์ ํค๋ ๋ฏธ๋ค์จ์ด
####################################################
@app.middleware('response')
async def add_security_headers(request: Request, response):
"""๋ณด์ ๊ด๋ จ ํค๋ ์ถ๊ฐ"""
# XSS ๋ฐฉ์ง
response.headers['X-XSS-Protection'] = '1; mode=block'
# ํด๋ฆญ์ฌํน ๋ฐฉ์ง
response.headers['X-Frame-Options'] = 'DENY'
# MIME ์ค๋ํ ๋ฐฉ์ง
response.headers['X-Content-Type-Options'] = 'nosniff'
# ์ฝํ
์ธ ๋ณด์ ์ ์ฑ
์ค์
response.headers['Content-Security-Policy'] = "default-src 'self'"
# HSTS ์ค์ (HTTPS ์ฌ์ฉ ์)
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
# ์บ์ ์ ์ด (์ธ์ฆ์ด ํ์ํ ํ์ด์ง)
if hasattr(request.ctx, 'user_id'):
response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0'
response.headers['Pragma'] = 'no-cache'
####################################################
# CSRF ๋ณดํธ ๋ฏธ๋ค์จ์ด
####################################################
CSRF_TOKENS = {} # ์ค์ ๊ตฌํ์์๋ Redis ๋ฑ ์ฌ์ฉ
def generate_csrf_token() -> str:
"""CSRF ํ ํฐ ์์ฑ"""
return secrets.token_hex(16)
@app.middleware('request')
async def csrf_protection(request: Request):
"""CSRF ๋ณดํธ ๋ฏธ๋ค์จ์ด"""
# CSRF ๋ณดํธ๊ฐ ํ์ ์๋ ๋ฉ์๋ ๋๋ ๊ฒฝ๋ก ์ ์ธ
safe_methods = ['GET', 'HEAD', 'OPTIONS']
bypass_paths = ['/api/token', '/login']
if request.method in safe_methods or request.path in bypass_paths:
return
# CSRF ํ ํฐ ํ์ธ
request_csrf = request.headers.get('X-CSRF-Token')
session_id = request.cookies.get('session_id')
if not session_id or not request_csrf or CSRF_TOKENS.get(session_id) != request_csrf:
logger.warning(f"CSRF ํ ํฐ ๋ถ์ผ์น: {request.path}")
raise Forbidden("์ ํจํ์ง ์์ CSRF ํ ํฐ")
####################################################
# ์ฌ์ฉ์ ์ธ์ฆ ๋ผ์ฐํธ
####################################################
@app.route('/api/token', methods=['POST'])
async def get_token(request: Request) -> HTTPResponse:
"""
์ฌ์ฉ์ ์ธ์ฆ ๋ฐ ํ ํฐ ๋ฐ๊ธ
Request:
JSON: {username, password}
Response:
JSON: {token, expires_in}
"""
data = request.json
username = data.get('username')
password = data.get('password')
if not username or not password:
return json({'error': 'missing_fields', 'message': '์ฌ์ฉ์๋ช
๊ณผ ๋น๋ฐ๋ฒํธ๊ฐ ํ์ํฉ๋๋ค'}, status=400)
# ๋น๋ฐ๋ฒํธ ํด์ฑ
hashed_password = hashlib.sha256(password.encode()).hexdigest()
# ์ฌ์ฉ์ ํ์ธ (์ค์ ๋ก๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์กฐํ)
user = None
for u in USERS.values():
if u['username'] == username and u['password_hash'] == hashed_password:
user = u
break
if user:
# ํ ํฐ ์์ฑ
token = generate_token(user['id'], user['role'])
# ์ธ์
์ค์ ๋ฐ CSRF ํ ํฐ ์์ฑ
session_id = str(uuid.uuid4())
csrf_token = generate_csrf_token()
CSRF_TOKENS[session_id] = csrf_token
response = json({
'token': token,
'csrf_token': csrf_token,
'expires_in': 3600,
'user_id': user['id'],
'role': user['role']
})
# ์ธ์
์ฟ ํค ์ค์
response.cookies['session_id'] = session_id
response.cookies['session_id']['httponly'] = True
response.cookies['session_id']['secure'] = True # ํ๋ก๋์
ํ๊ฒฝ HTTPS
response.cookies['session_id']['samesite'] = 'Lax'
return response
else:
logger.warning(f"๋ก๊ทธ์ธ ์คํจ: {username}")
return json({'error': 'invalid_credentials', 'message': '์๋ชป๋ ์ฌ์ฉ์๋ช
๋๋ ๋น๋ฐ๋ฒํธ'}, status=401)
@app.route('/api/logout', methods=['POST'])
@protected()
async def logout(request: Request) -> HTTPResponse:
"""
๋ก๊ทธ์์ ์ฒ๋ฆฌ
- ํ์ฌ ํ ํฐ์ ๋ฌดํจํ
"""
# ํ์ฌ ํ ํฐ ์ถ์ถ
token = request.headers.get('Authorization').split()[1]
# ํ ํฐ ์ฐจ๋จ ๋ชฉ๋ก์ ์ถ๊ฐ
BLACKLISTED_TOKENS.add(token)
# ์ธ์
ID๊ฐ ์๋ ๊ฒฝ์ฐ CSRF ํ ํฐ ์ ๊ฑฐ
session_id = request.cookies.get('session_id')
if session_id and session_id in CSRF_TOKENS:
del CSRF_TOKENS[session_id]
# ์ฟ ํค ํด๋ฆฌ์ด ์๋ต
response = json({'status': 'logged_out'})
response.cookies['session_id'] = ''
response.cookies['session_id']['expires'] = 0
return response
####################################################
# ๋ณดํธ๋ ๋ผ์ฐํธ ์์
####################################################
@app.route('/api/profile')
@protected()
async def get_profile(request: Request) -> HTTPResponse:
"""
์ฌ์ฉ์ ํ๋กํ ์ ๋ณด ์กฐํ (๋ชจ๋ ์ธ์ฆ๋ ์ฌ์ฉ์)
"""
user_id = request.ctx.user_id
user = USERS.get(user_id)
if not user:
return json({'error': 'user_not_found'}, status=404)
# ๋ฏผ๊ฐํ ์ ๋ณด ์ ์ธ
return json({
'id': user['id'],
'username': user['username'],
'role': user['role']
})
@app.route('/api/admin/users')
@protected(['admin'])
async def list_users(request: Request) -> HTTPResponse:
"""
๋ชจ๋ ์ฌ์ฉ์ ๋ชฉ๋ก ์กฐํ (๊ด๋ฆฌ์๋ง)
"""
# ์ฌ์ฉ์ ๋ชฉ๋ก ๋ฐํ (๋น๋ฐ๋ฒํธ ํด์ ์ ์ธ)
users_list = [
{'id': u['id'], 'username': u['username'], 'role': u['role']}
for u in USERS.values()
]
return json({'users': users_list})
# ๊ธฐ๋ณธ ์ค๋ฅ ํธ๋ค๋ฌ
@app.exception(Unauthorized)
async def unauthorized(request: Request, exception: Unauthorized) -> HTTPResponse:
return json({
'error': 'unauthorized',
'message': str(exception)
}, status=401)
@app.exception(Forbidden)
async def forbidden(request: Request, exception: Forbidden) -> HTTPResponse:
return json({
'error': 'forbidden',
'message': str(exception)
}, status=403)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8000, debug=True)
โ
ํน์ง:
- JWT ๊ธฐ๋ฐ ์ธ์ฆ ์์คํ ๊ตฌํ
- ์ญํ ๊ธฐ๋ฐ ์ ๊ทผ ์ ์ด(RBAC) ๊ตฌํ
- ๋ฐ์ฝ๋ ์ดํฐ ํจํด์ ํตํ ๊ฒฝ๋ก ๋ณดํธ
- CSRF ํ ํฐ ๋ฐฉ์ด ๋ฉ์ปค๋์ฆ
- ๋ณด์ ๊ฐํ HTTP ํค๋ ์ถ๊ฐ
- ํ ํฐ ์ฐจ๋จ ๋ชฉ๋ก ์ง์
- ์ธ์ ๊ด๋ฆฌ ๋ฐ ์์ ํ ์ฟ ํค ์ฌ์ฉ
- ๋น๋ฐ๋ฒํธ ํด์ ์ฒ๋ฆฌ
- ์ฌ์ฉ์ ์ธ์ฆ ๋ฐ ๋ก๊ทธ์์ ์ํฌํ๋ก์ฐ
- ์์ธํ ๋ณด์ ๋ก๊น
- ์ค๋ฅ ์ฒ๋ฆฌ ๋ฐ ํ์คํ๋ ์๋ต
- ๋น๋๊ธฐ ์ธ์ฆ ์ฒ๋ฆฌ
- ํ๊ฒฝ ์ค์ ๋ถ๋ฆฌ (๋น๋ฐํค ๊ด๋ฆฌ)
โ ๋ชจ๋ฒ ์ฌ๋ก:
- ๋น๋๊ธฐ ์ฝ๋ ์ต์ ํ
- ์๋ฌ ์ฒ๋ฆฌ ๊ตฌํ
- ๋ฏธ๋ค์จ์ด ํ์ฉ
- ์บ์ฑ ์ ๋ต
- ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ต์ ํ
- ๋ณด์ ์ค์
- ๋ก๊น ๊ตฌํ
- ํ ์คํธ ์์ฑ