비동기 구조 개선 - 100-hours-a-week/5-yeosa-wiki GitHub Wiki

1. 개요

  • fastapi 비동기 방식 공부를 바탕으로 기존 코드에 병목 지점들을 개선하고자 함.

  • 이미지 로더 기존 코드

    import os, requests
    from abc import ABC, abstractmethod
    from concurrent.futures import ThreadPoolExecutor
    from io import BytesIO
    
    import aioboto3
    from dotenv import load_dotenv
    from google.cloud import storage
    from PIL import Image
    from starlette.concurrency import run_in_threadpool
    
    from app.config.settings import ImageMode, APP_ENV, AppEnv
    
    load_dotenv()
    
    # local
    LOCAL_IMG_PATH_raw = os.getenv("LOCAL_IMG_PATH")
    
    # GCS
    GCS_BUCKET_NAME_raw = os.getenv("GCS_BUCKET_NAME")
    GCP_KEY_PATH_raw = os.getenv("GCP_KEY_PATH")
    
    # S3
    S3_BUCKET_NAME_raw = os.getenv("S3_BUCKET_NAME")
    AWS_ACCESS_KEY_ID_raw = os.getenv("AWS_ACCESS_KEY")
    AWS_SECRET_ACCESS_KEY_raw = os.getenv("AWS_SECRET_KEY")
    AWS_REGION_raw = os.getenv("AWS_REGION")
    
    # local
    if LOCAL_IMG_PATH_raw is None:
        raise EnvironmentError("LOCAL_IMG_PATH은 .env에 설정되어야 합니다.")
    
    # GCS
    if GCS_BUCKET_NAME_raw is None:
        raise EnvironmentError("BUCKET_NAME은 .env에 설정되어야 합니다.")
    if GCP_KEY_PATH_raw is None:
        raise EnvironmentError("GCP_KEY_PATH은 .env에 설정되어야 합니다.")
    
    # S3
    if S3_BUCKET_NAME_raw is None:
        raise EnvironmentError("S3_BUCKET_NAME은 .env에 설정되어야 합니다.")
    if AWS_ACCESS_KEY_ID_raw is None or AWS_SECRET_ACCESS_KEY_raw is None:
        raise EnvironmentError("AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY는 .env에 설정되어야 합니다.")
    if AWS_REGION_raw is None:
        raise EnvironmentError("AWS_REGION은 .env에 설정되어야 합니다.")
    
    # 타입이 str로 확정됨 (mypy 추론 가능)
    LOCAL_IMG_PATH: str = LOCAL_IMG_PATH_raw
    
    GCS_BUCKET_NAME: str = GCS_BUCKET_NAME_raw
    GCP_KEY_PATH: str = GCP_KEY_PATH_raw
    
    S3_BUCKET_NAME: str = S3_BUCKET_NAME_raw
    AWS_ACCESS_KEY_ID: str = AWS_ACCESS_KEY_ID_raw
    AWS_SECRET_ACCESS_KEY: str = AWS_SECRET_ACCESS_KEY_raw
    AWS_REGION: str = AWS_REGION_raw
    
    class BaseImageLoader(ABC):
        """
        이미지 로더의 추상 베이스 클래스.
    
        모든 이미지 로더는 `load_images` 메서드를 구현해야 합니다.
        """
    
        @abstractmethod
        async def load_images(self, filenames: list[str]) -> list[Image.Image]:
            """
            주어진 이미지 파일 이름 리스트에 대해 이미지를 로드합니다.
    
            Args:
                filenames (list[str]): 로드할 이미지 파일 이름 목록
    
            Returns:
                list[Image.Image]: 로드된 PIL 이미지 객체 리스트
    
            """
            pass
    
    class LocalImageLoader(BaseImageLoader):
        """로컬 파일 시스템에서 이미지를 로드하는 클래스입니다."""
    
        def __init__(self, image_dir: str = LOCAL_IMG_PATH):
            """
            Args:
                image_dir (str): 이미지가 저장된 로컬 디렉토리 경로
    
            """
            self.image_dir = image_dir
    
        async def load_images(self, filenames: list[str]) -> list[Image.Image]:
            """
            비동기적으로 로컬 이미지들을 로드합니다.
    
            Args:
                filenames (list[str]): 이미지 파일 이름 리스트
    
            Returns:
                list[Image.Image]: 로드된 이미지 리스트
    
            """
            return await run_in_threadpool(
                lambda: list(
                    Image.open(os.path.join(self.image_dir, filename)).convert(
                        "RGB"
                    )
                    for filename in filenames
                )
            )
    
    class GCSImageLoader(BaseImageLoader):
        """Google Cloud Storage(GCS)에서 이미지를 로드하는 클래스입니다."""
    
        def __init__(
            self, bucket_name: str = GCS_BUCKET_NAME, key_path: str = GCP_KEY_PATH
        ):
            """
            Args:
                bucket_name (str): GCS 버킷 이름
                key_path (str): 서비스 계정 키 경로 (.json)
    
            """
            self.client = storage.Client.from_service_account_json(key_path)
            self.bucket = self.client.bucket(bucket_name)
            self.executor = ThreadPoolExecutor(max_workers=10)
    
        def _download(self, file_name: str) -> Image.Image:
            """
            GCS에서 단일 이미지를 다운로드하고 RGB로 변환합니다.
    
            Args:
                file_name (str): GCS 내 파일 이름
    
            Returns:
                Image.Image: 로드된 PIL 이미지
    
            """
            
            
            blob = self.bucket.blob(file_name)
            image_bytes = blob.download_as_bytes()
            return Image.open(BytesIO(image_bytes)).convert("RGB")
    
        async def load_images(self, filenames: list[str]) -> list[Image.Image]:
            """
            비동기적으로 GCS에서 이미지를 병렬로 다운로드합니다.
    
            Args:
                filenames (list[str]): GCS 상의 이미지 파일 이름 리스트
    
            Returns:
                list[Image.Image]: 로드된 이미지 리스트
    
            """
            return await run_in_threadpool(
                lambda: list(self.executor.map(self._download, filenames))
            )
    
    class S3ImageLoader(BaseImageLoader):
        """Amazon S3에서 이미지를 로드하는 클래스입니다."""
    
        def __init__(
            self,
            bucket_name: str = S3_BUCKET_NAME,
            aws_access_key_id: str = AWS_ACCESS_KEY_ID,
            aws_secret_access_key: str = AWS_SECRET_ACCESS_KEY,
            region_name: str = AWS_REGION,
        ):
            """
            Args:
                bucket_name (str): S3 버킷 이름
                aws_access_key_id (str): AWS 액세스 키 ID
                aws_secret_access_key (str): AWS 시크릿 액세스 키
                region_name (str): S3 버킷의 리전 이름
    
            """
            self.bucket_name = bucket_name
            self.s3 = boto3.client(
                "s3",
                aws_access_key_id=aws_access_key_id,
                aws_secret_access_key=aws_secret_access_key,
                region_name=region_name,
            )
            self.executor = ThreadPoolExecutor(max_workers=10)
    
        def _download(self, file_ref: str) -> Image.Image:
            """
            S3에서 단일 이미지를 다운로드하고 RGB로 변환합니다.
    
            Args:
                file_ref (str): S3 내 파일 이름 (key)
    
            Returns:
                Image.Image: 로드된 PIL 이미지 객체
    
            """
            if APP_ENV == AppEnv.PROD:
                try:
                    response = requests.get(file_ref, timeout=5)
                    response.raise_for_status()
                    return Image.open(BytesIO(response.content)).convert("RGB")
                except Exception as e:
                    raise RuntimeError(f"[S3ImageLoader:prod] 이미지 다운로드 실패 URL: {file_ref}") from e
            else:
                response = self.s3.get_object(Bucket=self.bucket_name, Key=file_ref)
                image_bytes = response["Body"].read()
                return Image.open(BytesIO(image_bytes)).convert("RGB")
        
    
        async def load_images(self, filenames: list[str]) -> list[Image.Image]:
            """
            비동기적으로 S3에서 이미지를 병렬로 다운로드합니다.
    
            Args:
                filenames (list[str]): S3 내 이미지 파일 이름 리스트
    
            Returns:
                list[Image.Image]: 로드된 PIL 이미지 리스트
    
            """
            return await run_in_threadpool(
                lambda: list(self.executor.map(self._download, filenames))
            )
    
    def get_image_loader(mode: ImageMode) -> BaseImageLoader:
        """
        이미지 로딩 모드를 기반으로 적절한 이미지 로더 인스턴스를 반환합니다.
    
        Args:
            mode (ImageMode): 이미지 로딩 방식 (로컬 or GCS or S3)
    
        Returns:
            BaseImageLoader: 선택된 이미지 로더 인스턴스
    
        """
        if mode == ImageMode.GCS:
            return GCSImageLoader()
        elif mode == ImageMode.S3:
            return S3ImageLoader()
        return LocalImageLoader()
    
    

