KR_Coroutine - somaz94/python-study GitHub Wiki
์ฝ๋ฃจํด์ ์ง์
์ ์ด ์ฌ๋ฌ ๊ฐ์ธ ์๋ธ๋ฃจํด์ผ๋ก, ์คํ์ ์ผ์ ์ค๋จํ๊ณ ์ฌ๊ฐํ ์ ์๋ค.
async def simple_coroutine():
print('์ฝ๋ฃจํด ์์')
await asyncio.sleep(1)
print('์ฝ๋ฃจํด ์ข
๋ฃ')
# ์คํ
import asyncio
asyncio.run(simple_coroutine())
โ
ํน์ง:
- ๋น๋๊ธฐ ์คํ
- ์คํ ์ค๋จ/์ฌ๊ฐ
- ์ด๋ฒคํธ ๋ฃจํ ๊ธฐ๋ฐ
- ๋จ์ผ ์ค๋ ๋์์ ๋์์ฑ ๊ตฌํ
- ํจ์จ์ ์ธ I/O ์ฒ๋ฆฌ
# ๋ฉ์ธ ํจ์์์ ์คํ
async def main():
await simple_coroutine()
asyncio.run(main())
# ์ด๋ฒคํธ ๋ฃจํ ์ง์ ์ ์ด
loop = asyncio.get_event_loop()
loop.run_until_complete(simple_coroutine())
loop.close()
# ํ์คํฌ๋ก ์์ฝ
async def run_tasks():
task = asyncio.create_task(simple_coroutine())
await task
asyncio.run(run_tasks())
Python์ ์ฝ๋ฃจํด ๋ฌธ๋ฒ ์งํ์ ํ๋์ ์ธ ๋น๋๊ธฐ ์ฒ๋ฆฌ ๋ฐฉ์์ด๋ค.
# ์ ๋๋ ์ดํฐ ๊ธฐ๋ฐ ์ฝ๋ฃจํด (์ด์ ๋ฐฉ์)
def old_style_coroutine():
yield 'First'
yield 'Second'
yield 'Third'
# ์ ๋๋ ์ดํฐ ๊ธฐ๋ฐ ์ฝ๋ฃจํด ์ฒด์ด๋ (Python 3.3+)
def delegator():
yield from old_style_coroutine()
# ํ๋์ ์ธ ์ฝ๋ฃจํด (Python 3.5+)
async def modern_coroutine():
await asyncio.sleep(1)
return 'Result'
# ์ฝ๋ฃจํด ์ฒด์ด๋
async def chain_coroutines():
first = await modern_coroutine()
second = await modern_coroutine()
return [first, second]
โ
ํน์ง:
- ํ๋์ ๋ฌธ๋ฒ ์ง์
- ์ฝ๋ฃจํด ์ฒด์ด๋
- ๊ฐ๋ ์ฑ ํฅ์
- ํ์ ํํ ์ง์
- ๋น๋๊ธฐ ์ปจํ ์คํธ ๊ด๋ฆฌ์ ํธํ
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def concurrent_example():
start = time.time()
# ๋ณ๋ ฌ ์คํ
task1 = asyncio.create_task(say_after(1, '์์
1 ์๋ฃ'))
task2 = asyncio.create_task(say_after(2, '์์
2 ์๋ฃ'))
print(f"์์ ์๊ฐ: {time.strftime('%X')}")
# ๋ ํ์คํฌ ์๋ฃ ๋๊ธฐ
await task1
await task2
end = time.time()
print(f"๋๋ ์๊ฐ: {time.strftime('%X')}")
print(f"์ด ์์ ์๊ฐ: {end - start:.2f} ์ด") # ์ฝ 2์ด ์์ (๋ณ๋ ฌ ์คํ)
asyncio.run(concurrent_example())
์ฝ๋ฃจํด์ ๋ค์ํ ์ํ ๊ด๋ฆฌ์ ์์ธ ์ฒ๋ฆฌ ๋ฐฉ๋ฒ์ด๋ค.
async def error_handling():
try:
await risky_operation()
except Exception as e:
print(f'์๋ฌ ๋ฐ์: {e}')
else:
print('์ฑ๊ณต์ ์ผ๋ก ์๋ฃ')
finally:
print('์ ๋ฆฌ ์์
์ํ')
async def risky_operation():
await asyncio.sleep(1)
raise ValueError('์๋์ ์ธ ์๋ฌ')
# ์ฌ๋ฌ ์์ธ ์ ํ ์ฒ๋ฆฌ
async def comprehensive_error_handling():
try:
await risky_operation()
except ValueError as e:
print(f'๊ฐ ์ค๋ฅ: {e}')
except asyncio.TimeoutError:
print('์๊ฐ ์ด๊ณผ')
except asyncio.CancelledError:
print('์์
์ทจ์๋จ')
raise # ์ทจ์ ์์ธ๋ ๋ค์ ๋ฐ์์ํค๋ ๊ฒ์ด ์ข๋ค
except Exception as e:
print(f'๊ธฐํ ์์ธ: {e}')
โ
ํน์ง:
- ์์ธ ์ฒ๋ฆฌ ํจํด
- ์ ๋ฆฌ ์์ ๋ณด์ฅ
- ์ํ ๊ด๋ฆฌ
- ๋ค์ํ ์์ธ ์ ํ ๋์
- ๋น๋๊ธฐ ์ปจํ ์คํธ ๊ด๋ฆฌ์ ํ์ฉ
import logging
import contextlib
import time
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@contextlib.asynccontextmanager
async def track_coroutine_status(name):
"""์ฝ๋ฃจํด ์คํ ์ํ๋ฅผ ์ถ์ ํ๋ ๋น๋๊ธฐ ์ปจํ
์คํธ ๊ด๋ฆฌ์"""
start_time = time.time()
logging.info(f"์ฝ๋ฃจํด '{name}' ์์")
try:
yield
except Exception as e:
logging.error(f"์ฝ๋ฃจํด '{name}' ์คํ ์ค ์ค๋ฅ ๋ฐ์: {e}")
raise
finally:
elapsed = time.time() - start_time
logging.info(f"์ฝ๋ฃจํด '{name}' ์๋ฃ (์์ ์๊ฐ: {elapsed:.2f}์ด)")
async def monitored_task():
async with track_coroutine_status("๋ฐ์ดํฐ ์ฒ๋ฆฌ"):
await asyncio.sleep(1) # ์์
์๋ฎฌ๋ ์ด์
# raise ValueError("์์ ์ค๋ฅ") # ์ฃผ์ ํด์ ํ์ฌ ์ค๋ฅ ํ
์คํธ
await asyncio.sleep(0.5) # ์ถ๊ฐ ์์
return "์ฒ๋ฆฌ ์๋ฃ"
๋ค์ํ ๋ฐฉ์์ผ๋ก ์ฌ๋ฌ ์ฝ๋ฃจํด์ ์คํํ๊ณ ์กฐํฉํ๋ ๋ฐฉ๋ฒ์ด๋ค.
async def operation1():
await asyncio.sleep(1)
return "๊ฒฐ๊ณผ 1"
async def operation2():
await asyncio.sleep(2)
return "๊ฒฐ๊ณผ 2"
async def operation3():
await asyncio.sleep(1.5)
return "๊ฒฐ๊ณผ 3"
async def parallel_operations():
# ์ฌ๋ฌ ์ฝ๋ฃจํด ๋์ ์คํ
results = await asyncio.gather(
operation1(),
operation2(),
operation3()
)
return results # ['๊ฒฐ๊ณผ 1', '๊ฒฐ๊ณผ 2', '๊ฒฐ๊ณผ 3']
async def sequential_operations():
# ์์ฐจ์ ์คํ
result1 = await operation1()
result2 = await operation2()
result3 = await operation3()
return [result1, result2, result3]
# ํน์ ์์๋ก ๊ฒฐ๊ณผ ์ฒ๋ฆฌ
async def process_as_completed():
tasks = [
operation1(),
operation2(),
operation3()
]
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"์๋ฃ๋ ํ์คํฌ ๊ฒฐ๊ณผ: {result}")
โ
ํน์ง:
- ๋ณ๋ ฌ ์คํ
- ์์ฐจ ์คํ
- ๊ฒฐ๊ณผ ์์ง
- ์๋ฃ ์์ ๊ธฐ๋ฐ ์ฒ๋ฆฌ
- ํ์คํฌ ๊ทธ๋ฃนํ
import aiohttp
import asyncio
from typing import List, Dict, Any
async def fetch_data(url: str) -> Dict[str, Any]:
"""URL์์ ๋น๋๊ธฐ์ ์ผ๋ก ๋ฐ์ดํฐ ๊ฐ์ ธ์ค๊ธฐ"""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
async def fetch_all_data(urls: List[str]) -> List[Dict[str, Any]]:
"""์ฌ๋ฌ URL์์ ๋ณ๋ ฌ๋ก ๋ฐ์ดํฐ ๊ฐ์ ธ์ค๊ธฐ"""
tasks = [fetch_data(url) for url in urls]
return await asyncio.gather(*tasks)
async def process_user_data():
"""์ฌ์ฉ์ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์์ """
# ์ฌ๋ฌ API ์๋ํฌ์ธํธ์์ ๋ฐ์ดํฐ ๊ฐ์ ธ์ค๊ธฐ
user_data = await fetch_data("https://api.example.com/users/1")
# ์ฌ์ฉ์์ ๊ฒ์๋ฌผ๊ณผ ๋๊ธ ๋ณ๋ ฌ๋ก ๊ฐ์ ธ์ค๊ธฐ
posts_url = f"https://api.example.com/users/{user_data['id']}/posts"
comments_url = f"https://api.example.com/users/{user_data['id']}/comments"
posts, comments = await asyncio.gather(
fetch_data(posts_url),
fetch_data(comments_url)
)
# ๋ฐ์ดํฐ ์ฒ๋ฆฌ ๋ฐ ๊ฒฐํฉ
result = {
"user": user_data,
"posts": posts,
"comments": comments,
"activity_score": len(posts) + len(comments)
}
return result
์ฝ๋ฃจํด ์คํ ์ ์ด๋ฅผ ์ํ ํ์์์ ์ค์ ๊ณผ ์ทจ์ ๋ฉ์ปค๋์ฆ์ด๋ค.
async def long_operation():
print("๊ธด ์์
์์")
await asyncio.sleep(10) # ๊ธด ์์
์๋ฎฌ๋ ์ด์
print("๊ธด ์์
์๋ฃ")
return "์๋ฃ"
async def with_timeout():
try:
# Python 3.11+
async with asyncio.timeout(2.0):
await long_operation()
# Python 3.10 ์ดํ
# async with asyncio.timeout_at(asyncio.get_event_loop().time() + 2.0):
# await long_operation()
except asyncio.TimeoutError:
print('์์
์๊ฐ ์ด๊ณผ')
async def cancellation_demo():
try:
task = asyncio.create_task(long_operation())
await asyncio.sleep(1)
task.cancel()
await task
except asyncio.CancelledError:
print('์์
์ด ์ทจ์๋จ')
# ์ทจ์ ์ฒ๋ฆฌ๊ฐ ๊ตฌํ๋ ์์
async def cancellable_operation():
try:
while True:
print("์์
์ํ ์ค...")
await asyncio.sleep(0.5)
except asyncio.CancelledError:
print("์ทจ์ ์ ํธ ๊ฐ์ง")
# ์ ๋ฆฌ ์์
์ํ
print("๋ฆฌ์์ค ์ ๋ฆฌ ์ค...")
await asyncio.sleep(0.5)
print("์ ๋ฆฌ ์๋ฃ")
raise # ์ทจ์ ์ ํธ ์ ํ
โ
ํน์ง:
- ํ์์์ ์ค์
- ์์ ์ทจ์ ๋ฉ์ปค๋์ฆ
- ๋ฆฌ์์ค ๊ด๋ฆฌ
- ์ทจ์ ์ ํธ ์ฒ๋ฆฌ
- ์ฐ์ํ ์ข ๋ฃ ๊ตฌํ
import functools
import asyncio
from typing import TypeVar, Callable, Awaitable, Optional, Any, cast
T = TypeVar('T')
def with_timeout(timeout: float):
"""
์ง์ ๋ ํ์์์์ผ๋ก ์ฝ๋ฃจํด ํจ์๋ฅผ ๊ฐ์ธ๋ ๋ฐ์ฝ๋ ์ดํฐ
Args:
timeout: ํ์์์(์ด)
์์ธ:
asyncio.TimeoutError: ์์
์ด ์ง์ ๋ ์๊ฐ ๋ด์ ์๋ฃ๋์ง ์์ผ๋ฉด ๋ฐ์
"""
def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
@functools.wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> T:
return await asyncio.wait_for(func(*args, **kwargs), timeout=timeout)
return wrapper
return decorator
@with_timeout(2.0)
async def fetch_with_timeout(url: str) -> str:
"""2์ด ํ์์์์ด ์๋ URL ๋ฐ์ดํฐ ๊ฐ์ ธ์ค๊ธฐ"""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
์ฝ๋ฃจํด๊ณผ ๋ค๋ฅธ ๋์์ฑ ๋ฉ์ปค๋์ฆ์ ํจ๊ป ์ฌ์ฉํ๋ ๋ฐฉ๋ฒ์ด๋ค.
import asyncio
import concurrent.futures
import time
import functools
def cpu_bound_task(x):
"""CPU ์ง์ฝ์ ์์
(์ค๋ ๋/ํ๋ก์ธ์ค ํ์์ ์คํ)"""
result = 0
for i in range(10**7):
result += i + x
return result
async def run_in_threadpool(func, *args):
"""ํจ์๋ฅผ ์ค๋ ๋ ํ์์ ์คํ"""
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
return await loop.run_in_executor(
pool, functools.partial(func, *args)
)
async def run_in_processpool(func, *args):
"""ํจ์๋ฅผ ํ๋ก์ธ์ค ํ์์ ์คํ"""
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
return await loop.run_in_executor(
pool, functools.partial(func, *args)
)
async def mixed_concurrency():
"""์ฝ๋ฃจํด, ์ค๋ ๋, ํ๋ก์ธ์ค ํผํฉ ์ฌ์ฉ"""
# I/O ๋ฐ์ด๋ ์์
์ ์ฝ๋ฃจํด์ผ๋ก
io_task = asyncio.create_task(asyncio.sleep(1))
# CPU ๋ฐ์ด๋ ์์
์ ํ๋ก์ธ์ค ํ๋ก
cpu_task1 = run_in_processpool(cpu_bound_task, 1)
cpu_task2 = run_in_processpool(cpu_bound_task, 2)
# ํ์ผ I/O๋ ์ค๋ ๋ ํ๋ก (๋ธ๋กํน I/O)
file_task = run_in_threadpool(lambda: open('large_file.txt', 'r').read())
# ๋ชจ๋ ์์
์๋ฃ ๋๊ธฐ
results = await asyncio.gather(
io_task,
cpu_task1,
cpu_task2,
file_task
)
return results
โ
ํน์ง:
- ์ฝ๋ฃจํด๊ณผ ์ค๋ ๋ ํ ํตํฉ
- ํ๋ก์ธ์ค ํ ํ์ฉ
- ๋ธ๋กํน ์ฝ๋ ์ฒ๋ฆฌ
- ์์ ์ ํ๋ณ ์ต์ ์คํ ๋ฐฉ์
- ๋ฆฌ์์ค ํจ์จ์ฑ ๊ทน๋ํ
์ค์ ์ ํ๋ฆฌ์ผ์ด์
์์ ํ์ฉ ๊ฐ๋ฅํ ๋น๋๊ธฐ ํจํด๊ณผ ๊ณ ๊ธ ๊ธฐ๋ฒ์ด๋ค.
import asyncio
import aiohttp
from typing import List, Dict
async def fetch_with_semaphore(semaphore, url):
"""์ธ๋งํฌ์ด๋ก ๋์ ์์ฒญ ์ ์ ํ"""
async with semaphore:
print(f"{url} ์์ฒญ ์์")
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.json()
print(f"{url} ์์ฒญ ์๋ฃ")
return data
async def rate_limited_requests(urls: List[str], limit: int = 5):
"""๋์ ์์ฒญ ์๋ฅผ ์ ํํ์ฌ API ํธ์ถ"""
semaphore = asyncio.Semaphore(limit)
tasks = [fetch_with_semaphore(semaphore, url) for url in urls]
return await asyncio.gather(*tasks)
import asyncio
from typing import AsyncIterator, List
async def async_range(start: int, stop: int, delay: float = 0.1) -> AsyncIterator[int]:
"""๋น๋๊ธฐ ๋ฒ์ ์์ฑ๊ธฐ"""
for i in range(start, stop):
await asyncio.sleep(delay)
yield i
async def process_stream(stream: AsyncIterator[int]) -> List[int]:
"""๋น๋๊ธฐ ์คํธ๋ฆผ ์ฒ๋ฆฌ"""
result = []
async for item in stream:
result.append(item * 2)
return result
async def demo_async_iterator():
"""๋น๋๊ธฐ ์ดํฐ๋ ์ดํฐ ๋ฐ๋ชจ"""
stream = async_range(0, 10)
processed = await process_stream(stream)
print(f"๊ฒฐ๊ณผ: {processed}")
import asyncio
import aiofiles
from typing import AsyncIterator
class AsyncResource:
"""๋น๋๊ธฐ ๋ฆฌ์์ค ๊ด๋ฆฌ ์์ """
def __init__(self, name: str):
self.name = name
async def __aenter__(self):
print(f"{self.name} ๋ฆฌ์์ค ์ด๊ธฐํ ์ค...")
await asyncio.sleep(0.1) # ์ด๊ธฐํ ์๋ฎฌ๋ ์ด์
print(f"{self.name} ์ค๋น ์๋ฃ")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print(f"{self.name} ์ ๋ฆฌ ์ค...")
await asyncio.sleep(0.1) # ์ ๋ฆฌ ์์
์๋ฎฌ๋ ์ด์
print(f"{self.name} ์ ๋ฆฌ ์๋ฃ")
async def process(self, data: str) -> str:
print(f"{self.name}์์ ์ฒ๋ฆฌ: {data}")
await asyncio.sleep(0.2) # ์ฒ๋ฆฌ ์๋ฎฌ๋ ์ด์
return f"{data} ์ฒ๋ฆฌ๋จ"
async def async_file_processing(filename: str) -> str:
"""๋น๋๊ธฐ ํ์ผ ์ฒ๋ฆฌ ์์ """
async with AsyncResource("ํ์ผ ์ฒ๋ฆฌ๊ธฐ") as processor:
async with aiofiles.open(filename, mode='r') as file:
content = await file.read()
result = await processor.process(content)
return result
โ
ํน์ง:
- ์ธ๋งํฌ์ด ๊ธฐ๋ฐ ๋์์ฑ ์ ์ด
- ๋น๋๊ธฐ ์ดํฐ๋ ์ดํฐ/์ ๋๋ ์ดํฐ
- ๋น๋๊ธฐ ์ปจํ ์คํธ ๊ด๋ฆฌ์
- ์คํธ๋ฆผ ์ฒ๋ฆฌ ํจํด
- ๋ฆฌ์์ค ํจ์จ์ ๊ด๋ฆฌ
โ
๋ชจ๋ฒ ์ฌ๋ก:
- async/await ๋ฌธ๋ฒ ์ฌ์ฉ: ์ต์ Python ๋น๋๊ธฐ ๋ฌธ๋ฒ์ ํ์ฉํ์ฌ ๊ฐ๋ ์ฑ๊ณผ ์ ์ง๋ณด์์ฑ ํฅ์
- ์ ์ ํ ์๋ฌ ์ฒ๋ฆฌ ๊ตฌํ: ๋ชจ๋ ๋น๋๊ธฐ ์์ ์ try/except ๋ธ๋ก ํ์ฉ
- ์ฝ๋ฃจํด ์ทจ์ ์ฒ๋ฆฌ ๊ตฌํ: CancelledError๋ฅผ ์ ์ ํ ์ฒ๋ฆฌํ์ฌ ๋ฆฌ์์ค ๋์ ๋ฐฉ์ง
- ๋์์ฑ ์ ์ด ๋๊ตฌ ํ์ฉ: ์ธ๋งํฌ์ด๋ ๋ฝ์ ํ์ฉํ์ฌ ๋์ ์คํ ์ ํ
- ๋๋ฒ๊น ๋ก๊ทธ ์ถ๊ฐ: ๋น๋๊ธฐ ์์ ์ถ์ ์ ์ํ ๋ก๊น ๊ตฌํ
- ์ฑ๋ฅ ๋ชจ๋ํฐ๋ง ๊ตฌํ: ์์ ์๊ฐ ์ธก์ ๋ฐ ๋ณ๋ชฉ ์ง์ ์๋ณ
- ๋ฉ๋ชจ๋ฆฌ ๋์ ์ฃผ์: ๋ฏธ์๋ฃ ํ์คํฌ์ ์ด๋ฒคํธ ๋ฃจํ ๊ด๋ฆฌ์ ์ฃผ์
- ๋ธ๋กํน ์ฝ๋ ์ฒ๋ฆฌ: CPU ์ง์ฝ์ ์์ ์ ํ๋ก์ธ์ค ํ๋ก, ๋ธ๋กํน I/O๋ ์ค๋ ๋ ํ๋ก ์ด๋
- ๋๊ธฐ ์ฝ๋์ ํตํฉ: ๊ธฐ์กด ๋๊ธฐ ์ฝ๋๋ฅผ ๋น๋๊ธฐ ํ๊ฒฝ์ ํตํฉํ๋ ๋ฐฉ๋ฒ ์์ง
- ํ ์คํธ ๋ฐฉ๋ฒ๋ก : ๋น๋๊ธฐ ์ฝ๋ ํ ์คํธ๋ฅผ ์ํ ๋ฐฉ๋ฒ๋ก ์ ์ฉ
- ํ์์์ ์ค์ : ๋ชจ๋ ์ธ๋ถ ๋ฆฌ์์ค ํธ์ถ์ ํ์์์ ์ค์
- ์ฐ์ํ ์ข ๋ฃ ์ฒ๋ฆฌ: ์ ํ๋ฆฌ์ผ์ด์ ์ข ๋ฃ ์ ์คํ ์ค์ธ ํ์คํฌ ์ ๋ฆฌ
- ๋ฒ์ ํธํ์ฑ ์ฃผ์: Python ๋ฒ์ ๋ณ asyncio API ์ฐจ์ด ์ธ์ง
- ์๋ํํฐ ๋ผ์ด๋ธ๋ฌ๋ฆฌ ํ์ฉ: aiohttp, aiofiles ๋ฑ์ ๋น๋๊ธฐ ๋ผ์ด๋ธ๋ฌ๋ฆฌ ํ์ฉ
- ํ์คํฌ ๊ทธ๋ฃนํ: ๊ด๋ จ ํ์คํฌ ๊ทธ๋ฃนํ ๋ฐ ์๋ช ์ฃผ๊ธฐ ๊ด๋ฆฌ