Multi Stream 도입 테스트 - 100-hours-a-week/5-yeosa-wiki GitHub Wiki

1. 테스트 개요

a. 목표

  • GPU의 병렬 커널 실행 능력을 활용하여 이미지 임베딩 처리 성능을 최적화하기 위해 CUDA 스트림을 분리하여 전처리부터 추론까지 병렬로 실행할 수 있는 구조를 테스트

b. 왜 이 테스트가 필요한가?

목적 설명
성능 향상 default stream 하나에 모든 요청이 직렬로 쌓이면 GPU가 유휴 상태로 남는 시간 발생 → 처리 속도 저하
자원 활용 최적화 여러 요청이 동시에 GPU를 사용하고자 할 때 커널을 서로 다른 스트림에 분배하면 병렬 실행 가능
메모리 안전성 확보 세마포어를 통해 동시에 실행될 요청 수를 제한하여 VRAM 과부하 방지
추론 파이프라인 단일화 전처리와 추론을 하나의 스트림에서 실행하면 명시적 동기화 없이 안정적이고 예측 가능한 연산 흐름 확보

2. 테스트를 위해 변경한 코드 요약

a. StreamManager 도입

  • app/core/stream_manager.pyStreamManager 클래스 정의

    import torch
    from itertools import cycle
    from typing import List
    
    class StreamManger:
        """
        StreamManager는 주어진 스트림을 순환하는 기능을 제공합니다.
        
        Attributes:
            num_streams (int): 순환할 스트림의 개수
            streams (List): 순환할 스트림의 리스트
            stream_cycle (cycle): 무한히 반복되는 스트림 생성기
            
        """
        
        def __init__(self, num_streams: int = 4):
            """
            초기화 메서드입니다.
            
            Args:
                num_streams (int): 순환할 스트림의 개수
            """
            self.num_streams = num_streams
            self.streams: List[torch.cuda.Stream] = [torch.cuda.Stream() for _ in range(num_streams)]
            self.stream_cycle = cycle(streams)
            
            def get_stream(self) -> torch.cuda.Stream:
                """
                다음에 사용될 스트림을 반환합니다.
                
                Returns:
                    torch.cuda.Stream: 다음에 사용될 스트림
                    
                """
                return next(self.stream_cycle)
    
  • torch.cuda.Stream() 객체들을 생성하여 라운드로빈 방식으로 분배

  • FastAPI app.state.stream_manager로 lifespan 초기화하여 공유

    NUM_STREAMS = 1
    
    app.state.stream_manager = StreamManager(NUM_STREAMS)
    

b. 임베딩을 수행하기 위해 GPU를 사용하는 함수(preprocess, embed_image)에서 각각 stream을 받도록 수정

  • preprocess → stream을 지정해주면 stream에서 preprocess 처리하도록 수정

    import cv2
    import numpy as np
    import torch
    import contextlib
    
    def get_clip_preprocess_fn(device: str) -> callable:
        """
        CLIP 전처리 함수 반환.
        GPU 사용 시 GPU 텐서로 변환.
        """
        def clip_preprocess_np(img: np.ndarray, stream: torch.cuda.Stream = None) -> torch.Tensor:
            img = cv2.resize(img, (224, 224), interpolation=cv2.INTER_CUBIC)
            mean = torch.tensor([0.48145466, 0.4578275, 0.40821073], device=device).view(3, 1, 1)
            std = torch.tensor([0.26862954, 0.26130258, 0.27577711], device=device).view(3, 1, 1)
            with torch.cuda.stream(stream) if stream else contextlib.nullcontext():
                tensor = torch.from_numpy(img).permute(2, 0, 1).float().to(device) / 255.0
                return (tensor - mean) / std
        return clip_preprocess_np
    
    • contextlib.nullcontext()

      • contextlib.nullcontext()”아무 동작도 하지 않는 with문을 만들기 위한 도구”
        • 즉, 마치 with문이 없는 것처럼 행동하게 해주는 가짜 context manager

      [ 목적: "with 문은 유지하되, 아무 것도 하지 마라" ]

      • 예시

        with contextlib.nullcontext():
            print("그냥 이 줄만 실행됩니다.")
        

        → 결과적으로는 그냥 이 줄만 실행되며, 들어가기 전이나 나올 때 아무 동작도 하지 않음 (enter/exit 안에 아무 코드도 없는 context manager라고 생각하면 됨)

      [ 어떤 상황에서 쓰는가? ]

      • ifwith문을 나눠서 중복 코드가 생길 때, 이를 깔끔하게 통합하기 위해 사용

        • 비효율적 방식

          if stream is not None:
              with torch.cuda.stream(stream):
                  result = model(tensor)
          else:
              result = model(tensor)
          
        • 개선 방식 (nullcontext 사용)

          with torch.cuda.stream(stream) if stream else contextlib.nullcontext():
              result = model(tensor)
          

        → 두 줄을 하나로 통합해서 더 깔끔하고 유지보수 쉬운 코드로 만듦

      [ 결론 요약 ]

      질문 답변
      nullcontext()는 무엇인가? 아무 것도 하지 않는 context manager
      언제 쓰는가? 조건적으로 with문을 쓸 때, 코드 중복 없이 깔끔하게 처리하려고
      with nullcontext():는 무슨 뜻인가? 마치 with문이 없는 것처럼 해당 블록을 그냥 실행시켜 줌

  • embed_image → stream을 인자로 받아 preprocess, encode_image를 모두 주어진 stream에서 수행하도록 수정

    import contextlib
    
    import torch
    from concurrent.futures import ThreadPoolExecutor
    
    def embed_images(
        model, preprocess, images, filenames, batch_size=32, device="cuda", stream=None
    ):
        # 이미지 전처리를 배치 단위로 수행
        preprocessed_batches = []
        
        # ThreadPoolExecutor 생성
        with ThreadPoolExecutor() as executor:
            # 전처리 함수를 lambda로 정의
            preprocess_func = lambda img: preprocess(img, stream=stream)
            
            for i in range(0, len(images), batch_size):
                batch_images = images[i:i + batch_size]
                # 병렬로 전처리 수행
                preprocessed_batch = list(executor.map(preprocess_func, batch_images))
                preprocessed_batch = torch.stack(preprocessed_batch)
                preprocessed_batches.append(preprocessed_batch)
    
        # 결과를 저장할 딕셔너리
        results = {}
        
        # 배치 단위로 임베딩 수행
        for i, batch in enumerate(preprocessed_batches):
            batch_filenames = filenames[i * batch_size:(i + 1) * batch_size]
            
            # GPU로 데이터 이동
            image_input = batch.to(device)
            
            # 스트림이 제공된 경우, 비동기적으로 실행
            with torch.cuda.stream(stream) if stream else contextlib.nullcontext():
                # 임베딩 생성
                with torch.no_grad():
                    batch_features = model.encode_image(image_input)
                    
            # CPU로 결과 이동 및 저장
            batch_features = batch_features.cpu()
            for filename, feature in zip(batch_filenames, batch_features):
                results[filename] = feature.numpy().tolist()  # numpy 배열을 리스트로 변환
    
        return results
    
    

