스레드 분리(run_in_executor) - 100-hours-a-week/5-yeosa-wiki GitHub Wiki
run_in_executor
) 목적
1. 스레드 분리(-
이미지 처리와 임베딩 등 CPU 사용량이 높은 연산을 FastAPI 메인 이벤트 루프에서 직접 실행할 경우,
-
전체 서버 응답이 느려지거나 차단되는 현상이 발생할 수 있음
-
이를 해결하기 위해
loop.run_in_executor()
를 사용해 요청 처리와 연산 처리를 분리 -
분리 근거
a. 요청을 받는 스레드와 실제 작업을 처리하는 스레드를 분리할 수 있기 때문
-
FastAPI는 내부적으로 **비동기 이벤트 루프(싱글 스레드)**로 동작함
-
async def
로 만든 API는 I/O 바운드 작업에는 강하지만, **CPU 바운드 작업(예: 이미지 임베딩)**을 실행하면 이벤트 루프가 멈춤 -
run_in_executor()
를 사용하면 “요청을 받는 스레드”와 “무거운 작업을 처리하는 스레드”를 완전히 분리할 수 있음→ 서버가 한 요청에 발이 묶이지 않고 다른 요청을 계속 처리 가능
b. 도입하지 않으면, 임베딩 요청이 몰릴 경우 서버가 응답을 못하게 된다
- 예: 사용자가 여러 명 동시에 사진 업로드 → 임베딩 요청 폭주
- embedding 처리 코드가
await
없이 실행되면 → 이벤트 루프가 블로킹됨 - 그 순간 다른 사용자의 요청도 받지 못하고, 전체 서버가 멈춘 것처럼 보이게 됨
- 결국:
- 응답 지연 (504 Gateway Timeout)
- 서버 과부하
- 사용자 경험 악화
c. 메시지 큐를 도입하더라도run_in_executor()
는 여전히 필요하다- 메시지 큐는 “요청을 나중에 처리하자”는 작업 스케줄링 전략
- 하지만 큐 안에서 꺼낸 작업을 처리하는 워커 코드도 FastAPI API처럼 동기 코드일 수 있다.
- 즉, 메시지 큐로 들어온 작업을 처리하는 시점에서도 CPU-heavy 코드가 이벤트 루프 안에서 동기적으로 실행되면 그 워커 프로세스도 응답성 저하 + 병렬성 상실
- 따라서 메시지 큐 구조든 API 구조든 CPU-heavy 코드는 무조건 run_in_executor()로 offload 해야 서버 자원을 제대로 나눠쓸 수 있음
- 문제 상황 예시
- 메시지 큐 (예: Celery, custom queue)에서 워커가 작업을 꺼냄
- 꺼낸 작업은 FastAPI 앱 안에서 동작하는 함수 (혹은 async 핸들러)일 수 있음
- 그런데 그 작업이 CPU 바운드고
run_in_executor()
없이 그냥 실행됨 - 결과
- 워커의 스레드 or 프로세스가 CPU-heavy 태스크를 점유한 채 작업을 오래 끌게 되고,
- 다른 큐 작업이 도착해도 실행할 워커 자원이 없음
- 큐에는 메시지가 쌓이지만, 처리는 느려지고 병목 발생
d. 핵심 요약
이유 설명 스레드 분리 요청 수신과 처리 실행을 분리해서 서버 응답성 유지 블로킹 방지 임베딩 처리 중 루프가 막히면 다른 요청도 응답 못 받게 됨 메시지 큐와 무관하게 필요 큐로 비동기 처리하더라도, 실제 실행 시 병목이 생기면 결국 서버가 느려짐 - 모놀리식 구조 안에서 “요청 처리”와 “무거운 로직 실행”을 실행 단위에서 분리해, 마치 마이크로서비스처럼 독립 실행이 가능한 구조로 만드는 것.
-
2. 개선 전 구조
@log_exception
async def embed_controller(req: ImageRequest, request: Request):
filenames = req.images
images = await request.app.state.image_loader.load_images(filenames)
# ❌ CPU-heavy 작업을 루프 안에서 직접 실행
embed_images(
request.app.state.clip_model,
request.app.state.clip_preprocess,
images,
filenames,
batch_size=16,
device="cpu",
)
return {"message": "success", "data": None}
3. 개선 후 구조
...
@asynccontextmanager
async def lifespan(app: FastAPI):
"""서버 실행 시, 모델 및 이미지 로더 초기화 로직입니다."""
...
loop = asyncio.get_running_loop()
...
app.state.loop = loop
yield
...
from functools import partial
@log_exception
async def embed_controller(req: ImageRequest, request: Request):
filenames = req.images
images = await request.app.state.image_loader.load_images(filenames)
loop = request.app.state.loop # startup에서 등록된 루프 가져오기
task = partial(
embed_images,
request.app.state.clip_model,
request.app.state.clip_preprocess,
images,
filenames,
batch_size=16,
device="cpu",
)
# ✅ run_in_executor로 CPU-heavy 작업을 스레드로 분리 실행
await loop.run_in_executor(None, task)
return {"message": "success", "data": None}
4. 개선 후 코드 분석
loop = asyncio.get_running_loop()
a. - FastAPI 서버가 사용하는 이벤트 루프 객체를 가져옴
- 보통
@app.on_event("startup")
에서 등록하여app.state.loop
에 저장
loop.run_in_executor(None, task)
b. - CPU-heavy 작업을 이벤트 루프가 아닌 별도 스레드풀에서 실행
- 이벤트 루프는 다른 요청을 계속 처리 가능
- 결과가 나올 때까지 비동기적으로 대기 (
await
)
c. 원리 요약
개념 | 설명 |
---|---|
async def |
비동기 함수 선언. 내부에서 await 사용 가능 |
run_in_executor() |
동기 함수(CPU-heavy)를 백그라운드 스레드에서 실행 |
partial() |
run_in_executor() 가 **kwargs 를 지원하지 않기 때문에 인자를 미리 묶는 데 사용 |
loop = request.app.state.loop |
FastAPI가 사용하는 이벤트 루프 객체를 startup에서 저장 후 가져옴 |
-
이벤트 루프란?
"비동기 함수들을 순차 실행하며, await가 나오면 다른 작업을 실행하도록 관리해주는 반복기계"
- FastAPI는 내부적으로 asyncio 이벤트 루프를 사용
- 모든
async def
함수는 이벤트 루프가 실행함 - 무거운 작업을 루프 안에서 직접 실행하면, 다른 요청까지 멈추게 됨
5. 개선 효과
항목 | 개선 전 | 개선 후 |
---|---|---|
이벤트 루프 점유 | O (서버 전체 응답 지연) | ❌ (스레드 분리) |
요청 응답성 | 느려짐 | 유지됨 |
확장성 | 낮음 | 향후 큐/멀티서버로 확장 용이 |
적용 난이도 | - | 매우 낮음 (기존 로직 재사용 가능) |
6. 결론
a. 컨트롤러별 적용
컨트롤러 | 주요 변경점 |
---|---|
embed_controller |
embed_images() → run_in_executor() 처리 |
categorize_controller |
get_cached_embeddings_parallel , categorize_images 모두 분리 실행 |
duplicate_controller |
get_cached_embeddings_parallel , find_duplicate_groups 모두 분리 실행 |
quality_controller |
get_cached_embeddings_parallel , get_low_quality_images 모두 분리 실행 |
highlight_scoring_controller |
get_cached_embeddings_parallel + score_each_category() 함수 분리 실행 |
people_controller |
얼굴 인식 및 클러스터링 로직 전체를 run_in_executor() 로 분리 |
b. 결과 요약
- 모든 컨트롤러는 이벤트 루프에 영향을 주지 않고,Heavy 연산은 백그라운드 스레드에서 안전하게 실행됨
- API 응답성 유지 + 서버 안정성 확보
- 이후 Celery 등 메시지 큐로의 마이그레이션 시에도 구조적으로 호환성 확보됨
c. 이후 개선 여지
image_loader.load_images()
와 같이 실제 I/O를 수반하는 부분도aiofiles
등으로 비동기화 고려- 각
service/
모듈 내에서 공통 헬퍼 (submit_cpu_task(fn, *args)
) 만들어 코드 중복 제거 가능