2. 전반적인 병목 지점 식별 및 개선점

a. 이미지 로더

  • turboJEPG를 이용한 방식으로 최적화 진행해두었으나, 현재 서버에서도 openCV 사용해서 최대한 최적화가 필요.
    • PIL 이용 → openCV 변경
    • 이중 스레드풀 구조 개선
    • url 통한 로딩 aiohttp로 변경
    • boto3 & google-cloud-storage → aioboto3 & gcloud-aio-storage 변경

b. 라우터에서 시리얼라이즈 큐 이용

image

c. 모든 컨트롤러에서 request.app.state.loop 이용

  • 스레드 풀이 충분한지 세마포어나 개별 풀 구성이 필요한지 식별

3. 세부적인 병목 지점 식별 및 개선점

a. 이미지 로더 병목 지점

  • 세마포어 위치: 기존에 다운로드 부분에 세마포어가 걸려 있었음 그렇기 때문에 디코딩 작업에서 과도하게 많은 스레드가 이용되어 cpu 사용률이 크게 뛰었을 것으로 예상
  • PIL 이용: 하이브리드 라이브러리로 python과 c 코드가 혼합되어 있으나, 이미지 로더에서 사용하는 Image.open은 python 구현으로 GIL 걸림
  • url 기반 이미지 로딩 시, requests 이용: I/o 작업이라 이벤트 루프에서 실행되도록 구현했는데 requests는 응답이 올때까지 스레드를 블로킹함
  • 이중 스레드 활용: 비효율적인 이중 스레드 구조. 람다 함수를 워커 스레드에서 실행시키는데 워커 스레드 내부에서 다시 워커 스레드에 다운로드 작업을 맡김
  • 동기 sdk 이용: boto3, google-cloud-storage는 동기 sdk로 스레드 블로킹