c. 임베딩 컨트롤러에서, embed_image를 호출할 때 stream을 전달

  • embed_controller에서 stream = request.app.state.stream_manager.get_stream() 호출
  • partial(embed_images, ..., stream=stream)로 전달
task_func = partial(
    embed_images,
    clip_model,
    clip_preprocess,
    images,
    image_refs,
    batch_size=32,
    device="cuda",
    stream=stream,
)

3. 테스트 시나리오

  • 부하 도구: k6
  • 사용자 시뮬레이션: 동시 사용자(VU) 최대 30명
  • 실행 시간: 3분간 유지
  • 요청 주기: 각 사용자는 응답 수신 후 3~5초 랜덤 휴식
  • 요청 데이터: 30~50장의 이미지 리스트 포함된 embedding 요청

4. 테스트 결과

a. 결과 요약

항목 스트림 1 / 세마포어 4 스트림 4 / 세마포어 8
평균 임베딩 시간 (embedding_duration) 3099ms 2958ms
평균 HTTP 응답 시간 (http_req_duration) 3.09s 2.95s
평균 전체 iteration 시간 (iteration_duration) 7.08s 6.94s
GPU 사용률 ~50% ~50%
GPU vRAM 사용량 11.86% (~1.9GB) 13.66% (~2.2GB)
CPU 사용률 최대 90% 최대 90%

b. 성능 차이 해석

가. 스트림 4가 스트림 1보다 소폭 성능이 좋음

항목 해석
avg embedding time ↓ 병렬 커널이 일부라도 병렬 처리된 것으로 보임
HTTP latency ↓ 전체 응답 시간도 소폭 개선
vRAM 사용량 ↑ 병렬로 여러 텐서가 vRAM에 올라간 증거

즉, stream 4는 실제로 일부 병렬화를 해내긴 했다는 의미

나. GPU 사용률은 여전히 낮다 (둘 다 40%)

이건 병렬 커널이 GPU를 꽉 채우지 못한다는 것을 의미

원인 해석
커널이 너무 작거나 짧음 T4에서 stream 하나당 사용하는 SM 수가 작음
데이터 전송 병목 .to(device)가 default stream에서 병렬화 실패
CPU 병목 가능성 CPU 사용률이 매우 높음 (90%)
커널 간 컨텍스트 스위칭 비용 stream 수 증가로 전환 비용이 발생했을 수도

→ 요약하면: 멀티 스트림을 도입했지만 GPU 병렬성이 제한적이고, CPU가 병목인 상태

다. CPU 사용률은 90%로 포화

  • image_loader.load_images() 또는 preprocess() (OpenCV, NumPy) 단계가 CPU-intensive
  • ThreadPoolExecutor + B=32 이미지 처리 → CPU 스레드가 과부하
  • → GPU는 놀고 있고, CPU는 바쁨 → “데이터가 안 와서 GPU가 놀고 있음” 상태
    • GPU 사용률은 낮은데 latency는 유지되는 현상의 본질적인 원인으로 추정

라. 전체 구조 관점에서 분석 요약

측면 진단 요약
GPU 병렬화 일부 성공, 하지만 여전히 활용도 낮음 SM 과소 활용 + 메모리 병목
CPU 병목의 핵심 전처리 + 디코딩 병렬화에서 과부하 발생
스트림 4 vs 스트림 1 stream 4가 근소하게 더 빠름 병렬 커널 일부 성공
세마포어 수 증가 vRAM 사용량 증가로 확인됨 병렬 요청 증가 확인됨
전반적 병목 위치 GPU 아님 → CPU, H2D 전송 즉, “GPU를 쓰는 것 자체가 병목이 아님”

5. 다음 테스트 제안

a. 지금 시점에서는 stream 수를 더 늘려봐야 큰 효과 없다

  • 이미 CPU 병목 → GPU가 놀고 있음
  • stream 수를 늘려봐야 더 많은 커널을 던질 CPU 여력이 없음