[Deprecated] Serial Task Queue 구현 - 100-hours-a-week/5-yeosa-wiki GitHub Wiki
도입하지 않기로 한 내용입니다
1. Queue 구현 코드
import asyncio
from typing import Callable, Awaitable, Any
from app.utils.logging_decorator import log_exception, log_flow
class SerialTaskQueue:
def __init__(self):
self._queue = asyncio.Queue()
self._is_running = False
@log_flow
def start(self):
if not self._is_running:
loop = asyncio.get_event_loop()
loop.create_task(self._worker())
self._is_running = True
@log_exception
async def _worker(self):
while True:
coro_func = await self._queue.get()
await coro_func()
self._queue.task_done()
@log_exception
async def enqueue(self, coro_func: Callable[], Awaitable[Any](/100-hours-a-week/5-yeosa-wiki/wiki/],-Awaitable[Any)) -> Any:
future = asyncio.get_event_loop().create_future()
async def wrapper():
result = await coro_func()
future.set_result(result)
await self._queue.put(wrapper)
return await future
2. 코드 설명
a. 동작 흐름
SerialTaskQueue.start()
호출 시:loop.create_task(self._worker())
로 백그라운드 워커 1개 생성
- 워커는
await self._queue.get()
으로 큐에 작업이 들어오기를 영원히 대기함 - 외부에서
enqueue(task)
가 호출되면:- 내부에서
loop.create_future()
로 비어 있는 Future 객체 생성 - 해당 task를 실행하고 결과를 future에 채워주는
wrapper()
함수를 정의 - 이 wrapper를 큐에
put()
- 내부에서
- 워커는 대기 중이었다가 wrapper를
get()
하여 실제 task 실행 - wrapper 실행 완료 → Future에
.set_result(result)
로 결과를 기록 enqueue()
를 호출했던 곳에서는await future
를 통해 결과를 반환받음return await future
는 future에 값이 채워지면 값을 반환- future에 값은 언제 채워지나? →
wrapper()
가 실행되면, future.set_result를 통해 값이 채워짐 wrapper()
는 언제 실행되나? → 워커가 queue에서 get한coro_func
가 사실은 wrapper로 감싼coro_func
→await coro_func()
가 끝날 때 값이 future에 할당
- 워커는
task_done()
을 호출하여 해당 작업의 처리 완료를 알리고, 다음 작업 대기 상태로 돌아감
b. 의문과 해답 정리
질문 | 답변 |
---|---|
asyncio.Queue() 란? |
비동기 환경에서 FIFO 방식으로 작업을 저장하고, 안전하게 꺼낼 수 있는 큐 객체 |
loop.create_task() 를 여러 번 하면? |
호출한 만큼 워커가 생김. 직렬 처리 보장 위해 한 번만 호출해야 함 |
await queue.get() 은 queue가 비어 있으면? |
❌ 에러 없음. 자동으로 대기 상태에 들어감 |
_worker() 는 polling처럼 while 도는 구조인데 성능 문제 없나? |
❌ 없음. CPU 점유 안 하고 asyncio 내부에서 효율적으로 await 상태 유지함 |
Future 는 왜 필요할까? |
enqueue() 에서 task 결과를 비동기로 돌려줄 때 사용. 실제로는 결과를 담을 약속 객체 |
task_done() 은 꼭 필요할까? |
join() 을 쓰지 않는 경우 생략해도 되지만, 큐 상태 정합성과 확장성을 위해 권장됨 |
join() 은 뭐할 때 쓰나? |
큐에 넣은 모든 작업이 끝날 때까지 블로킹 상태로 대기하는 메서드 |
여러 워커가 동시에 queue.get() 하면 충돌 나지 않나? |
✅ 내부적으로 safe함. 가장 먼저 대기한 워커가 먼저 task를 가져감 (FIFO 보장) |
3. 실제 도입 과정
app/main.py
)
a. Queue를 app.state에 반영(@asynccontextmanager
async def lifespan(app: FastAPI):
"""서버 실행 시, 모델 및 이미지 로더 초기화 로직입니다."""
...
app.state.embedding_queue = SerialTaskQueue()
app.state.postprocess_queue = SerialTaskQueue()
app.state.people_clustering_queue = SerialTaskQueue()
app.state.embedding_queue.start()
app.state.postprocess_queue.start()
app.state.people_clustering_queue.start()
...
app/api/endpoints/album_embedding_router.py
)
b. Router 수정(@router.post("", status_code=201)
@log_flow
async def embed(req: ImageRequest, request: Request):
return await request.app.state.embedding_queue.enqueue(
lambda: embed_controller(req, request)
)
코드 요소 | 설명 |
---|---|
request.app.state.embedding_queue |
embedding 요청을 직렬로 처리하는 전용 큐 |
enqueue(lambda: embed_controller(...)) |
embedding 작업 전체를 큐에 등록 |
await ...enqueue(...) |
큐에서 처리 완료 후 그 결과를 받아 응답으로 반환 |
-
왜
router
에서enqueue()
를 했는가?- 책임 분리
controller
는 비즈니스 로직만 담당 (입력 → 처리 → 출력)router
는 요청 흐름 제어와 리소스 관리 담당- 큐에 넣는 작업은 "요청을 어떻게 처리할 것인가"에 해당되므로 router의 책임으로 보는 것이 자연스러움
controller
내부에서 await가 두 번 이상 발생하는 경우 한 번에 처리
images = await image_loader.load_images(filenames) # ✅ 1차 await (I/O) await loop.run_in_executor(None, embed_images, ...) # ✅ 2차 await (CPU)
controller
내부에서run_in_executor
로 CPU-bound 작업만 직렬화해도, 그 전에 발생한image_loader
는 여전히 병렬로 작동하게 됨- 이로 인해 리소스 경합, Out of Memory, I/O 스파이크 등이 발생할 수 있음
- 만약
image_loader
가 여러 요청에서 동시에 실행된다면, 그 시점에서 이미 큐 바깥에서 병렬 처리가 진행되고 있는 것
- 구조 일관성 확보
- 모든
controller
가 순수 함수처럼 유지되면, router
에서 공통적으로enqueue(...)
로 감싸는 방식으로 처리 흐름을 통일할 수 있음