b. 이미지 로더 개선 사항

  • 세마포어 위치를 디코딩 작업으로 변경: cpu 리소스 사용률이 안정화될 것으로 기대

  • 이미지 디코딩 openCV로 변경: 기존에 Image.open을 한 이유는 RGB 변환을 위해서 filelike 객체를 만들고 열어서 이용하는 과정이 필요했기 때문. openCV를 이용하면 filelike 객체가 필요하지 않기 때문에 읽어온 바이트 그대로 디코딩 가능. 또한 openCV의 디코딩과 색공간 변환은 c++로 구현되어 있어 GIL의 영향을 받지 않음

  • aiohttp로 변경: 메인 스레드를 블로킹하지 않는 비동기 통신 방식 이용 → aiohttp는 presigned url을 먼저 발급받아야 한다. 이 때문에 추가적인 오버헤드가 예상된다

    • requests(동기)를 run_in_executor에서 수행하는 것과 presigned_url 발급을 run_in_executor에서 하고 aiohttp는 이벤트 루프에서 하는 방식 비교

      구분 방식 A 방식 B
      이름 requests.get() in run_in_executor() generate_presigned_url() in run_in_executor() + aiohttp.get()
      다운로드 동기 방식 (requests) 비동기 방식 (aiohttp)
      URL 생성 없음 별도 presigned URL 발급
      이벤트 루프 사용 ❌ 없음 (executor 사용) aiohttp는 이벤트 루프에서 실행
      오버헤드 requests 호출 자체가 무겁고 비효율적 presigned URL 발급 오버헤드만 있음 (경량)
      • requests 작업은 무거움. run_in_executor로 실행하더라도 실제 병렬성이 아님. 각각의 스레드가 요청 처리에 걸리는 시간이 더 길기 때문에 더 많은 context switching 발생

        → 문제점: url로는 presigned url을 발급 받을 수 없다. url에서 이미지명(key)를 파싱해서 이용해야 함

  • 이중 스레드 개선: 메인 스레드에서 디코딩 작업에 대해 워커 스레드에 바로 할당하도록 변경

  • 비동기 sdk 이용: 파일명을 통한 이미지 로딩에서 비동기 sdk를 이용함으로써 스레드 블로킹을 막음

c. 변경된 이미지 로더 작업 구조

async def load_images(self, filenames: list[str]) -> list[np.ndarray]:
    tasks = [self._process_single_file(f) for f in filenames]
    result = await asyncio.gather(*tasks)
    return result
  • 단일 파일 처리 코루틴 객체를 모아서 이벤트 루프에서 병렬 실행한다.
async def _process_single_file(self, filename: str) -> np.ndarray:
    loop = asyncio.get_running_loop()
    image_bytes = await self._download(filename)
    async with decode_semaphore:
        decoded = await loop.run_in_executor(
            None, decode_image_cv2, image_bytes, "s3"
        )
    return decoded
  • 다운로드는 이벤트루프에서 실행, 디코딩은 cpu 바운드 작업이기 때문에 스레드풀에 작업 할당하고 await. 이때 cpu 사용량 제한을 위해 세마포어 이용

d. 라우터 병목 지점

@router.post("", status_code=201)
@log_flow
async def embed(req: ImageRequest, request: Request):
    return await request.app.state.embedding_queue.enqueue(
        lambda: embed_controller(req, request)
    )
  • 큐를 제외한다. 코루틴 라우터를 통해 요청이 들어간다. 이미지 로더에서 로딩을 await한다. 임베딩은 워커 스레드에서 수행된다.
  • 다만, 요청이 들어올 때마다 워커 스레드를 이용하면 너무 많은 스레드를 사용할 수 있다. 세마포어로 적절히 제한해주는게 필요하다.

e. 라우터 개선 사항

@router.post("", status_code=201)
@log_flow
async def embed(req: ImageRequest, request: Request):
    return await embed_controller(req, request)
  • 이벤트 루프에서 컨트롤러를 실행한다.
@log_flow
async def embed_controller(req: ImageRequest, request: Request) -> JSONResponse:
		...
    images = await image_loader.load_images(image_refs)
		...
    async with embedding_semaphore:
        await loop.run_in_executor(None, task_func)
  • 이미지 로드를 이벤트 루프에서 await한다. 임베딩 작업에서 이용하는 스레드 수를 제한하기 위해 세마포어를 획득한 요청들이 스레드풀에 작업을 할당할 수 있게 한다.