KR_aiohttp - somaz94/python-study GitHub Wiki
aiohttp๋ Python์์ ๋น๋๊ธฐ HTTP ํด๋ผ์ด์ธํธ ๋ฐ ์๋ฒ๋ฅผ ๊ตฌํํ๊ธฐ ์ํ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ด๋ค. asyncio๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ํ๋ฉฐ, ๊ณ ์ฑ๋ฅ ๋น๋๊ธฐ ์์ฒญ ์ฒ๋ฆฌ๋ฅผ ๊ฐ๋ฅํ๊ฒ ํ๋ค.
- ๋น๋๊ธฐ HTTP ํด๋ผ์ด์ธํธ ๋ฐ ์๋ฒ ์ง์
- WebSocket ์ง์
- ๋ฏธ๋ค์จ์ด ๊ธฐ๋ฅ ์ ๊ณต
- ํ๋ฌ๊ทธ์ธ ์์คํ
- ์ ํธ ์ฒ๋ฆฌ ๋ฉ์ปค๋์ฆ
- ๋ผ์ฐํ
์์คํ
pip install aiohttp
import asyncio
from aiohttp import web
async def handle(request):
name = request.match_info.get('name', "Anonymous")
text = f"์๋
ํ์ธ์, {name}๋!"
return web.Response(text=text)
async def main():
app = web.Application()
app.add_routes([
web.get('/', handle),
web.get('/{name}', handle)
])
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, 'localhost', 8080)
await site.start()
print("์๋ฒ ์์: http://localhost:8080")
# ์๋ฒ๊ฐ ๊ณ์ ์คํ๋๋๋ก ์ ์ง
await asyncio.Event().wait()
if __name__ == '__main__':
asyncio.run(main())
aiohttp๋ URL ๋ผ์ฐํ
์ ์ํ ๋ค์ํ ๋ฐฉ๋ฒ์ ์ ๊ณตํ๋ค.
app.add_routes([
web.get('/users', get_users),
web.post('/users', create_user),
web.get('/users/{id}', get_user),
web.put('/users/{id}', update_user),
web.delete('/users/{id}', delete_user),
])
# ๋๋ ๋ฐ์ฝ๋ ์ดํฐ ์ฌ์ฉ
routes = web.RouteTableDef()
@routes.get('/')
async def hello(request):
return web.Response(text="Hello, world")
@routes.view('/users/{id}')
class UserView(web.View):
async def get(self):
return web.Response(text=f"์ฌ์ฉ์ ์ ๋ณด ์กฐํ")
async def put(self):
return web.Response(text=f"์ฌ์ฉ์ ์ ๋ณด ์
๋ฐ์ดํธ")
app.add_routes(routes)
aiohttp๋ ๋ค์ํ ํ์์ ์์ฒญ๊ณผ ์๋ต์ ์ฒ๋ฆฌํ ์ ์๋ค.
async def handle_json(request):
# JSON ์์ฒญ ์ฒ๋ฆฌ
data = await request.json()
# JSON ์๋ต ๋ฐํ
return web.json_response({
'status': 'success',
'data': data
})
async def handle_form(request):
# ํผ ๋ฐ์ดํฐ ์ฒ๋ฆฌ
data = await request.post()
name = data.get('name')
# ํ
ํ๋ฆฟ ์๋ต
return web.Response(
text=f"<html><body><h1>์๋
ํ์ธ์, {name}๋!</h1></body></html>",
content_type='text/html'
)
async def handle_file(request):
# ํ์ผ ์๋ต
return web.FileResponse('/path/to/file.pdf')
โ
ํน์ง:
- ๋น๋๊ธฐ ํ๋ก๊ทธ๋๋ฐ ํจํด
- ์ ์ฐํ ๋ผ์ฐํ ์์คํ
- ๋ค์ํ ์๋ต ํ์ ์ง์
- ๋ฏธ๋ค์จ์ด ์ฒด์ธ ๊ตฌ์ฑ
aiohttp๋ ๋น๋๊ธฐ HTTP ํด๋ผ์ด์ธํธ ๊ธฐ๋ฅ์ ์ ๊ณตํ์ฌ ์ฌ๋ฌ ์น ์์ฒญ์ ๋์์ ์ฒ๋ฆฌํ ์ ์๊ฒ ํ๋ค.
import aiohttp
import asyncio
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
return await response.text()
return None
async def main():
result = await fetch('https://python.org')
print(f"๊ฒฐ๊ณผ ๊ธธ์ด: {len(result) if result else 'N/A'}")
asyncio.run(main())
aiohttp ํด๋ผ์ด์ธํธ๋ ๋ชจ๋ ํ์ค HTTP ๋ฉ์๋๋ฅผ ์ง์ํ๋ค.
async def main():
async with aiohttp.ClientSession() as session:
# GET ์์ฒญ
async with session.get('https://api.example.com/users') as resp:
users = await resp.json()
# POST ์์ฒญ
user_data = {'name': 'ํ๊ธธ๋', 'email': '[email protected]'}
async with session.post('https://api.example.com/users', json=user_data) as resp:
result = await resp.json()
# PUT ์์ฒญ
update_data = {'name': '๊น๊ธธ๋'}
async with session.put('https://api.example.com/users/1', json=update_data) as resp:
updated = await resp.json()
# DELETE ์์ฒญ
async with session.delete('https://api.example.com/users/1') as resp:
if resp.status == 204:
print("์ฌ์ฉ์๊ฐ ์ญ์ ๋์์ต๋๋ค")
์ฌ๋ฌ ์์ฒญ์ ๋์์ ์ฒ๋ฆฌํ๋ ๊ฒ์ aiohttp์ ํต์ฌ ๊ธฐ๋ฅ์ด๋ค.
async def fetch_all(urls):
async with aiohttp.ClientSession() as session:
tasks = []
for url in urls:
tasks.append(fetch_url(session, url))
results = await asyncio.gather(*tasks)
return results
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
'https://python.org',
'https://github.com',
'https://stackoverflow.com'
]
results = await fetch_all(urls)
for url, html in zip(urls, results):
print(f"{url}: {len(html)} ๋ฐ์ดํธ")
asyncio.run(main())
async with session.get('https://slow-api.example.com', timeout=aiohttp.ClientTimeout(total=5)) as resp:
# 5์ด ์ด๋ด์ ์๋ต์ด ์ค์ง ์์ผ๋ฉด ์์ธ ๋ฐ์
data = await resp.json()
# ์ฟ ํค ์ ์ฅ ๋ฐ ์ ์ก
jar = aiohttp.CookieJar()
async with aiohttp.ClientSession(cookie_jar=jar) as session:
await session.get('https://auth.example.com/login')
# ์ด์ jar์ ์ธ์ฆ ์ฟ ํค๊ฐ ์ ์ฅ๋จ
# ๋์ผํ ์ธ์
์์ ์์ฒญํ๋ฉด ์ฟ ํค๊ฐ ์๋์ผ๋ก ์ ์ก๋จ
await session.get('https://auth.example.com/profile')
headers = {
'User-Agent': 'My Custom User Agent',
'Authorization': 'Bearer token123',
'Content-Type': 'application/json'
}
async with session.get('https://api.example.com', headers=headers) as resp:
data = await resp.json()
async with session.get('https://example.com', proxy='http://proxy.example.com:8080') as resp:
data = await resp.text()
# SSL ๊ฒ์ฆ ๋นํ์ฑํ (๋ณด์ ์ํ, ๊ฐ๋ฐ ํ๊ฒฝ์์๋ง ์ฌ์ฉ)
ssl_context = False
# ๋๋ ์ฌ์ฉ์ ์ ์ ์ธ์ฆ์ ์ฌ์ฉ
import ssl
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain('client.crt', 'client.key')
async with session.get('https://secure.example.com', ssl=ssl_context) as resp:
data = await resp.text()
โ
ํน์ง:
- ์ธ์ ๊ด๋ฆฌ ๋ฐ ์ฌ์ฌ์ฉ
- ๋น๋๊ธฐ ์์ฒญ ๋ณ๋ ฌ ์ฒ๋ฆฌ
- ํ์์์ ๋ฐ ์ฌ์๋ ์ค์
- ์์ฒญ/์๋ต ์ธํฐ์ ํฐ
- ๋ค์ํ ์ธ์ฆ ๋ฐฉ์ ์ง์
aiohttp๋ ์์ฒญ ์ฒ๋ฆฌ ์ ํ์ ๊ณตํต ๋ก์ง์ ์คํํ ์ ์๋ ๋ฏธ๋ค์จ์ด๋ฅผ ์ง์ํ๋ค.
@web.middleware
async def error_middleware(request, handler):
try:
response = await handler(request)
return response
except web.HTTPException as ex:
return web.json_response(
{'error': str(ex)},
status=ex.status
)
except Exception as ex:
return web.json_response(
{'error': 'Internal Server Error'},
status=500
)
async def handle_404(request):
return web.json_response(
{'error': 'Not Found'},
status=404
)
app = web.Application(middlewares=[error_middleware])
app.router.add_routes([
web.get('/', hello),
web.get('/users/{id}', get_user)
])
app.router.set_error_handler(404, handle_404)
โ
ํน์ง:
- ์ ์ญ ์๋ฌ ์ฒ๋ฆฌ
- ์ปค์คํ ๋ฏธ๋ค์จ์ด ์ง์
- ๋ผ์ฐํ ์๋ฌ ์ฒ๋ฆฌ
- ์์ฒญ/์๋ต ๋ณํ
- ์ธ์ฆ ๋ฐ ๊ถํ ์ ์ด
aiohttp ์ ํ๋ฆฌ์ผ์ด์
์์ ๋น๋๊ธฐ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ ๊ทผ์ ๊ตฌํํ๋ ๋ฐฉ๋ฒ์ด๋ค.
import aiopg
from aiohttp import web
async def init_pg(app):
conf = app['config']['postgres']
engine = await aiopg.create_pool(
database=conf['database'],
user=conf['user'],
password=conf['password'],
host=conf['host'],
port=conf['port'],
minsize=1,
maxsize=5
)
app['db'] = engine
async def close_pg(app):
app['db'].close()
await app['db'].wait_closed()
class UserHandler:
async def get_user(self, request):
user_id = request.match_info['id']
async with request.app['db'].acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
'SELECT * FROM users WHERE id = %s',
(user_id,)
)
user = await cur.fetchone()
if not user:
raise web.HTTPNotFound()
return web.json_response({
'id': user[0],
'name': user[1],
'email': user[2]
})
app = web.Application()
app.on_startup.append(init_pg)
app.on_cleanup.append(close_pg)
โ
ํน์ง:
- ๋น๋๊ธฐ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ
- ์ปค๋ฅ์ ํ๋ง
- ์ ํ๋ฆฌ์ผ์ด์ ์๋ช ์ฃผ๊ธฐ ๊ด๋ฆฌ
- ํธ๋์ญ์ ์ฒ๋ฆฌ
- ๋งค๊ฐ๋ณ์ํ๋ ์ฟผ๋ฆฌ
aiohttp๋ ํด๋ผ์ด์ธํธ์ ์๋ฒ ์ธก ๋ชจ๋์์ WebSocket ์ฐ๊ฒฐ์ ์ง์ํ์ฌ ์ค์๊ฐ ์๋ฐฉํฅ ํต์ ์ ๊ฐ๋ฅํ๊ฒ ํ๋ค.
from aiohttp import web
import json
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
# ํด๋ผ์ด์ธํธ์๊ฒ ํ์ ๋ฉ์์ง ์ ์ก
await ws.send_str(json.dumps({'action': 'connect', 'status': 'success'}))
try:
async for msg in ws:
if msg.type == web.WSMsgType.TEXT:
data = json.loads(msg.data)
print(f"๋ฉ์์ง ์์ : {data}")
# ์์ฝ ์๋ต
await ws.send_str(json.dumps({
'action': 'echo',
'data': data
}))
elif msg.type == web.WSMsgType.ERROR:
print(f"WebSocket ์ค๋ฅ: {ws.exception()}")
finally:
# ์ฐ๊ฒฐ ์ข
๋ฃ ์ ์ ๋ฆฌ ์์
print("WebSocket ์ฐ๊ฒฐ ์ข
๋ฃ")
return ws
app = web.Application()
app.add_routes([web.get('/ws', websocket_handler)])
if __name__ == '__main__':
web.run_app(app)
import aiohttp
import asyncio
import json
async def websocket_client():
async with aiohttp.ClientSession() as session:
async with session.ws_connect('ws://localhost:8080/ws') as ws:
# ์๋ฒ๋ก๋ถํฐ ํ์ ๋ฉ์์ง ์์
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
data = json.loads(msg.data)
print(f"์๋ฒ๋ก๋ถํฐ ์์ : {data}")
# ๋ฉ์์ง ์ ์ก
await ws.send_str(json.dumps({
'message': '์๋
ํ์ธ์, WebSocket!',
'timestamp': asyncio.get_event_loop().time()
}))
elif msg.type == aiohttp.WSMsgType.CLOSED:
break
elif msg.type == aiohttp.WSMsgType.ERROR:
break
asyncio.run(websocket_client())
# server.py
from aiohttp import web
import json
# ์ฐ๊ฒฐ๋ ๋ชจ๋ ํด๋ผ์ด์ธํธ๋ฅผ ์ ์ฅ
connected_clients = []
async def chat_websocket(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
# ์ ํด๋ผ์ด์ธํธ ์ถ๊ฐ
connected_clients.append(ws)
client_id = len(connected_clients)
# ๋ชจ๋ ํด๋ผ์ด์ธํธ์๊ฒ ์ ์ฌ์ฉ์ ์ ์ ์๋ฆผ
for client in connected_clients:
if client != ws:
await client.send_str(json.dumps({
'action': 'user_joined',
'user_id': client_id
}))
try:
async for msg in ws:
if msg.type == web.WSMsgType.TEXT:
data = json.loads(msg.data)
# ์ฑํ
๋ฉ์์ง๋ฅผ ๋ชจ๋ ํด๋ผ์ด์ธํธ์๊ฒ ๋ธ๋ก๋์บ์คํธ
for client in connected_clients:
await client.send_str(json.dumps({
'action': 'chat',
'user_id': client_id,
'message': data.get('message', '')
}))
elif msg.type == web.WSMsgType.ERROR:
print(f"WebSocket ์ค๋ฅ: {ws.exception()}")
finally:
# ์ฐ๊ฒฐ ์ข
๋ฃ ์ ํด๋ผ์ด์ธํธ ์ ๊ฑฐ
connected_clients.remove(ws)
# ์ฌ์ฉ์ ํด์ฅ ์๋ฆผ
for client in connected_clients:
await client.send_str(json.dumps({
'action': 'user_left',
'user_id': client_id
}))
return ws
app = web.Application()
app.add_routes([web.get('/chat', chat_websocket)])
if __name__ == '__main__':
web.run_app(app)
WebSocket ์ฐ๊ฒฐ์ ๋ค์ํ ์ด๋ฒคํธ๋ฅผ ์ฒ๋ฆฌํ ์ ์๋ค.
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
# ์ฐ๊ฒฐ ์์ ์ด๋ฒคํธ
print("WebSocket ์ฐ๊ฒฐ ์์")
try:
async for msg in ws:
if msg.type == web.WSMsgType.TEXT:
# ํ
์คํธ ๋ฉ์์ง ์ด๋ฒคํธ
print(f"ํ
์คํธ ์์ : {msg.data}")
await ws.send_str(f"์์ฝ: {msg.data}")
elif msg.type == web.WSMsgType.BINARY:
# ๋ฐ์ด๋๋ฆฌ ๋ฉ์์ง ์ด๋ฒคํธ
print(f"๋ฐ์ด๋๋ฆฌ ๋ฐ์ดํฐ ์์ : {len(msg.data)} ๋ฐ์ดํธ")
await ws.send_bytes(msg.data)
elif msg.type == web.WSMsgType.PING:
# Ping ์ด๋ฒคํธ
await ws.pong()
elif msg.type == web.WSMsgType.CLOSE:
# ํด๋ผ์ด์ธํธ๊ฐ ์ฐ๊ฒฐ ์ข
๋ฃ ์์ฒญ
await ws.close()
elif msg.type == web.WSMsgType.ERROR:
# ์ค๋ฅ ์ด๋ฒคํธ
print(f"์ค๋ฅ ๋ฐ์: {ws.exception()}")
finally:
# ์ฐ๊ฒฐ ์ข
๋ฃ ์ด๋ฒคํธ
print("WebSocket ์ฐ๊ฒฐ ์ข
๋ฃ")
return ws
โ
ํน์ง:
- ์๋ฐฉํฅ ์ค์๊ฐ ํต์
- ์ด๋ฒคํธ ๊ธฐ๋ฐ ๋ฉ์์ง ๊ตํ
- JSON ์ง๋ ฌํ๋ ๋ฉ์์ง
- ํด๋ผ์ด์ธํธ ์ฐ๊ฒฐ ๊ด๋ฆฌ
- ๋ธ๋ก๋์บ์คํ ์ง์
aiohttp๋ฅผ ์ฌ์ฉํ ์ค์ ์ ํ๋ฆฌ์ผ์ด์
๊ตฌํ ์์ ๋ฅผ ์ดํด๋ณด์.
from aiohttp import web
import json
class RestAPI:
def __init__(self):
self.items = {}
async def get_items(self, request):
return web.json_response(list(self.items.values()))
async def get_item(self, request):
item_id = request.match_info['id']
if item_id not in self.items:
raise web.HTTPNotFound()
return web.json_response(self.items[item_id])
async def create_item(self, request):
data = await request.json()
item_id = str(len(self.items) + 1)
self.items[item_id] = {
'id': item_id,
**data
}
return web.json_response(
self.items[item_id],
status=201
)
async def update_item(self, request):
item_id = request.match_info['id']
if item_id not in self.items:
raise web.HTTPNotFound()
data = await request.json()
self.items[item_id].update(data)
return web.json_response(self.items[item_id])
api = RestAPI()
app.router.add_get('/items', api.get_items)
app.router.add_get('/items/{id}', api.get_item)
app.router.add_post('/items', api.create_item)
app.router.add_put('/items/{id}', api.update_item)
import os
async def handle_upload(request):
reader = await request.multipart()
# ํ์ผ ํ๋ ์ฒ๋ฆฌ
field = await reader.next()
filename = field.filename
# ํ์ผ ์ ์ฅ
size = 0
with open(os.path.join('uploads', filename), 'wb') as f:
while True:
chunk = await field.read_chunk()
if not chunk:
break
size += len(chunk)
f.write(chunk)
return web.json_response({
'filename': filename,
'size': size
})
app.router.add_post('/upload', handle_upload)
import aiohttp_session
from aiohttp_session import setup, get_session
from aiohttp_session.cookie_storage import EncryptedCookieStorage
import base64
async def init_app():
app = web.Application()
# ์ธ์
์ค์
secret_key = base64.urlsafe_b64decode(
'your-secret-key=='
)
setup(app, EncryptedCookieStorage(secret_key))
return app
async def login(request):
session = await get_session(request)
data = await request.json()
# ์ธ์ฆ ๋ก์ง
user_id = authenticate(data)
if user_id:
session['user_id'] = user_id
return web.json_response({'status': 'logged in'})
raise web.HTTPUnauthorized()
async def logout(request):
session = await get_session(request)
session.invalidate()
return web.json_response({'status': 'logged out'})
โ
ํน์ง:
- ๋น๋๊ธฐ ์ฝ๋ ์ต์ ํ
- ์ฐ๊ฒฐ ํ๋ง ํ์ฉ
- ์๋ฌ ์ฒ๋ฆฌ ๊ตฌํ
- ์ธ์ ๊ด๋ฆฌ
- ๋ณด์ ์ค์
- ๋ก๊น ๊ตฌํ
- ํ ์คํธ ์์ฑ
- ์ฑ๋ฅ ๋ชจ๋ํฐ๋ง
aiohttp๋ ์ ํ๋ฆฌ์ผ์ด์
ํ
์คํธ๋ฅผ ์ํ ๊ฐ๋ ฅํ ๋๊ตฌ๋ฅผ ์ ๊ณตํ๋ค.
aiohttp๋ ๋ด์ฅ๋ ํ
์คํธ ํด๋ผ์ด์ธํธ๋ฅผ ํตํด ์ ํ๋ฆฌ์ผ์ด์
์ ์์ฒญ์ ๋ณด๋ด๊ณ ์๋ต์ ๊ฒ์ฆํ ์ ์๋ค.
import pytest
from aiohttp import web
from aiohttp.test_utils import TestClient, TestServer
async def hello(request):
return web.Response(text="Hello, World!")
async def test_hello(aiohttp_client):
# ํ
์คํธํ ์ ํ๋ฆฌ์ผ์ด์
์์ฑ
app = web.Application()
app.router.add_get('/', hello)
# ํ
์คํธ ํด๋ผ์ด์ธํธ ์์ฑ
client = await aiohttp_client(app)
# ์์ฒญ ์ ์ก ๋ฐ ์๋ต ๊ฒ์ฆ
resp = await client.get('/')
assert resp.status == 200
text = await resp.text()
assert text == "Hello, World!"
pytest-aiohttp ํจํค์ง๋ฅผ ์ค์นํ๋ฉด ํ
์คํธ์ ์ฌ์ฉํ ์ ์๋ ์ ์ฉํ ํฝ์ค์ฒ๊ฐ ์ ๊ณต๋๋ค.
# pytest-aiohttp ํจํค์ง ์ค์น:
# pip install pytest-aiohttp
# conftest.py
import pytest
from aiohttp import web
# ํ
์คํธ์ฉ ์ ํ๋ฆฌ์ผ์ด์
์์ฑ
@pytest.fixture
def app():
app = web.Application()
async def hello(request):
return web.Response(text="Hello, World!")
async def echo(request):
data = await request.json()
return web.json_response(data)
app.router.add_get('/', hello)
app.router.add_post('/echo', echo)
return app
# test_app.py
async def test_hello(aiohttp_client, app):
client = await aiohttp_client(app)
resp = await client.get('/')
assert resp.status == 200
text = await resp.text()
assert text == "Hello, World!"
async def test_echo(aiohttp_client, app):
client = await aiohttp_client(app)
data = {"message": "Hello"}
resp = await client.post('/echo', json=data)
assert resp.status == 200
json_data = await resp.json()
assert json_data == data
์ธ๋ถ ์์กด์ฑ์ด ์๋ ์ ํ๋ฆฌ์ผ์ด์
์ ํ
์คํธํ๊ธฐ ์ํด ๋ชจํน ๊ธฐ๋ฒ์ ์ฌ์ฉํ๋ค.
import pytest
from unittest.mock import MagicMock, patch
from aiohttp import web
# ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ ๊ทผ ํด๋์ค (์ค์ ์ฝ๋)
class Database:
async def get_user(self, user_id):
# ์ค์ ๋ก๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค์์ ์ฌ์ฉ์ ์กฐํ
pass
# ์ ํ๋ฆฌ์ผ์ด์
ํธ๋ค๋ฌ
async def get_user_handler(request):
user_id = request.match_info['id']
db = request.app['db']
user = await db.get_user(user_id)
if user:
return web.json_response(user)
return web.Response(status=404)
# ํ
์คํธ ํจ์
async def test_get_user(aiohttp_client):
# ๋ฐ์ดํฐ๋ฒ ์ด์ค ๊ฐ์ฒด ๋ชจํน
mock_db = MagicMock()
mock_db.get_user = MagicMock(return_value={"id": "1", "name": "ํ
์คํธ ์ฌ์ฉ์"})
# ์ ํ๋ฆฌ์ผ์ด์
์์ฑ
app = web.Application()
app['db'] = mock_db
app.router.add_get('/users/{id}', get_user_handler)
# ํ
์คํธ ํด๋ผ์ด์ธํธ ์์ฑ
client = await aiohttp_client(app)
# ์์ฒญ ์ ์ก ๋ฐ ์๋ต ๊ฒ์ฆ
resp = await client.get('/users/1')
assert resp.status == 200
data = await resp.json()
assert data['id'] == "1"
assert data['name'] == "ํ
์คํธ ์ฌ์ฉ์"
# ๋ชจํน๋ ๋ฉ์๋๊ฐ ํธ์ถ๋์๋์ง ํ์ธ
mock_db.get_user.assert_called_once_with("1")
WebSocket ๊ธฐ๋ฅ์ ํ
์คํธํ๊ธฐ ์ํ ๋ฐฉ๋ฒ๋ ์ ๊ณต๋๋ค.
async def test_websocket(aiohttp_client):
# WebSocket ํธ๋ค๋ฌ
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
async for msg in ws:
if msg.type == web.WSMsgType.TEXT:
await ws.send_str(f"echo: {msg.data}")
return ws
# ์ ํ๋ฆฌ์ผ์ด์
์ค์
app = web.Application()
app.router.add_get('/ws', websocket_handler)
# ํ
์คํธ ํด๋ผ์ด์ธํธ ์์ฑ
client = await aiohttp_client(app)
# WebSocket ์ฐ๊ฒฐ
ws = await client.ws_connect('/ws')
# ๋ฉ์์ง ์ ์ก ๋ฐ ์๋ต ํ์ธ
await ws.send_str("hello")
msg = await ws.receive()
assert msg.data == "echo: hello"
# ์ฐ๊ฒฐ ์ข
๋ฃ
await ws.close()
์ ์ฒด ์ ํ๋ฆฌ์ผ์ด์
ํ๋ฆ์ ํ
์คํธํ๊ธฐ ์ํ ํตํฉ ํ
์คํธ๋ฅผ ์์ฑํ ์ ์๋ค.
import jwt
import pytest
from aiohttp import web
# ๋ณด์ ํค
SECRET_KEY = "test_secret_key"
# ์ฌ์ฉ์ ์ธ์ฆ ๋ฏธ๋ค์จ์ด
@web.middleware
async def auth_middleware(request, handler):
if request.path == '/login':
return await handler(request)
token = request.headers.get('Authorization', '').replace('Bearer ', '')
if not token:
return web.json_response({"error": "์ธ์ฆ ํ์"}, status=401)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
request['user'] = payload
except Exception:
return web.json_response({"error": "์ ํจํ์ง ์์ ํ ํฐ"}, status=401)
return await handler(request)
# ๋ก๊ทธ์ธ ํธ๋ค๋ฌ
async def login_handler(request):
data = await request.json()
if data.get('username') == 'admin' and data.get('password') == 'password':
token = jwt.encode({"user_id": 1, "username": "admin"}, SECRET_KEY, algorithm='HS256')
return web.json_response({"token": token})
return web.json_response({"error": "์๋ชป๋ ์๊ฒฉ ์ฆ๋ช
"}, status=401)
# ๋ณดํธ๋ ๋ฆฌ์์ค ํธ๋ค๋ฌ
async def protected_handler(request):
user = request.get('user')
return web.json_response({"message": f"์๋
ํ์ธ์, {user['username']}๋!"})
# ํ
์คํธ ํจ์
async def test_auth_flow(aiohttp_client):
# ์ ํ๋ฆฌ์ผ์ด์
์ค์
app = web.Application(middlewares=[auth_middleware])
app.router.add_post('/login', login_handler)
app.router.add_get('/protected', protected_handler)
# ํ
์คํธ ํด๋ผ์ด์ธํธ ์์ฑ
client = await aiohttp_client(app)
# ๋ก๊ทธ์ธ ์์ด ๋ณดํธ๋ ๋ฆฌ์์ค ์ ๊ทผ ์๋
resp = await client.get('/protected')
assert resp.status == 401
# ์๋ชป๋ ์๊ฒฉ ์ฆ๋ช
์ผ๋ก ๋ก๊ทธ์ธ
resp = await client.post('/login', json={"username": "admin", "password": "wrong"})
assert resp.status == 401
# ์ฌ๋ฐ๋ฅธ ์๊ฒฉ ์ฆ๋ช
์ผ๋ก ๋ก๊ทธ์ธ
resp = await client.post('/login', json={"username": "admin", "password": "password"})
assert resp.status == 200
data = await resp.json()
assert "token" in data
# ํ ํฐ์ผ๋ก ๋ณดํธ๋ ๋ฆฌ์์ค ์ ๊ทผ
token = data['token']
headers = {"Authorization": f"Bearer {token}"}
resp = await client.get('/protected', headers=headers)
assert resp.status == 200
data = await resp.json()
assert data["message"] == "์๋
ํ์ธ์, admin๋!"
aiohttp ํ
์คํธ ์ ๋น๋๊ธฐ ์ฝ๋๋ฅผ ํจ๊ณผ์ ์ผ๋ก ์ฒ๋ฆฌํ๊ธฐ ์ํ ๋ช ๊ฐ์ง ํ์ด๋ค.
# ์ฌ๋ฌ ๋น๋๊ธฐ ์์
์ ๋์์ ์คํ
async def test_concurrent_requests(aiohttp_client, app):
client = await aiohttp_client(app)
# ๋์์ ์ฌ๋ฌ ์์ฒญ ๋ณด๋ด๊ธฐ
import asyncio
responses = await asyncio.gather(
client.get('/api/1'),
client.get('/api/2'),
client.get('/api/3')
)
# ๊ฐ ์๋ต ๊ฒ์ฆ
for resp in responses:
assert resp.status == 200
# ์๊ฐ ์ ์ฝ ์๋ ํ
์คํธ
async def test_with_timeout(aiohttp_client, app):
client = await aiohttp_client(app)
# 5์ด ํ์์์์ผ๋ก ์์ฒญ ์คํ
import asyncio
try:
async with asyncio.timeout(5):
resp = await client.get('/slow-api')
assert resp.status == 200
except asyncio.TimeoutError:
pytest.fail("API ์๋ต์ด ๋๋ฌด ๋๋ฆฝ๋๋ค")
โ
ํน์ง:
- ์ ํ๋ฆฌ์ผ์ด์ ๋จ์ ํ ์คํธ
- ์๋ฒ ์๋ํฌ์ธํธ ๊ฒ์ฆ
- ๋น๋๊ธฐ ์์ ๋ชจํน
- ํ์์์ ์ฒ๋ฆฌ
- ๋ณ๋ ฌ ์์ฒญ ํ ์คํธ
- ์ธ์ฆ ์ํฌํ๋ก์ฐ ํ ์คํธ
aiohttp ์ ํ๋ฆฌ์ผ์ด์
์ ํ๋ก๋์
ํ๊ฒฝ์ ๋ฐฐํฌํ๊ธฐ ์ํ ๋ค์ํ ๋ฐฉ๋ฒ๊ณผ ๋ชจ๋ฒ ์ฌ๋ก๋ฅผ ์์๋ณด์.
aiohttp ์ ํ๋ฆฌ์ผ์ด์
์ gunicorn๊ณผ ํจ๊ป ๋ฐฐํฌํ๋ ๊ฒ์ด ์ผ๋ฐ์ ์ด๋ค.
# app.py - ์ ํ๋ฆฌ์ผ์ด์
์ง์
์
from aiohttp import web
async def hello(request):
return web.Response(text="Hello, World!")
async def init_app():
app = web.Application()
app.router.add_get('/', hello)
return app
# gunicorn ์์ปค๋ฅผ ์ํ ์ ํ๋ฆฌ์ผ์ด์
ํฉํ ๋ฆฌ ํจ์
def create_app():
return init_app()
gunicorn ์ค์น ๋ฐ ์คํ:
pip install gunicorn
pip install uvloop # ์ ํ์ ์ผ๋ก ์ฑ๋ฅ ํฅ์์ ์ํด
# aiohttp ์์ปค ํด๋์ค๋ฅผ ์ฌ์ฉํ์ฌ ์คํ
gunicorn app:create_app --bind 0.0.0.0:8080 --worker-class aiohttp.GunicornWebWorker
Docker๋ฅผ ์ฌ์ฉํ์ฌ aiohttp ์ ํ๋ฆฌ์ผ์ด์
์ ์ปจํ
์ด๋ํํ๋ ๋ฐฉ๋ฒ์ด๋ค.
# Dockerfile
FROM python:3.10-slim
WORKDIR /app
# ์์กด์ฑ ์ค์น
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# ์ ํ๋ฆฌ์ผ์ด์
์ฝ๋ ๋ณต์ฌ
COPY . .
# ํฌํธ ๋
ธ์ถ
EXPOSE 8080
# ์ ํ๋ฆฌ์ผ์ด์
์คํ
CMD ["gunicorn", "app:create_app", "--bind", "0.0.0.0:8080", "--worker-class", "aiohttp.GunicornWebWorker", "--workers", "4"]
requirements.txt:
aiohttp==3.8.5
gunicorn==21.2.0
uvloop==0.17.0
Docker ์ด๋ฏธ์ง ๋น๋ ๋ฐ ์คํ:
docker build -t aiohttp-app .
docker run -p 8080:8080 aiohttp-app
ํ๋ก๋์
ํ๊ฒฝ์์ aiohttp ์ ํ๋ฆฌ์ผ์ด์
์ ์คํํ ๋ ๊ณ ๋ คํด์ผ ํ ๊ตฌ์ฑ์ด๋ค.
import os
import logging
from aiohttp import web
# ํ๊ฒฝ ๋ณ์์์ ๊ตฌ์ฑ ๊ฐ์ ธ์ค๊ธฐ
HOST = os.getenv('HOST', '0.0.0.0')
PORT = int(os.getenv('PORT', 8080))
LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO')
DEBUG = os.getenv('DEBUG', 'false').lower() == 'true'
# ๋ก๊น
์ค์
logging.basicConfig(
level=getattr(logging, LOG_LEVEL),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('aiohttp_app')
# ์ ํ๋ฆฌ์ผ์ด์
์ด๊ธฐํ
async def init_app():
app = web.Application(debug=DEBUG)
# ๊ฐ ํ๊ฒฝ(๊ฐ๋ฐ, ํ
์คํธ, ํ๋ก๋์
)์ ๋ง๋ ๊ตฌ์ฑ ๋ก๋
app['config'] = {
'database_url': os.getenv('DATABASE_URL'),
'redis_url': os.getenv('REDIS_URL'),
'secret_key': os.getenv('SECRET_KEY', 'insecure-secret'),
}
# ์ข
์์ฑ ์ค์ , ๋ผ์ฐํธ ๋ฑ๋ก
# [์๋ต]
return app
# ๊ฐ๋ฐ ํ๊ฒฝ์์ ์ง์ ์คํ
if __name__ == '__main__':
web.run_app(init_app(), host=HOST, port=PORT)
์ ํ๋ฆฌ์ผ์ด์
์ํ ๋ชจ๋ํฐ๋ง์ ์ํ ์๋ํฌ์ธํธ๋ฅผ ์ถ๊ฐํ๋ค.
import time
import psutil
import socket
from aiohttp import web
# ์์ ์๊ฐ ๊ธฐ๋ก
start_time = time.time()
async def health_check(request):
"""๊ธฐ๋ณธ ์ํ ํ์ธ ์๋ํฌ์ธํธ"""
return web.json_response({"status": "ok"})
async def readiness_check(request):
"""์๋น์ค๊ฐ ์ค๋น๋์๋์ง ํ์ธํ๋ ์๋ํฌ์ธํธ"""
# ๋ฐ์ดํฐ๋ฒ ์ด์ค, ์บ์ ๋ฑ ์ธ๋ถ ์ข
์์ฑ ์ฐ๊ฒฐ ํ์ธ
database_ok = await check_database(request.app)
cache_ok = await check_cache(request.app)
if database_ok and cache_ok:
return web.json_response({"status": "ready"})
# ์ค๋น๋์ง ์์ ์๋ต (Kubernetes ๋ฑ์ด ์ธ์ํ ์ ์์)
return web.json_response(
{"status": "not ready", "details": {"database": database_ok, "cache": cache_ok}},
status=503
)
async def metrics(request):
"""์ ํ๋ฆฌ์ผ์ด์
๋ฉํธ๋ฆญ ์๋ํฌ์ธํธ"""
process = psutil.Process()
# ๋ค์ํ ๋ฉํธ๋ฆญ ์์ง
uptime = time.time() - start_time
cpu_usage = process.cpu_percent()
memory_info = process.memory_info()
memory_usage = memory_info.rss / (1024 * 1024) # MB ๋จ์
open_connections = len(process.connections())
# ์์คํ
์ ๋ณด
hostname = socket.gethostname()
return web.json_response({
"uptime_seconds": uptime,
"cpu_usage_percent": cpu_usage,
"memory_usage_mb": memory_usage,
"open_connections": open_connections,
"hostname": hostname
})
# ์ ํ๋ฆฌ์ผ์ด์
์ ์ํ ์๋ํฌ์ธํธ ๋ฑ๋ก
app = web.Application()
app.router.add_get('/health', health_check)
app.router.add_get('/ready', readiness_check)
app.router.add_get('/metrics', metrics)
ํ๋ก๋์
ํ๊ฒฝ์์์ ๋ณด์ ๊ฐํ ๋ฐฉ๋ฒ์ด๋ค.
from aiohttp import web
from cryptography.fernet import Fernet
import secrets
import aiohttp_session
from aiohttp_session.cookie_storage import EncryptedCookieStorage
# ๋ณด์ ํค๋ ๋ฏธ๋ค์จ์ด
@web.middleware
async def security_headers_middleware(request, handler):
response = await handler(request)
# ๋ณด์ ๊ด๋ จ ํค๋ ์ถ๊ฐ
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'DENY'
response.headers['X-XSS-Protection'] = '1; mode=block'
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
response.headers['Content-Security-Policy'] = "default-src 'self'"
return response
# ์ ํ๋ฆฌ์ผ์ด์
์ค์
async def init_app():
# ๋ณด์ ๋ฏธ๋ค์จ์ด ์ ์ฉ
app = web.Application(middlewares=[security_headers_middleware])
# ์์ ํ ์ธ์
์ค์
fernet_key = Fernet.generate_key()
secret_key = base64.urlsafe_b64decode(fernet_key)
aiohttp_session.setup(app, EncryptedCookieStorage(secret_key))
# ์์ ํ ๋น๋ฐ๋ฒํธ ํด์ฑ์ ์ํ ์ค์
app['pwd_context'] = CryptContext(
schemes=["argon2", "bcrypt"],
deprecated="auto"
)
# CSRF ํ ํฐ ์์ฑ ๋์ฐ๋ฏธ
app['create_csrf_token'] = lambda: secrets.token_hex(32)
return app
# CSRF ๋ณดํธ ๋ฏธ๋ค์จ์ด
@web.middleware
async def csrf_middleware(request, handler):
if request.method not in ('GET', 'HEAD', 'OPTIONS', 'TRACE'):
token = request.headers.get('X-CSRF-Token')
session = await aiohttp_session.get_session(request)
if token is None or token != session.get('csrf_token'):
return web.json_response(
{"error": "CSRF ํ ํฐ์ด ์ ํจํ์ง ์์ต๋๋ค."},
status=403
)
return await handler(request)
์ ํ๋ฆฌ์ผ์ด์
์ ์ํ์ ์ผ๋ก ํ์ฅํ๊ธฐ ์ํ ์ ๋ต์ด๋ค.
# ๋ก๋ ๋ฐธ๋ฐ์ ๋ค์์ ์ฌ๋ฌ ์ธ์คํด์ค๋ฅผ ์คํํ๊ธฐ ์ํ ์ค์
# nginx.conf
server {
listen 80;
server_name example.com;
location / {
proxy_pass http://aiohttp_servers;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
upstream aiohttp_servers {
server 127.0.0.1:8001;
server 127.0.0.1:8002;
server 127.0.0.1:8003;
server 127.0.0.1:8004;
}
์ ํ๋ฆฌ์ผ์ด์
์ ์ค๋จ ์์ด ์
๋ฐ์ดํธํ๋ ๋ฐฉ๋ฒ์ด๋ค.
# systemd ์๋น์ค ํ์ผ ์์
# /etc/systemd/system/aiohttp-app.service
[Unit]
Description=aiohttp application
After=network.target
[Service]
User=appuser
Group=appuser
WorkingDirectory=/path/to/app
ExecStart=/path/to/venv/bin/gunicorn app:create_app --bind 0.0.0.0:8080 --worker-class aiohttp.GunicornWebWorker
Restart=on-failure
RestartSec=5s
# ๊ทธ๋ ์ด์คํ ์ข
๋ฃ ๊ด๋ จ ์ค์
ExecReload=/bin/kill -s HUP $MAINPID
KillMode=mixed
KillSignal=SIGTERM
TimeoutStopSec=30s
[Install]
WantedBy=multi-user.target
ํด๋ผ์ฐ๋ ํ๊ฒฝ์์ aiohttp ์ ํ๋ฆฌ์ผ์ด์
์ ๋ฐฐํฌํ๊ธฐ ์ํ ์ค์ ์ด๋ค.
# kubernetes-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: aiohttp-app
spec:
replicas: 3
selector:
matchLabels:
app: aiohttp-app
template:
metadata:
labels:
app: aiohttp-app
spec:
containers:
- name: aiohttp-app
image: your-registry/aiohttp-app:latest
ports:
- containerPort: 8080
env:
- name: PORT
value: "8080"
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: app-secrets
key: database-url
# ์ํ ํ์ธ ํ๋ก๋ธ
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 5
periodSeconds: 5
resources:
requests:
memory: "128Mi"
cpu: "100m"
limits:
memory: "256Mi"
cpu: "500m"
โ
ํน์ง:
- ๊ทธ๋ ์ด์คํ ์ข ๋ฃ ์ฒ๋ฆฌ
- ์ํ ๋ชจ๋ํฐ๋ง ์๋ํฌ์ธํธ
- ์ปจํ ์ด๋ํ ์ง์
- ์ํ ํ์ฅ ์ ๋ต
- ํ๊ฒฝ ๊ธฐ๋ฐ ๊ตฌ์ฑ
- ๋ณด์ ๋ชจ๋ฒ ์ฌ๋ก
- ํด๋ผ์ฐ๋ ๋ค์ดํฐ๋ธ ๋ฐฐํฌ
โ
๋ชจ๋ฒ ์ฌ๋ก:
-
์ธ์
์ฌ์ฌ์ฉ: ์ฌ๋ฌ ์์ฒญ์ ๋์ผํ
ClientSession
์ธ์คํด์ค๋ฅผ ์ฌ์ฌ์ฉํ์ฌ ์ฑ๋ฅ ์ต์ ํ -
๋น๋๊ธฐ ์ปจํ
์คํธ ๊ด๋ฆฌ์: ๋ฆฌ์์ค ๊ด๋ฆฌ๋ฅผ ์ํด
async with
๊ตฌ๋ฌธ ํ์ฉ - ํ์์์ ์ค์ : ๋ชจ๋ ์ธ๋ถ ์๋น์ค ํธ์ถ์ ์ ์ ํ ํ์์์ ์ค์
- ์ ์ ํ ์ํ ์ฝ๋: ์๋ฏธ ์๋ HTTP ์ํ ์ฝ๋์ ์๋ต ๋ณธ๋ฌธ ๋ฐํ
- ์๋ฌ ์ฒ๋ฆฌ: ๋ชจ๋ ์์ธ๋ฅผ ์ก์ ์ ์ ํ ์ฒ๋ฆฌํ๋ ๋ฏธ๋ค์จ์ด ๊ตฌํ
- ์ฐ๊ฒฐ ํ๋ง: ๋ฐ์ดํฐ๋ฒ ์ด์ค ๋ฐ ์ธ๋ถ ์๋น์ค ์ฐ๊ฒฐ ํ๋ง
- ์ฒญํฌ ์ฒ๋ฆฌ: ๋์ฉ๋ ๋ฐ์ดํฐ๋ ์ฒญํฌ ๋จ์๋ก ์คํธ๋ฆฌ๋ฐ
- ๊ฒฝ๋ก ์ ๊ทํ: ๋ช ํํ ๊ฒฝ๋ก ๊ตฌ์กฐ์ ๋ผ์ฐํ ํจํด ์ค๊ณ
-
์ปจํ
์คํธ ํ์ฉ:
request['app']
์ ํตํ ์ ํ๋ฆฌ์ผ์ด์ ์ปจํ ์คํธ ํ์ฉ - ๋ก๊น ์ ๋ต: ๊ตฌ์กฐํ๋ ๋ก๊น ์ผ๋ก ๋ฌธ์ ํด๊ฒฐ ์ฉ์ด์ฑ ํ๋ณด
- graceful ์ข ๋ฃ: ๋ชจ๋ ์ฐ๊ฒฐ๊ณผ ์์ ์ด ์์ ํ๊ฒ ์ข ๋ฃ๋๋๋ก ์ฒ๋ฆฌ
- ์ํธํ ์ ์ฉ: ๋ฏผ๊ฐํ ๋ฐ์ดํฐ์ ์ ์ ํ ์ํธํ ์ ์ฉ
- CSRF ๋ณดํธ: ์ํ ๋ณ๊ฒฝ ์์ฒญ์ CSRF ํ ํฐ ๊ฒ์ฆ
- CORS ์ค์ : ์ ์ ํ CORS ์ ์ฑ ์ผ๋ก ๋ธ๋ผ์ฐ์ ๋ณด์ ๊ฐํ
- API ๋ฌธ์ํ: OpenAPI ๋๋ ์ ์ฌํ ๋๊ตฌ๋ก API ๋ฌธ์ํ
-
๋น๋๊ธฐ ํจํด ์ดํด: ๋ธ๋กํน ์ฝ๋๋ฅผ ์คํํ ๋
asyncio.to_thread()
ํ์ฉ