KR_WebSocket - somaz94/python-study GitHub Wiki
WebSocket์ ํด๋ผ์ด์ธํธ์ ์๋ฒ ๊ฐ์ ์๋ฐฉํฅ ์ค์๊ฐ ํต์ ์ ์ ๊ณตํ๋ ํ๋กํ ์ฝ์ ๋๋ค.
import websockets
import asyncio
# ๊ธฐ๋ณธ WebSocket ์๋ฒ
async def echo(websocket, path):
async for message in websocket:
await websocket.send(f"Echo: {message}")
# ์๋ฒ ์์
async def start_server():
server = await websockets.serve(echo, "localhost", 8765)
await server.wait_closed()
# ์คํ
asyncio.run(start_server())
# ๊ธฐ๋ณธ WebSocket ํด๋ผ์ด์ธํธ
async def connect_to_server():
async with websockets.connect("ws://localhost:8765") as websocket:
await websocket.send("Hello!")
response = await websocket.recv()
print(response)
โ ํน์ง:
- ์๋ฐฉํฅ ํต์
- ๋น๋๊ธฐ ์ฒ๋ฆฌ
- ์ค์๊ฐ ๋ฐ์ดํฐ ์ ์ก
class WebSocketServer:
def __init__(self):
self.clients = set()
async def register(self, websocket):
self.clients.add(websocket)
try:
await self.handle_client(websocket)
finally:
self.clients.remove(websocket)
async def handle_client(self, websocket):
try:
async for message in websocket:
await self.broadcast(message)
except websockets.exceptions.ConnectionClosed:
pass
async def broadcast(self, message):
if self.clients:
await asyncio.gather(
*[client.send(message) for client in self.clients]
)
โ ํน์ง:
- ํด๋ผ์ด์ธํธ ๊ด๋ฆฌ
- ๋ฉ์์ง ๋ธ๋ก๋์บ์คํธ
- ์ฐ๊ฒฐ ๊ด๋ฆฌ
class WebSocketClient:
def __init__(self, uri):
self.uri = uri
self.websocket = None
self.retry_count = 0
self.max_retries = 5
async def connect(self):
while self.retry_count < self.max_retries:
try:
self.websocket = await websockets.connect(self.uri)
self.retry_count = 0
return True
except websockets.exceptions.ConnectionClosed:
self.retry_count += 1
await asyncio.sleep(2 ** self.retry_count)
return False
โ ํน์ง:
- ์๋ ์ฌ์ฐ๊ฒฐ
- ์๋ฌ ์ฒ๋ฆฌ
- ์ง์ ๋ฐฑ์คํ
import json
class WebSocketProtocol:
def __init__(self):
self.handlers = {}
def register_handler(self, event_type, handler):
self.handlers[event_type] = handler
async def handle_message(self, websocket, message):
try:
data = json.loads(message)
event_type = data.get('type')
payload = data.get('payload')
if event_type in self.handlers:
response = await self.handlers[event_type](payload)
await websocket.send(json.dumps({
'type': f"{event_type}_response",
'payload': response
}))
except json.JSONDecodeError:
await websocket.send(json.dumps({
'type': 'error',
'payload': '์๋ชป๋ JSON ํ์'
}))
โ ํน์ง:
- ์ด๋ฒคํธ ๊ธฐ๋ฐ ์ฒ๋ฆฌ
- JSON ๋ฉ์์ง ํฌ๋งท
- ์๋ฌ ํธ๋ค๋ง
import jwt
from datetime import datetime, timedelta
class SecureWebSocket:
def __init__(self, secret_key):
self.secret_key = secret_key
def create_token(self, user_id):
payload = {
'user_id': user_id,
'exp': datetime.utcnow() + timedelta(hours=1)
}
return jwt.encode(payload, self.secret_key, algorithm='HS256')
async def authenticate_connection(self, websocket, path):
try:
token = await websocket.recv()
user_id = self.verify_token(token)
return user_id
except ValueError as e:
await websocket.close()
return None
def verify_token(self, token):
try:
payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
return payload['user_id']
except jwt.PyJWTError:
raise ValueError("์ ํจํ์ง ์์ ํ ํฐ")
โ ํน์ง:
- JWT ์ธ์ฆ
- ํ ํฐ ๊ฒ์ฆ
- ๋ณด์ ์ฐ๊ฒฐ
import asyncio
import orjson # orjson์ ๊ธฐ๋ณธ json๋ณด๋ค ๋น ๋ฅธ ์ฒ๋ฆฌ ์ ๊ณต
import zlib
from concurrent.futures import ProcessPoolExecutor
class OptimizedWebSocket:
def __init__(self):
self.clients = set()
self.process_pool = ProcessPoolExecutor(max_workers=4)
self.message_queue = asyncio.Queue(maxsize=1000)
async def register(self, websocket):
self.clients.add(websocket)
try:
await self.handle_client(websocket)
finally:
self.clients.remove(websocket)
async def handle_client(self, websocket):
# ๋ฉ์์ง ์ฒ๋ฆฌ ๋ฃจํ
try:
async for message in websocket:
# ์์ถ๋ ๋ฉ์์ง ์ฒ๋ฆฌ
if websocket.extensions and 'permessage-deflate' in websocket.extensions:
message = zlib.decompress(message)
# ๋ฉ์์ง ํ์ ์ถ๊ฐ
await self.message_queue.put((websocket, message))
except websockets.exceptions.ConnectionClosed:
pass
async def process_message_queue(self):
"""๋ฉ์์ง ํ ์ฒ๋ฆฌ ๋ฃจํ"""
while True:
websocket, message = await self.message_queue.get()
try:
# ๋ฌด๊ฑฐ์ด ์ฒ๋ฆฌ๋ ํ๋ก์ธ์ค ํ์์ ์คํ
data = await asyncio.get_event_loop().run_in_executor(
self.process_pool,
self.process_message,
message
)
# ๊ฒฐ๊ณผ ์ ์ก
await websocket.send(orjson.dumps(data))
except Exception as e:
print(f"๋ฉ์์ง ์ฒ๋ฆฌ ์ค๋ฅ: {str(e)}")
finally:
self.message_queue.task_done()
def process_message(self, message):
"""CPU ์ง์ฝ์ ์ธ ๋ฉ์์ง ์ฒ๋ฆฌ (๋ณ๋ ํ๋ก์ธ์ค์์ ์คํ)"""
# ์ค์ ๊ตฌํ์์๋ ๋ณต์กํ ์ฐ์ฐ ์ํ
return {"status": "processed", "original": message}
async def start_server(self, host='localhost', port=8765):
"""์๋ฒ ์์"""
# ๋ฉ์์ง ํ ์ฒ๋ฆฌ ํ์คํฌ ์์
asyncio.create_task(self.process_message_queue())
# ์๋ฒ ์์
server = await websockets.serve(
self.register,
host,
port,
compression=None, # ํด๋ผ์ด์ธํธ ์ง์ ์ ์๋์ผ๋ก ์์ถ ํ์ฑํ
max_size=10 * 1024 * 1024, # ์ต๋ ๋ฉ์์ง ํฌ๊ธฐ (10MB)
max_queue=64 # ๋ด๋ถ ๋ฉ์์ง ํ ํฌ๊ธฐ
)
return server
โ ํน์ง:
- ๋ฉ์์ง ํ๋ฅผ ํตํ ๋ฐฑํ๋ ์ ์ ์ด
- ํ๋ก์ธ์ค ํ์ ํ์ฉํ CPU ๋ฐ์ด๋ ์์ ์ฒ๋ฆฌ
- ์์ถ์ ํตํ ๋ฐ์ดํฐ ์ ์ก๋ ๊ฐ์
- orjson ํ์ฉ์ผ๋ก JSON ์ง๋ ฌํ/์ญ์ง๋ ฌํ ์ฑ๋ฅ ํฅ์
- ๋ฉ์์ง ํฌ๊ธฐ ๋ฐ ํ ์ ํ์ผ๋ก ๋ฆฌ์์ค ๋ณดํธ
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from starlette.websockets import WebSocketState
import asyncio
import json
import logging
# FastAPI ์ ํ๋ฆฌ์ผ์ด์
์ค์
app = FastAPI(title="WebSocket API")
# CORS ์ค์
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # ์ค์ ํ๊ฒฝ์์๋ ํน์ ์ถ์ฒ๋ง ํ์ฉ
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ๋ก๊น
์ค์
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ์ฐ๊ฒฐ ๊ด๋ฆฌ์
class ConnectionManager:
def __init__(self):
self.active_connections: dict = {}
self.lock = asyncio.Lock()
async def connect(self, websocket: WebSocket, client_id: str):
await websocket.accept()
async with self.lock:
self.active_connections[client_id] = websocket
logger.info(f"ํด๋ผ์ด์ธํธ ์ฐ๊ฒฐ: {client_id}")
async def disconnect(self, client_id: str):
async with self.lock:
if client_id in self.active_connections:
del self.active_connections[client_id]
logger.info(f"ํด๋ผ์ด์ธํธ ์ฐ๊ฒฐ ํด์ : {client_id}")
async def send_personal_message(self, message: str, client_id: str):
async with self.lock:
if client_id in self.active_connections:
websocket = self.active_connections[client_id]
if websocket.client_state == WebSocketState.CONNECTED:
await websocket.send_text(message)
return True
return False
async def broadcast(self, message: str, exclude: str = None):
tasks = []
async with self.lock:
for client_id, websocket in self.active_connections.items():
if exclude != client_id and websocket.client_state == WebSocketState.CONNECTED:
tasks.append(websocket.send_text(message))
if tasks:
await asyncio.gather(*tasks)
# ์ฐ๊ฒฐ ๊ด๋ฆฌ์ ์ธ์คํด์ค ์์ฑ
manager = ConnectionManager()
# WebSocket ์๋ํฌ์ธํธ
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await manager.connect(websocket, client_id)
try:
while True:
# ๋ฉ์์ง ์์
data = await websocket.receive_text()
# ๋ฉ์์ง ํ์ฑ
try:
message_data = json.loads(data)
message_type = message_data.get("type")
# ๋ฉ์์ง ์ ํ์ ๋ฐ๋ฅธ ์ฒ๋ฆฌ
if message_type == "chat":
# ์ฑํ
๋ฉ์์ง ๋ธ๋ก๋์บ์คํธ
formatted_message = json.dumps({
"type": "chat",
"sender": client_id,
"content": message_data.get("content", ""),
"timestamp": asyncio.get_event_loop().time()
})
await manager.broadcast(formatted_message)
elif message_type == "private":
# ๊ฐ์ธ ๋ฉ์์ง
target_id = message_data.get("target")
if target_id:
formatted_message = json.dumps({
"type": "private",
"sender": client_id,
"content": message_data.get("content", ""),
"timestamp": asyncio.get_event_loop().time()
})
success = await manager.send_personal_message(formatted_message, target_id)
# ์ ์ก ๊ฒฐ๊ณผ ์๋ฆผ
await websocket.send_text(json.dumps({
"type": "status",
"status": "delivered" if success else "failed",
"target": target_id
}))
except json.JSONDecodeError:
await websocket.send_text(json.dumps({
"type": "error",
"message": "์๋ชป๋ JSON ํ์"
}))
except WebSocketDisconnect:
await manager.disconnect(client_id)
# ์ฐ๊ฒฐ ํด์ ์๋ฆผ ๋ธ๋ก๋์บ์คํธ
await manager.broadcast(json.dumps({
"type": "system",
"content": f"ํด๋ผ์ด์ธํธ {client_id}๊ฐ ๋๊ฐ์ต๋๋ค"
}))
# ์ผ๋ฐ HTTP ์๋ํฌ์ธํธ
@app.get("/")
async def get_root():
return {"message": "WebSocket ์๋ฒ๊ฐ ์คํ ์ค์
๋๋ค. '/ws/{client_id}'๋ก ์ฐ๊ฒฐํ์ธ์."}
# ์คํ ์คํฌ๋ฆฝํธ
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
โ ํน์ง:
- FastAPI์ WebSocket ํตํฉ
- ํด๋ผ์ด์ธํธ ID๋ณ ์ฐ๊ฒฐ ๊ด๋ฆฌ
- ๋ฉ์์ง ํ์ ์ ๋ฐ๋ฅธ ๋ผ์ฐํ
- ๊ฐ์ธ ๋ฉ์์ง ๋ฐ ๋ธ๋ก๋์บ์คํธ ์ง์
- ๋์์ฑ ์์ ํ ์ฐ๊ฒฐ ๊ด๋ฆฌ (Lock ์ฌ์ฉ)
- CORS ๋ฏธ๋ค์จ์ด ์ค์
- ๊ตฌ์กฐํ๋ ๋ก๊น
- Starlette WebSocket ์ํ ํ์ธ
# ์ค์๊ฐ ์ฃผ์ ์์ธ ์
๋ฐ์ดํธ ์์คํ
import asyncio
import json
import random
import logging
import websockets
from datetime import datetime
from typing import Dict, List, Set, Any
# ๋ก๊น
์ค์
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class StockTickerServer:
def __init__(self):
# ํด๋ผ์ด์ธํธ ๊ด๋ฆฌ
self.clients: Set[websockets.WebSocketServerProtocol] = set()
# ํด๋ผ์ด์ธํธ ๊ตฌ๋
์ ๋ณด
self.subscriptions: Dict[str, Set[websockets.WebSocketServerProtocol]] = {}
# ์ฃผ์ ์์ธ ๋ฐ์ดํฐ
self.stock_data: Dict[str, Dict[str, Any]] = {
"AAPL": {"price": 150.0, "change": 0.0, "volume": 0},
"MSFT": {"price": 250.0, "change": 0.0, "volume": 0},
"GOOGL": {"price": 2800.0, "change": 0.0, "volume": 0},
"AMZN": {"price": 3300.0, "change": 0.0, "volume": 0},
"META": {"price": 330.0, "change": 0.0, "volume": 0},
}
async def register(self, websocket: websockets.WebSocketServerProtocol):
"""์ ํด๋ผ์ด์ธํธ ๋ฑ๋ก"""
self.clients.add(websocket)
logger.info(f"ํด๋ผ์ด์ธํธ ์ฐ๊ฒฐ: {len(self.clients)}๊ฐ ์ฐ๊ฒฐ ํ์ฑํ")
async def unregister(self, websocket: websockets.WebSocketServerProtocol):
"""ํด๋ผ์ด์ธํธ ๋ฑ๋ก ํด์ """
self.clients.discard(websocket)
# ๊ตฌ๋
์์๋ ์ ๊ฑฐ
for symbol in list(self.subscriptions.keys()):
if websocket in self.subscriptions[symbol]:
self.subscriptions[symbol].discard(websocket)
# ๋น์ด์๋ ๊ตฌ๋
์ ๋ฆฌ
if not self.subscriptions[symbol]:
del self.subscriptions[symbol]
logger.info(f"ํด๋ผ์ด์ธํธ ์ฐ๊ฒฐ ํด์ : {len(self.clients)}๊ฐ ์ฐ๊ฒฐ ํ์ฑํ")
async def process_message(self, websocket: websockets.WebSocketServerProtocol, message: str):
"""ํด๋ผ์ด์ธํธ ๋ฉ์์ง ์ฒ๋ฆฌ"""
try:
data = json.loads(message)
action = data.get("action")
if action == "subscribe":
# ์ฃผ์ ๊ตฌ๋
symbols = data.get("symbols", [])
for symbol in symbols:
if symbol in self.stock_data:
if symbol not in self.subscriptions:
self.subscriptions[symbol] = set()
self.subscriptions[symbol].add(websocket)
# ํ์ฌ ์์ธ ์ฆ์ ์ ์ก
await websocket.send(json.dumps({
"type": "stock_update",
"symbol": symbol,
"data": self.stock_data[symbol],
"timestamp": datetime.now().isoformat()
}))
await websocket.send(json.dumps({
"type": "subscription_success",
"symbols": symbols
}))
elif action == "unsubscribe":
# ๊ตฌ๋
ํด์
symbols = data.get("symbols", [])
for symbol in symbols:
if symbol in self.subscriptions and websocket in self.subscriptions[symbol]:
self.subscriptions[symbol].discard(websocket)
await websocket.send(json.dumps({
"type": "unsubscription_success",
"symbols": symbols
}))
elif action == "get_available_stocks":
# ์ฌ์ฉ ๊ฐ๋ฅํ ์ฃผ์ ๋ชฉ๋ก ์์ฒญ
await websocket.send(json.dumps({
"type": "available_stocks",
"symbols": list(self.stock_data.keys())
}))
except json.JSONDecodeError:
await websocket.send(json.dumps({
"type": "error",
"message": "์๋ชป๋ JSON ํ์"
}))
except Exception as e:
logger.error(f"๋ฉ์์ง ์ฒ๋ฆฌ ์ค๋ฅ: {str(e)}")
await websocket.send(json.dumps({
"type": "error",
"message": f"์๋ฒ ์ค๋ฅ: {str(e)}"
}))
async def handle_client(self, websocket: websockets.WebSocketServerProtocol):
"""ํด๋ผ์ด์ธํธ ์ฐ๊ฒฐ ์ฒ๋ฆฌ"""
await self.register(websocket)
try:
async for message in websocket:
await self.process_message(websocket, message)
except websockets.exceptions.ConnectionClosedError:
pass
finally:
await self.unregister(websocket)
async def update_stocks(self):
"""์ฃผ์ ์์ธ ์
๋ฐ์ดํธ ์์ฑ"""
while True:
# ๋ชจ๋ ์ฃผ์ ์์ธ ์
๋ฐ์ดํธ
for symbol in self.stock_data:
# ๋๋ค ๊ฐ๊ฒฉ ๋ณ๋
change_percent = (random.random() - 0.5) * 2.0 # -1.0% ~ +1.0%
price = self.stock_data[symbol]["price"]
change = price * change_percent / 100.0
# ๋ฐ์ดํฐ ์
๋ฐ์ดํธ
self.stock_data[symbol] = {
"price": round(price + change, 2),
"change": round(change, 2),
"change_percent": round(change_percent, 2),
"volume": random.randint(100, 10000),
"last_update": datetime.now().isoformat()
}
# ํด๋น ์ฃผ์์ ๊ตฌ๋
ํ ํด๋ผ์ด์ธํธ์๊ฒ ์
๋ฐ์ดํธ ์ ์ก
if symbol in self.subscriptions:
update_message = json.dumps({
"type": "stock_update",
"symbol": symbol,
"data": self.stock_data[symbol]
})
for client in self.subscriptions[symbol].copy():
try:
await client.send(update_message)
except websockets.exceptions.ConnectionClosed:
# ๋ซํ ์ฐ๊ฒฐ ์ฒ๋ฆฌ๋ handle_client์์ ์ํ
pass
# ์
๋ฐ์ดํธ ์ฃผ๊ธฐ (1์ด)
await asyncio.sleep(1)
async def start_server(self, host: str = 'localhost', port: int = 8765):
"""์๋ฒ ์์"""
# ์ฃผ์ ์
๋ฐ์ดํธ ํ์คํฌ ์์
asyncio.create_task(self.update_stocks())
# WebSocket ์๋ฒ ์์
server = await websockets.serve(
self.handle_client,
host,
port,
ping_interval=20,
ping_timeout=60
)
logger.info(f"์ฃผ์ ์์ธ ์๋ฒ ์์: {host}:{port}")
return server
# ์๋ฒ ์คํ ์ฝ๋
async def main():
server = StockTickerServer()
await server.start_server()
await asyncio.Future() # ๋ฌดํ ์คํ
if __name__ == "__main__":
asyncio.run(main())
โ ํน์ง:
- ์ค์๊ฐ ๋ฐ์ดํฐ ์คํธ๋ฆฌ๋ฐ
- ๊ตฌ๋ ๊ธฐ๋ฐ ์ํคํ ์ฒ
- ํด๋ผ์ด์ธํธ๋ณ ๊ด์ฌ ๋ฐ์ดํฐ ํํฐ๋ง
- ์ฃผ๊ธฐ์ ๋ฐ์ดํฐ ์ ๋ฐ์ดํธ
- ์ฐ๊ฒฐ ์ํ ๋ชจ๋ํฐ๋ง
- ์๋ฌ ์ฒ๋ฆฌ ๋ฐ ๋ณต๊ตฌ
- ํ์ ํํ
- ๋ก๊น ์์คํ
- ํ/ํ ๋ฉ์ปค๋์ฆ์ผ๋ก ์ฐ๊ฒฐ ์ ์ง
โ ๋ชจ๋ฒ ์ฌ๋ก:
- ์ฐ๊ฒฐ ์ํ ๊ด๋ฆฌ
- ์ฌ์ฐ๊ฒฐ ๋ก์ง ๊ตฌํ
- ๋ฉ์์ง ์ง๋ ฌํ
- ์๋ฌ ์ฒ๋ฆฌ
- ๋ณด์ ๊ณ ๋ ค
- ์ฑ๋ฅ ์ต์ ํ
- ๋ก๊น ๊ตฌํ
- ํ ์คํธ ์์ฑ
- ๊ตฌ๋ ๊ธฐ๋ฐ ์ํคํ ์ฒ
- ๋ฐฑํ๋ ์ ์ฒ๋ฆฌ
- ๋น๋๊ธฐ I/O์ ๋์์ฑ ํจํด ํ์ฉ
- ๋ชจ๋ํฐ๋ง๊ณผ ๋๋ฒ๊น ์์คํ ๊ตฌ์ถ
- ํ์ฅ ๊ฐ๋ฅํ ๊ตฌ์กฐ ์ค๊ณ
- ๋ฉ์์ง ๋ฒ์ ๊ด๋ฆฌ
- ํด๋ผ์ด์ธํธ ๋ผ์ด๋ธ๋ฌ๋ฆฌ ์ ๊ณต