[AI] 03_ADR_ 016‐020 - 100-hours-a-week/9-team-Devths-WIKI GitHub Wiki
기술 의사결정 기록 (ADR) - 016~020
Architecture Decision Records - 프로젝트에서 기술 선택의 근거를 기록하는 문서
📚 목차
- ADR-016: LLM 모니터링 도구 선정 (LangFuse vs LangSmith)
- ADR-017: FastAPI 서버 분리 전략 (모델 서버 vs 서비스 서버)
- ADR-018: 이벤트 스트리밍 전략 (Kafka vs Celery Beat vs Airflow)
- ADR-019: LLM 출력 구조화 - Pydantic AI 미사용
- ADR-020: RAG Reranker 도입 (Cohere Rerank)
ADR-016: LLM 모니터링 도구 선정 (LangFuse vs LangSmith)
📋 메타데이터
| 항목 | 내용 |
|---|---|
| 상태 | ✅ 승인됨 (Accepted) |
| 작성일 | 2026-01-12 |
| 결정자 | AI팀 |
| 관련 기능 | LLM 모니터링, 디버깅, 성능 추적 |
🎯 컨텍스트 (Context)
LangChain 기반 AI 서비스에서 다음을 모니터링해야 합니다:
- LLM 호출 추적: 요청/응답, 토큰 사용량, 비용
- 체인 실행 흐름: LangChain/LangGraph 실행 과정
- 에러 추적: 실패 원인, Fallback 발생 여부
- 성능 분석: 응답 시간, 병목 지점
문제점:
- LangChain 내부 동작이 블랙박스
- 비용 추적 어려움 (Gemini, OpenAI 혼용)
- 디버깅 시 로그만으로는 부족
- 프로덕션 환경에서 실시간 모니터링 필요
선택지:
- LangSmith: LangChain 공식 모니터링 도구 (유료)
- LangFuse: 오픈소스 대안 (무료/유료)
🔍 선택지 분석 (Options)
Option 1: LangSmith (유료)
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "ai-chatbot"
from langchain_google_genai import ChatGoogleGenerativeAI
llm = ChatGoogleGenerativeAI(model="gemini-3-flash")
result = llm.invoke("안녕하세요") # 자동으로 LangSmith에 추적됨
특징:
| 항목 | 내용 |
|---|---|
| 제공사 | LangChain (공식) |
| 가격 | 무료 티어: 5,000 traces/월Developer: $39/월 (50,000 traces)Plus: $199/월 (500,000 traces) |
| 통합 | LangChain 네이티브 통합 |
| UI | 전문적, 직관적 |
| 기능 | Trace, Playground, Datasets, Evaluation |
장점:
- ✅ LangChain 공식 도구 (완벽한 통합)
- ✅ 설정 매우 간단 (환경변수만 설정)
- ✅ 자동 추적 (코드 수정 최소)
- ✅ Playground (프롬프트 테스트)
- ✅ Datasets (평가 데이터셋 관리)
단점:
- ❌ 유료 (무료 티어 제한적)
- ❌ 데이터 외부 전송 (프라이버시 이슈)
- ❌ 벤더 락인
Option 2: LangFuse (오픈소스)
from langfuse.callback import CallbackHandler
langfuse_handler = CallbackHandler(
public_key="your-public-key",
secret_key="your-secret-key",
host="https://cloud.langfuse.com" # 또는 self-hosted
)
from langchain_google_genai import ChatGoogleGenerativeAI
llm = ChatGoogleGenerativeAI(model="gemini-3-flash")
result = llm.invoke(
"안녕하세요",
config={"callbacks": [langfuse_handler]} # 명시적 설정 필요
)
특징:
| 항목 | 내용 |
|---|---|
| 제공사 | Langfuse (오픈소스) |
| 가격 | 무료 (Self-hosted)Cloud: $59/월 (100,000 traces)Pro: $299/월 (1,000,000 traces) |
| 통합 | LangChain 지원 (Callback) |
| UI | 오픈소스, 커스터마이징 가능 |
| 기능 | Trace, Prompt Management, Datasets, Evaluation |
장점:
- ✅ 오픈소스 (Self-hosted 가능)
- ✅ 데이터 자체 관리 (프라이버시)
- ✅ 무료 (Self-hosted)
- ✅ Prompt Management (버전 관리)
- ✅ 벤더 락인 없음
단점:
- ❌ 설정 복잡 (Callback 명시 필요)
- ❌ Self-hosted 시 인프라 관리 필요
- ❌ LangChain 통합 완성도 낮음
Option 3: 둘 다 사용 (하이브리드)
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "langsmith-key"
from langfuse.callback import CallbackHandler
langfuse_handler = CallbackHandler(
public_key="langfuse-public-key",
secret_key="langfuse-secret-key"
)
from langchain_google_genai import ChatGoogleGenerativeAI
llm = ChatGoogleGenerativeAI(model="gemini-3-flash")
result = llm.invoke(
"안녕하세요",
config={"callbacks": [langfuse_handler]} # LangSmith + LangFuse
)
장점:
- ✅ LangSmith: 개발/디버깅 (빠른 피드백)
- ✅ LangFuse: 프로덕션 모니터링 (데이터 자체 관리)
단점:
- ❌ 비용 증가 (두 서비스 모두 유료)
- ❌ 관리 복잡도 증가
- ❌ 데이터 중복
✅ 결정 (Decision)
단계적 도입: LangSmith (V1~V3) → LangFuse 추가 (V4)
| 버전 | 도구 | 이유 |
|---|---|---|
| V1 (MVP) | ❌ 모니터링 없음 | 빠른 출시 우선 |
| V2 | LangSmith (무료 티어) | 개발/디버깅, 빠른 통합 |
| V3 | LangSmith (Developer $39/월) | 프로덕션 모니터링 |
| V4 | LangSmith + LangFuse (선택) | 데이터 자체 관리 필요 시 |
📝 근거 (Rationale)
1. V2~V3: LangSmith 우선 선택 이유
- ✅ 빠른 통합: 환경변수만 설정하면 자동 추적
- ✅ 공식 지원: LangChain 네이티브 통합, 버그 적음
- ✅ 무료 티어: 5,000 traces/월 (V2 충분)
- ✅ Playground: 프롬프트 테스트 및 최적화
- ✅ 학습 곡선: 문서화 우수, 커뮤니티 활발
2. V4: LangFuse 추가 검토 시점
다음 조건 중 2개 이상 충족 시 LangFuse 추가:
- ⚠️ LangSmith 비용이 $200/월 초과
- ⚠️ 데이터 프라이버시 요구사항 강화
- ⚠️ Prompt 버전 관리 필요 (LangFuse 강점)
- ⚠️ Self-hosted 인프라 구축 완료
3. 둘 다 사용하지 않는 이유 (V2~V3)
- ❌ 비용 증가 (LangSmith $39 + LangFuse $59 = $98/월)
- ❌ 관리 복잡도 (두 대시보드 관리)
- ❌ MVP 단계에서 과도한 투자
📊 비용 비교
LangSmith 가격
| 플랜 | 가격 | Traces/월 | 적합 단계 |
|---|---|---|---|
| Free | $0 | 5,000 | V2 (개발) |
| Developer | $39 | 50,000 | V3 (초기 프로덕션) |
| Plus | $199 | 500,000 | V4+ (성장) |
LangFuse 가격
| 플랜 | 가격 | Traces/월 | 적합 단계 |
|---|---|---|---|
| Self-hosted | $0 | 무제한 | V4+ (인프라 있을 시) |
| Cloud | $59 | 100,000 | V3~V4 |
| Pro | $299 | 1,000,000 | V4+ (대규모) |
🔧 구현 예시
V2~V3: LangSmith만 사용
# app/config.py
import os
# LangSmith 설정
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = os.getenv("LANGSMITH_API_KEY")
os.environ["LANGCHAIN_PROJECT"] = "ai-chatbot-v2"
# app/services/llm.py
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_openai import ChatOpenAI
# LangSmith 자동 추적 (별도 설정 불필요)
gemini_flash = ChatGoogleGenerativeAI(model="gemini-3-flash")
gpt5_mini = ChatOpenAI(model="gpt-5-mini")
chat_llm = gemini_flash.with_fallbacks([gpt5_mini])
# 사용
result = chat_llm.invoke("안녕하세요")
# → LangSmith에 자동 기록됨
LangSmith 대시보드에서 확인 가능:
- ✅ 요청/응답 내용
- ✅ 토큰 사용량
- ✅ 비용 ($)
- ✅ Fallback 발생 여부
- ✅ 응답 시간
V4: LangFuse 추가 (선택)
# app/config.py
import os
from langfuse.callback import CallbackHandler
# LangSmith (개발/디버깅)
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = os.getenv("LANGSMITH_API_KEY")
# LangFuse (프로덕션 모니터링)
langfuse_handler = CallbackHandler(
public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
host="https://cloud.langfuse.com"
)
# app/services/llm.py
from langchain_google_genai import ChatGoogleGenerativeAI
gemini_flash = ChatGoogleGenerativeAI(model="gemini-3-flash")
# 프로덕션: LangFuse 추가
result = gemini_flash.invoke(
"안녕하세요",
config={"callbacks": [langfuse_handler]}
)
# → LangSmith + LangFuse 둘 다 기록
📊 기능 비교
| 기능 | LangSmith | LangFuse |
|---|---|---|
| LangChain 통합 | ✅ 네이티브 | ⚠️ Callback |
| 자동 추적 | ✅ | ❌ (명시 필요) |
| Playground | ✅ | ❌ |
| Prompt 버전 관리 | ⚠️ 기본 | ✅ 강력 |
| Datasets | ✅ | ✅ |
| Evaluation | ✅ | ✅ |
| Self-hosted | ❌ | ✅ |
| 무료 티어 | 5,000 traces | ❌ (Self-hosted만) |
| 가격 (Cloud) | $39/월 (50K) | $59/월 (100K) |
🎯 최종 추천
V2 (개발)
LangSmith 무료 티어
- 5,000 traces/월
- 개발/디버깅
V3 (초기 프로덕션)
LangSmith Developer ($39/월)
- 50,000 traces/월
- 프로덕션 모니터링
V4 (성장)
옵션 1: LangSmith Plus ($199/월)
- 500,000 traces/월
- 단일 도구로 관리
옵션 2: LangSmith + LangFuse Self-hosted
- LangSmith: 개발/디버깅
- LangFuse: 프로덕션 (데이터 자체 관리)
📅 이력
| 날짜 | 변경 내용 |
|---|---|
| 2026-01-12 | 초기 결정 및 문서 작성 (LangSmith 우선, LangFuse는 V4 검토) |
ADR-017: FastAPI 서버 분리 전략 (모델 서버 vs 서비스 서버)
📋 메타데이터
| 항목 | 내용 |
|---|---|
| 상태 | ✅ 승인됨 (Accepted) |
| 작성일 | 2026-01-12 |
| 결정자 | AI/백엔드팀 |
| 관련 기능 | AI 서버 아키텍처, 모델 서빙, API 설계 |
🎯 컨텍스트 (Context)
AI 서비스에서 다음 두 가지 역할을 수행해야 합니다:
- 모델 서빙: LLM, Embedding, YOLO, CLOVA OCR 등 AI 모델 호출
- 비즈니스 로직: 인증, 데이터 검증, DB 조회, 결과 가공
문제점:
- 단일 서버에서 모든 역할 수행 시 리소스 경합
- AI 모델 호출은 GPU/메모리 집약적
- 비즈니스 로직은 CPU 집약적
- 스케일링 전략이 다름 (모델 vs 로직)
고려사항:
- 리소스 효율: GPU는 모델 서버에만 할당
- 스케일링: 모델 서버와 서비스 서버 독립적 확장
- 장애 격리: 모델 서버 장애 시 서비스 서버는 정상 동작
- 개발 효율: 서버 분리 시 관리 복잡도 증가
🔍 선택지 분석 (Options)
Option 1: 단일 FastAPI 서버 (Monolithic)
┌─────────────────────────────────────────────────────────┐
│ FastAPI 서버 (단일) │
├─────────────────────────────────────────────────────────┤
│ │
│ ┌─ API 라우터 ─────────────────────────────────────┐ │
│ │ /ai/chat │ │
│ │ /ai/analyze │ │
│ │ /ai/interview/question │ │
│ └──────────────────────────────────────────────────┘ │
│ │
│ ┌─ 비즈니스 로직 ──────────────────────────────────┐ │
│ │ - 인증/인가 │ │
│ │ - 데이터 검증 │ │
│ │ - DB 조회 │ │
│ └──────────────────────────────────────────────────┘ │
│ │
│ ┌─ AI 모델 호출 ───────────────────────────────────┐ │
│ │ - LLM (Gemini, OpenAI) │ │
│ │ - Embedding (Gemini API) │ │
│ │ - CLOVA OCR, YOLO │ │
│ └──────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘
| 장점 | 단점 |
|---|---|
| 단순한 구조 | 리소스 경합 (GPU vs CPU) |
| 배포 간편 | 스케일링 비효율 |
| 디버깅 용이 | 장애 전파 위험 |
Option 2: 서버 분리 (모델 서버 + 서비스 서버) ⭐
┌─────────────────────────────────────────────────────────┐
│ 서비스 서버 (FastAPI) │
├─────────────────────────────────────────────────────────┤
│ │
│ ┌─ API 라우터 ─────────────────────────────────────┐ │
│ │ /ai/chat │ │
│ │ /ai/analyze │ │
│ │ /ai/interview/question │ │
│ └──────────────────────────────────────────────────┘ │
│ │
│ ┌─ 비즈니스 로직 ──────────────────────────────────┐ │
│ │ - 인증/인가 │ │
│ │ - 데이터 검증 │ │
│ │ - DB 조회 │ │
│ │ - 결과 가공 │ │
│ └──────────────────────────────────────────────────┘ │
│ │
│ ┌─ 모델 서버 호출 ─────────────────────────────────┐ │
│ │ HTTP Client → 모델 서버 │ │
│ └──────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘
│
│ HTTP/gRPC
▼
┌─────────────────────────────────────────────────────────┐
│ 모델 서버 (FastAPI) │
├─────────────────────────────────────────────────────────┤
│ │
│ ┌─ 내부 API ───────────────────────────────────────┐ │
│ │ /internal/llm/invoke │ │
│ │ /internal/embedding/embed │ │
│ │ /internal/ocr/extract │ │
│ │ /internal/yolo/detect │ │
│ └──────────────────────────────────────────────────┘ │
│ │
│ ┌─ AI 모델 호출 ───────────────────────────────────┐ │
│ │ - LLM (Gemini, OpenAI) │ │
│ │ - Embedding (Gemini API) │ │
│ │ - CLOVA OCR, YOLO │ │
│ └──────────────────────────────────────────────────┘ │
│ │
│ ┌─ 리소스 ─────────────────────────────────────────┐ │
│ │ - GPU (YOLO) │ │
│ │ - 고메모리 (모델 로딩) │ │
│ └──────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘
| 장점 | 단점 |
|---|---|
| 리소스 격리 (GPU vs CPU) | 관리 복잡도 증가 |
| 독립적 스케일링 | 네트워크 레이턴시 |
| 장애 격리 | 배포 복잡 |
| 모델 서버 재사용 가능 |
Option 3: 마이크로서비스 (기능별 분리)
[서비스 서버] → [LLM 서버]
→ [Embedding 서버]
→ [OCR 서버]
→ [YOLO 서버]
| 장점 | 단점 |
|---|---|
| 최대 격리 | 오버엔지니어링 |
| 기능별 최적화 | 관리 복잡도 매우 높음 |
| 네트워크 오버헤드 |
✅ 결정 (Decision)
Option 2: 서버 분리 (모델 서버 + 서비스 서버) 채택
| 버전 | 구성 | 이유 |
|---|---|---|
| V1 (MVP) | 단일 서버 | 빠른 출시, 단순 구조 |
| V2 | 단일 서버 | 트래픽 패턴 파악 |
| V3 | 서버 분리 | 리소스 효율, 스케일링 |
| V4 | 서버 분리 유지 | 안정화 |
📝 근거 (Rationale)
1. V3부터 서버 분리 이유
- ✅ 리소스 격리: GPU는 모델 서버에만 할당 (비용 절감)
- ✅ 독립적 스케일링:
- 서비스 서버: CPU 기반, 수평 확장 용이
- 모델 서버: GPU 기반, 비용 효율적 확장
- ✅ 장애 격리: 모델 서버 장애 시 서비스 서버는 캐시/Fallback 사용
- ✅ 개발 효율: 모델 서버는 AI팀, 서비스 서버는 백엔드팀 독립 개발
2. V1~V2는 단일 서버 유지 이유
- ✅ 빠른 출시: 서버 분리는 추가 개발 시간 필요
- ✅ 트래픽 파악: 실제 트래픽 패턴 확인 후 최적화
- ✅ 비용 절감: 초기 트래픽 낮을 때 단일 서버로 충분
3. 마이크로서비스 (Option 3) 제외 이유
- ❌ 오버엔지니어링: MVP 단계에서 과도
- ❌ 관리 복잡도: 4~5개 서버 관리 부담
- ❌ 네트워크 오버헤드: 서버 간 통신 증가
🔧 구현 예시
서비스 서버 (FastAPI)
# service_server/main.py
from fastapi import FastAPI, Depends
from httpx import AsyncClient
app = FastAPI()
# 모델 서버 클라이언트
MODEL_SERVER_URL = "http://model-server:8001"
@app.post("/ai/chat")
async def chat(request: ChatRequest, user=Depends(get_current_user)):
"""채팅 API (서비스 서버)"""
# 1. 인증/인가
if not user.is_authenticated:
raise HTTPException(401)
# 2. 데이터 검증
if len(request.message) > 1000:
raise HTTPException(400, "메시지가 너무 깁니다")
# 3. DB 조회 (이력서 조회)
resume = await db.get_resume(user.id)
# 4. 모델 서버 호출
async with AsyncClient() as client:
response = await client.post(
f"{MODEL_SERVER_URL}/internal/llm/invoke",
json={
"prompt": request.message,
"context": resume,
"model": "gemini-3-flash"
}
)
llm_result = response.json()
# 5. 결과 가공 및 DB 저장
chat_history = await db.save_chat(user.id, request.message, llm_result)
return {"answer": llm_result["text"], "history_id": chat_history.id}
모델 서버 (FastAPI)
# model_server/main.py
from fastapi import FastAPI
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_openai import ChatOpenAI
app = FastAPI()
# LLM 초기화 (서버 시작 시 한 번만)
gemini_flash = ChatGoogleGenerativeAI(model="gemini-3-flash")
gpt5_mini = ChatOpenAI(model="gpt-5-mini")
chat_llm = gemini_flash.with_fallbacks([gpt5_mini])
@app.post("/internal/llm/invoke")
async def invoke_llm(request: LLMRequest):
"""LLM 호출 (내부 API)"""
# 인증 체크 (내부 API이므로 간단한 토큰)
if request.api_key != INTERNAL_API_KEY:
raise HTTPException(403)
# LLM 호출
result = await chat_llm.ainvoke(
f"Context: {request.context}\n\nQuestion: {request.prompt}"
)
return {
"text": result.content,
"model": result.response_metadata.get("model_name"),
"tokens": result.response_metadata.get("token_usage")
}
@app.post("/internal/embedding/embed")
async def embed_text(request: EmbeddingRequest):
"""Embedding 생성 (내부 API)"""
# Gemini Embedding 호출
vector = await gemini_embedding.aembed_query(request.text)
return {"vector": vector, "dimension": len(vector)}
@app.post("/internal/yolo/detect")
async def detect_objects(request: YOLORequest):
"""YOLO 객체 검출 (내부 API)"""
from ultralytics import YOLO
model = YOLO('yolo11n.pt')
results = model(request.image)
return {
"boxes": results[0].boxes.data.tolist(),
"count": len(results[0].boxes)
}
📊 리소스 할당
서비스 서버
| 리소스 | 사양 | 이유 |
|---|---|---|
| CPU | 4 vCPU | 비즈니스 로직 처리 |
| 메모리 | 8GB | DB 연결, 캐시 |
| GPU | ❌ 불필요 | |
| 스케일링 | 수평 확장 (2~10대) | 트래픽에 따라 |
모델 서버
| 리소스 | 사양 | 이유 |
|---|---|---|
| CPU | 8 vCPU | 모델 추론 |
| 메모리 | 32GB | 모델 로딩 (YOLO) |
| GPU | ✅ T4/V100 | YOLO 가속 |
| 스케일링 | 수직 확장 (1~2대) | GPU 비용 고려 |
🔐 보안
내부 API 인증
# 모델 서버 내부 API는 서비스 서버만 호출 가능
INTERNAL_API_KEY = os.getenv("INTERNAL_API_KEY")
@app.post("/internal/llm/invoke")
async def invoke_llm(request: LLMRequest, api_key: str = Header(...)):
if api_key != INTERNAL_API_KEY:
raise HTTPException(403, "Forbidden")
# ...
네트워크 격리
# docker-compose.yml
services:
service-server:
networks:
- public
- internal
model-server:
networks:
- internal # 외부 접근 불가
📊 비교표
| 항목 | 단일 서버 | 서버 분리 |
|---|---|---|
| 구조 복잡도 | ⭐ | ⭐⭐⭐ |
| 리소스 효율 | ⚠️ | ✅ |
| 스케일링 | ⚠️ 전체 확장 | ✅ 독립 확장 |
| 장애 격리 | ❌ | ✅ |
| 개발 속도 | ✅ 빠름 | ⚠️ 느림 |
| 운영 비용 | ⚠️ 높음 (GPU 항상 ON) | ✅ 낮음 (GPU 최적화) |
| V1~V2 적합성 | ✅ 추천 | ❌ 과함 |
| V3+ 적합성 | ❌ | ✅ 추천 |
🎯 최종 추천
V1~V2 (MVP)
단일 FastAPI 서버
- 빠른 출시
- 단순 구조
- 트래픽 패턴 파악
V3 (프로덕션)
서버 분리
- 서비스 서버: CPU 기반 (2~4대)
- 모델 서버: GPU 기반 (1~2대)
- 독립적 스케일링
V4 (최적화)
서버 분리 유지
- 모델 서버 캐싱 강화
- 서비스 서버 로드 밸런싱
- 모니터링 강화
📅 이력
| 날짜 | 변경 내용 |
|---|---|
| 2026-01-12 | 초기 결정 및 문서 작성 (V3부터 서버 분리) |
ADR-018: (예정)
📋 메타데이터
| 항목 | 내용 |
|---|---|
| 상태 | ✅ 승인됨 (Accepted) |
| 작성일 | 2026-01-12 |
| 결정자 | AI/백엔드팀 |
| 관련 기능 | 이벤트 스트리밍, 데이터 파이프라인, 실시간 처리 |
🎯 컨텍스트 (Context)
사용자 행동 데이터, 로그, 이벤트를 처리하고 분석해야 합니다.
처리해야 할 데이터:
- 사용자 행동 로그: 채팅, 분석 요청, 면접 진행
- 시스템 이벤트: 파일 업로드, OCR 완료, 임베딩 완료
- 배치 처리: 전체 사용자 패턴 분석 (V4)
문제점:
- Kafka와 Airflow의 용도가 다름을 이해해야 함
- 실시간 처리 vs 배치 처리 구분 필요
- 오버엔지니어링 방지 필요
🔍 선택지 분석 (Options)
Kafka vs Airflow 비교
┌─────────────────────────────────────────────────────────────────┐
│ Kafka vs Airflow 용도 차이 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ [Kafka - 실시간 이벤트 스트리밍] │
│ 사용자 행동 → Kafka → 실시간 처리 → 분석/저장 │
│ - 채팅 메시지, 클릭, 업로드 등 │
│ - 밀리초 단위 처리 │
│ - 이벤트 기반 아키텍처 │
│ │
│ [Airflow - 배치 처리 스케줄링] │
│ Cron → Airflow → 배치 작업 → 결과 저장 │
│ - 매일 새벽 패턴 분석 │
│ - 시간 단위 처리 │
│ - 스케줄 기반 아키텍처 │
│ │
└─────────────────────────────────────────────────────────────────┘
| 항목 | Kafka | Airflow |
|---|---|---|
| 용도 | 실시간 이벤트 스트리밍 | 배치 처리 스케줄링 |
| 처리 방식 | 이벤트 기반 (Event-driven) | 시간 기반 (Time-driven) |
| 레이턴시 | 밀리초 | 분/시간 |
| 데이터 흐름 | Producer → Kafka → Consumer | DAG 정의 → 스케줄 실행 |
| 사용 예 | 사용자 행동 로그, 실시간 알림 | 정기 보고서, 데이터 ETL |
| 학습 곡선 | 🔴🔴 높음 | 🔴🔴 높음 |
Option 1: Kafka (실시간 이벤트 스트리밍)
┌─────────────────────────────────────────────────────────────────┐
│ Kafka 아키텍처 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ [Producer] │
│ │ │
│ ├─ 사용자 채팅 → Kafka Topic: chat_events │
│ ├─ 파일 업로드 → Kafka Topic: file_events │
│ └─ 분석 요청 → Kafka Topic: analysis_events │
│ │
│ [Kafka Broker] │
│ ├─ Topic: chat_events (Partition 3) │
│ ├─ Topic: file_events (Partition 3) │
│ └─ Topic: analysis_events (Partition 3) │
│ │
│ [Consumer] │
│ ├─ 로그 저장 Consumer → MongoDB │
│ ├─ 실시간 분석 Consumer → Redis │
│ └─ 알림 Consumer → FCM/Email │
│ │
└─────────────────────────────────────────────────────────────────┘
사용 예시:
# Producer (FastAPI)
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
@app.post("/ai/chat")
async def chat(request: ChatRequest):
# 채팅 처리
result = await process_chat(request)
# Kafka에 이벤트 발행
producer.send('chat_events', {
'user_id': request.user_id,
'message': request.message,
'timestamp': datetime.now().isoformat()
})
return result
# Consumer (별도 서비스)
from kafka import KafkaConsumer
consumer = KafkaConsumer('chat_events', bootstrap_servers='localhost:9092')
for message in consumer:
# 실시간 로그 저장
await save_to_mongodb(message.value)
# 실시간 분석
await analyze_user_behavior(message.value)
| 장점 | 단점 |
|---|---|
| 실시간 처리 | 인프라 복잡도 매우 높음 |
| 확장성 우수 | 관리 부담 큼 |
| 이벤트 재처리 가능 | 학습 곡선 높음 |
| 마이크로서비스 연동 | 초기 비용 높음 |
Option 2: Airflow (배치 처리)
┌─────────────────────────────────────────────────────────────────┐
│ Airflow DAG │
├─────────────────────────────────────────────────────────────────┤
│ │
│ [매일 새벽 3시 실행] │
│ │ │
│ ├─ Task 1: DB에서 사용자 데이터 조회 │
│ │ │
│ ├─ Task 2: 패턴 분석 (Gemini Embedding) │
│ │ │
│ ├─ Task 3: 추천 생성 (LLM) │
│ │ │
│ └─ Task 4: VectorDB에 저장 │
│ │
└─────────────────────────────────────────────────────────────────┘
사용 예시:
from airflow import DAG
from airflow.operators.python import PythonOperator
dag = DAG(
'user_pattern_analysis',
schedule_interval='0 3 * * *', # 매일 새벽 3시
start_date=datetime(2026, 1, 1)
)
def analyze_patterns():
# 배치 처리
users = db.get_all_users()
patterns = analyze(users)
save_to_vectordb(patterns)
task = PythonOperator(
task_id='analyze',
python_callable=analyze_patterns,
dag=dag
)
| 장점 | 단점 |
|---|---|
| 배치 처리에 최적화 | 실시간 처리 불가 |
| DAG 시각화 | 이벤트 기반 처리 어려움 |
| 재시도/모니터링 강력 |
Option 3: CDC (Change Data Capture)
┌─────────────────────────────────────────────────────────────────┐
│ CDC (Debezium + Kafka) │
├─────────────────────────────────────────────────────────────────┤
│ │
│ [PostgreSQL] │
│ │ │
│ ├─ INSERT/UPDATE/DELETE │
│ │ │
│ ▼ │
│ [Debezium] ← DB 변경사항 캡처 │
│ │ │
│ ▼ │
│ [Kafka] ← 변경 이벤트 스트리밍 │
│ │ │
│ ▼ │
│ [Consumer] ← 실시간 처리 │
│ │
└─────────────────────────────────────────────────────────────────┘
| 장점 | 단점 |
|---|---|
| DB 변경사항 자동 캡처 | 매우 복잡한 설정 |
| 실시간 동기화 | Kafka 필수 |
| 데이터 일관성 | 오버엔지니어링 위험 |
✅ 결정 (Decision)
단계적 도입: 간단한 방법 → 필요 시 Kafka
| 버전 | 선택 | 이유 |
|---|---|---|
| V1~V2 | ❌ Kafka/Airflow 없음 | MVP 집중, 단순 로깅 |
| V3 | Celery Beat (배치) | 패턴 분석 배치 처리 |
| V4 | Celery Beat 유지 | 충분히 동작 |
| V4+ | Kafka 검토 (선택) | 실시간 처리 필요 시 |
📝 근거 (Rationale)
1. Kafka vs Airflow는 용도가 다름
Kafka (실시간 이벤트 스트리밍):
- ✅ 사용자 행동 로그 (채팅, 클릭, 업로드)
- ✅ 실시간 알림 (면접 시작, 분석 완료)
- ✅ 마이크로서비스 간 이벤트 전달
Airflow (배치 처리 스케줄링):
- ✅ 정기적 데이터 처리 (매일 새벽 패턴 분석)
- ✅ ETL 파이프라인
- ✅ 복잡한 워크플로우 관리
둘은 경쟁 관계가 아니라 보완 관계!
2. 프로젝트에서 Kafka가 필요한 경우
다음 조건 중 2개 이상 충족 시 Kafka 도입:
- ⚠️ 실시간 알림 필요 (면접 시작, 분석 완료)
- ⚠️ 이벤트 기반 아키텍처 필요 (마이크로서비스)
- ⚠️ 대량 로그 처리 (초당 1000+ 이벤트)
- ⚠️ 이벤트 재처리 필요 (장애 복구)
현재 프로젝트:
- ❌ 실시간 알림 불필요 (폴링으로 충분)
- ❌ 마이크로서비스 아님 (모놀리식)
- ❌ 대량 로그 없음 (초기 트래픽 낮음)
→ Kafka 불필요!
3. V3~V4는 Celery Beat로 충분
Celery Beat의 역할:
매일 새벽 3시:
1. DB에서 전체 사용자 데이터 조회
2. Gemini Embedding으로 패턴 분석
3. LLM으로 추천 생성
4. VectorDB에 저장
Kafka vs Celery Beat:
| 항목 | Kafka | Celery Beat |
|---|---|---|
| 실시간 처리 | ✅ | ❌ |
| 배치 처리 | ⚠️ 가능 | ✅ 최적 |
| 복잡도 | 🔴🔴🔴 | 🔴 |
| 비용 | 높음 | 낮음 |
| V3 적합성 | ❌ 과함 | ✅ 추천 |
4. Kafka 도입 시점 (V4+ 검토)
Kafka가 필요한 시나리오:
┌─────────────────────────────────────────────────────────────────┐
│ Kafka 도입 시나리오 (V4+) │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. 실시간 알림 시스템 │
│ - 면접 시작 알림 (FCM, Email) │
│ - 분석 완료 알림 │
│ │
│ 2. 실시간 대시보드 │
│ - 현재 접속자 수 │
│ - 실시간 채팅 통계 │
│ │
│ 3. 이벤트 기반 워크플로우 │
│ - 파일 업로드 → OCR → 임베딩 (이벤트 체인) │
│ - 분석 완료 → 알림 → 리포트 생성 │
│ │
│ 4. 마이크로서비스 전환 │
│ - 서비스 간 이벤트 전달 │
│ - 느슨한 결합 (Loose Coupling) │
│ │
└─────────────────────────────────────────────────────────────────┘
하지만 V3~V4는 불필요:
- ✅ 폴링으로 충분 (GET /ai/task/{id})
- ✅ Celery Beat로 배치 처리
- ✅ 단순 구조 유지
📊 비교표
| 항목 | Kafka | Airflow | Celery Beat |
|---|---|---|---|
| 용도 | 실시간 이벤트 | 배치 스케줄링 | 배치 스케줄링 |
| 레이턴시 | 밀리초 | 분/시간 | 분/시간 |
| 복잡도 | 🔴🔴🔴 | 🔴🔴 | 🔴 |
| 비용 | 높음 | 중간 | 낮음 |
| V3 적합성 | ❌ 과함 | ⚠️ 가능 | ✅ 추천 |
| V4+ 적합성 | ✅ 검토 | ✅ 검토 | ✅ 유지 |
🎯 최종 추천
V1~V2 (MVP)
로깅: 단순 파일/DB 저장
배치: ❌ 없음
V3 (배치 처리 도입)
배치: Celery Beat
- 매일 새벽 패턴 분석
- 간단한 구조
V4 (배치 처리 유지)
배치: Celery Beat 유지
- 전체 사용자 데이터 분석
- 질문 풀 생성
V4+ (실시간 처리 필요 시)
옵션 1: Celery Beat 유지 (추천)
옵션 2: Airflow 전환 (복잡한 워크플로우)
옵션 3: Kafka 도입 (실시간 이벤트)
💡 핵심 정리
질문: "사용자 정보 같은 건 Kafka가 더 좋은지 Airflow로 하는 게 더 좋아?"
답변:
- ❌ 잘못된 질문: Kafka와 Airflow는 용도가 다름
- ✅ 올바른 질문: "실시간 처리가 필요한가? 배치 처리가 필요한가?"
사용자 정보 처리:
- 실시간 필요 (로그인, 행동 추적) → Kafka
- 배치 처리 (매일 패턴 분석) → Airflow 또는 Celery Beat
프로젝트 결정:
- ✅ V3~V4: Celery Beat (배치 처리로 충분)
- ⚠️ V4+: 실시간 필요 시 Kafka 검토
� Redis + Kafka + Airflow 상호 보완 관계
질문: "Redis + Kafka + Airflow 모두 같이 사용하면 서로 상호 보완인가?"
답변: 네, 맞습니다! 경쟁 관계가 아니라 상호 보완 관계입니다!
각각의 역할
┌─────────────────────────────────────────────────────────────────┐
│ Redis + Kafka + Airflow 통합 아키텍처 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ [Redis - 캐시/세션] │
│ - 채팅 히스토리 (최근 10개) │
│ - 세션 관리 │
│ - 빠른 읽기/쓰기 (< 1ms) │
│ │
│ [Kafka - 실시간 이벤트] │
│ - 사용자 행동 로그 (채팅, 클릭) │
│ - 실시간 알림 │
│ - 이벤트 스트리밍 (밀리초) │
│ │
│ [Airflow - 배치 처리] │
│ - 매일 새벽 패턴 분석 │
│ - 정기 보고서 생성 │
│ - 스케줄 기반 (시간) │
│ │
└─────────────────────────────────────────────────────────────────┘
LLM 채팅에서의 통합 예시
┌─────────────────────────────────────────────────────────────────┐
│ LLM 채팅 - Redis + Kafka + Airflow 통합 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ [1. 사용자 메시지 도착] │
│ │ │
│ ├─→ Redis: 최근 대화 히스토리 조회 (< 1ms) │
│ │ "최근 10개 메시지 가져오기" │
│ │ │
│ ├─→ Kafka: 이벤트 발행 (실시간) │
│ │ Topic: chat_events │
│ │ {user_id, message, timestamp} │
│ │ │
│ └─→ LLM 호출 (히스토리 포함) │
│ │
│ [2. LLM 응답 생성] │
│ │ │
│ ├─→ Redis: 응답 저장 (최근 히스토리 업데이트) │
│ │ chat:room_001 → [msg1, msg2, ..., new_msg] │
│ │ │
│ └─→ Kafka: 응답 이벤트 발행 │
│ Topic: chat_responses │
│ │
│ [3. Kafka Consumer (실시간 처리)] │
│ │ │
│ ├─→ MongoDB: 영구 저장 (비동기) │
│ │ │
│ ├─→ 실시간 분석: 사용자 행동 패턴 │
│ │ │
│ └─→ 알림: 특정 키워드 감지 시 알림 │
│ │
│ [4. Airflow (배치 처리 - 매일 새벽 3시)] │
│ │ │
│ ├─→ MongoDB에서 전체 채팅 데이터 조회 │
│ │ │
│ ├─→ 패턴 분석: 자주 묻는 질문, 인기 주제 │
│ │ │
│ ├─→ LLM으로 인사이트 생성 │
│ │ │
│ └─→ VectorDB에 저장 (다음 날 추천에 활용) │
│ │
└─────────────────────────────────────────────────────────────────┘
상호 보완 관계 정리
| 도구 | 역할 | LLM 채팅에서 | 시점 |
|---|---|---|---|
| Redis | 캐시/세션 | 최근 대화 히스토리 (빠름) | 실시간 (< 1ms) |
| Kafka | 실시간 이벤트 | 채팅 로그 수집, 실시간 분석 | 실시간 (밀리초) |
| Airflow | 배치 처리 | 전체 패턴 분석, 추천 생성 | 배치 (매일) |
데이터 흐름
사용자 메시지
↓
Redis (히스토리 조회) → LLM → Redis (응답 저장)
↓
Kafka (이벤트 발행) → Consumer (MongoDB 저장)
↓
Airflow (매일 새벽) → 패턴 분석 → VectorDB
프로젝트 적용
V3~V4 (현재):
✅ Redis: 채팅 히스토리, 세션 관리
✅ Celery Beat: 배치 처리 (Airflow 대신)
❌ Kafka: 불필요 (폴링으로 충분)
V4+ (실시간 필요 시):
✅ Redis: 채팅 히스토리, 세션 관리
✅ Kafka: 실시간 이벤트 스트리밍
✅ Airflow: 복잡한 배치 처리
결론:
- ✅ 경쟁 관계 아님, 상호 보완!
- ✅ Redis: 빠른 캐시
- ✅ Kafka: 실시간 이벤트
- ✅ Airflow: 배치 분석
- ✅ 하지만 프로젝트는 V3~V4에서 Redis + Celery Beat만 사용!
�📅 이력
| 날짜 | 변경 내용 |
|---|---|
| 2026-01-12 | 초기 결정 및 문서 작성 (Celery Beat 우선, Kafka는 V4+ 검토) |
ADR-019: LLM 출력 구조화 - Pydantic AI 미사용
📋 메타데이터
| 항목 | 내용 |
|---|---|
| 상태 | ✅ 승인됨 (Accepted) |
| 작성일 | 2026-01-12 |
| 결정자 | AI팀 |
| 관련 기능 | LLM 출력 구조화, 비용 최적화 |
🎯 컨텍스트 (Context)
LLM 출력을 구조화하기 위한 방법을 결정해야 합니다.
선택지:
- Pydantic AI: LLM 오케스트레이션 프레임워크 (LangChain 대체재)
- LangChain + LangGraph: 기존 사용 중인 프레임워크
🔍 선택지 분석 (Options)
Option 1: Pydantic AI 도입
| 장점 | 단점 |
|---|---|
| 자동 JSON 변환 | 추가 라이브러리 비용 |
| 간결한 코드 | LangChain과 중복 |
| 학습 곡선 |
Option 2: LangChain + LangGraph 유지 ⭐
| 장점 | 단점 |
|---|---|
| 비용 절감 (추가 라이브러리 없음) | 수동 파싱 필요 |
| 기존 스택 활용 | |
| LangGraph 통합 용이 |
✅ 결정 (Decision)
Option 2: LangChain + LangGraph만 사용
이유:
- ✅ 비용 최적화: Pydantic AI는 추가 라이브러리 비용 발생
- ✅ 기술 스택 단순화: LangChain으로 충분히 구현 가능
- ✅ LangGraph 통합: 이미 LangGraph 사용 중
📝 근거 (Rationale)
Pydantic AI vs LangChain:
- Pydantic AI: LangChain의 대체재 (중복 투자)
- LangChain: 이미 사용 중이며 충분한 기능 제공
결론:
- Pydantic AI는 사용하지 않음
- LangChain PydanticOutputParser로 LLM 출력 구조화
- LangGraph로 상태 관리
📅 이력
| 날짜 | 변경 내용 |
|---|---|
| 2026-01-12 | 초기 결정 (Pydantic AI 미사용, LangChain 유지) |
문제: 파싱 필요, 타입 검증 없음
result = json.loads(response.content)
| 장점 | 단점 |
|------|------|
| 간단한 구조 | JSON 파싱 수동 |
| LangChain 네이티브 | 타입 검증 없음 |
| | 에러 처리 복잡 |
---
#### Option 2: Pydantic AI ⭐
```python
from pydantic_ai import Agent
from pydantic import BaseModel
# Pydantic 모델 정의
class InterviewQuestion(BaseModel):
question: str
difficulty: str # "easy" | "medium" | "hard"
category: str # "기술" | "인성" | "경험"
# Pydantic AI Agent
agent = Agent(
'gemini-3-flash',
result_type=InterviewQuestion # 자동 JSON 변환!
)
# 사용
result = agent.run_sync('면접 질문 생성해줘')
print(result.data.question) # 타입 안전!
print(result.data.difficulty) # 자동 검증!
| 장점 | 단점 |
|---|---|
| 자동 JSON 변환 | 추가 라이브러리 |
| 타입 안정성 | LangChain과 별도 |
| Pydantic 검증 | |
| 간결한 코드 |
Option 3: Pydantic AI + LangGraph ⭐⭐ 추천
from pydantic_ai import Agent
from langgraph.graph import StateGraph
from pydantic import BaseModel
from typing import TypedDict
# Pydantic 모델
class InterviewQuestion(BaseModel):
question: str
difficulty: str
category: str
# LangGraph 상태
class InterviewState(TypedDict):
resume: str
questions: list[InterviewQuestion]
current_question: int
# Pydantic AI Agent
agent = Agent('gemini-3-flash', result_type=InterviewQuestion)
# LangGraph 노드
def generate_question(state: InterviewState):
result = agent.run_sync(f"Resume: {state['resume']}")
return {
"questions": state["questions"] + [result.data]
}
# LangGraph 정의
workflow = StateGraph(InterviewState)
workflow.add_node("generate", generate_question)
workflow.set_entry_point("generate")
# 실행
app = workflow.compile()
result = app.invoke({
"resume": "...",
"questions": [],
"current_question": 0
})
| 장점 | 단점 |
|---|---|
| JSON 자동 변환 | 두 라이브러리 사용 |
| 타입 안정성 | 학습 곡선 |
| 상태 관리 (LangGraph) | |
| 복잡한 워크플로우 |
✅ 결정 (Decision)
Option 3: Pydantic AI + LangGraph 조합 채택
| 버전 | 사용 | 이유 |
|---|---|---|
| V1~V2 | LangChain만 | 간단한 구조 |
| V3 | Pydantic AI | JSON 출력 필요 |
| V4 | Pydantic AI + LangGraph | 면접 모드 (상태 관리) |
📝 근거 (Rationale)
1. Pydantic AI의 장점
자동 JSON 변환:
# Before (LangChain)
response = llm.invoke(prompt)
result = json.loads(response.content) # 수동 파싱
if "question" not in result: # 수동 검증
raise ValueError("Invalid format")
# After (Pydantic AI)
result = agent.run_sync(prompt)
print(result.data.question) # 자동 파싱 + 검증!
타입 안정성:
class AnalysisResult(BaseModel):
resume_summary: str
job_summary: str
insights: list[str]
match_score: float # 0.0 ~ 1.0
agent = Agent('gemini-3-flash', result_type=AnalysisResult)
result = agent.run_sync("분석해줘")
# 타입 체크 자동!
print(result.data.match_score) # float 보장
2. LangGraph와의 시너지
복잡한 워크플로우:
from pydantic_ai import Agent
from langgraph.graph import StateGraph
from pydantic import BaseModel
# 1. Pydantic 모델 정의
class Question(BaseModel):
question: str
difficulty: str
class Answer(BaseModel):
answer: str
evaluation: str
# 2. Pydantic AI Agent
question_agent = Agent('gemini-3-flash', result_type=Question)
eval_agent = Agent('gemini-3-flash', result_type=Answer)
# 3. LangGraph 워크플로우
class InterviewState(TypedDict):
resume: str
questions: list[Question]
answers: list[Answer]
def generate_question(state):
result = question_agent.run_sync(f"Resume: {state['resume']}")
return {"questions": state["questions"] + [result.data]}
def evaluate_answer(state):
result = eval_agent.run_sync(f"Answer: {state['answers'][-1]}")
return {"answers": state["answers"] + [result.data]}
workflow = StateGraph(InterviewState)
workflow.add_node("generate", generate_question)
workflow.add_node("evaluate", evaluate_answer)
workflow.add_edge("generate", "evaluate")
장점:
- ✅ JSON 자동 변환 (Pydantic AI)
- ✅ 상태 관리 (LangGraph)
- ✅ 타입 안정성 (Pydantic)
- ✅ 복잡한 워크플로우 (LangGraph)
3. 프로젝트 적용 예시
면접 모드 (V4):
from pydantic_ai import Agent
from langgraph.graph import StateGraph
from pydantic import BaseModel
# Pydantic 모델
class InterviewQuestion(BaseModel):
question: str
difficulty: str # "easy" | "medium" | "hard"
category: str # "기술" | "인성" | "경험"
follow_up: bool
class InterviewAnswer(BaseModel):
answer: str
evaluation: str # "good" | "average" | "poor"
feedback: str
# Pydantic AI Agent
question_agent = Agent('gemini-3-flash', result_type=InterviewQuestion)
eval_agent = Agent('gemini-3-flash', result_type=InterviewAnswer)
# LangGraph 상태
class InterviewState(TypedDict):
resume: str
job_posting: str
questions: list[InterviewQuestion]
answers: list[InterviewAnswer]
current_index: int
# LangGraph 노드
def generate_question(state: InterviewState):
"""질문 생성"""
result = question_agent.run_sync(
f"Resume: {state['resume']}\nJob: {state['job_posting']}"
)
return {
"questions": state["questions"] + [result.data],
"current_index": state["current_index"] + 1
}
def evaluate_answer(state: InterviewState):
"""답변 평가"""
last_answer = state["answers"][-1] if state["answers"] else None
result = eval_agent.run_sync(f"Answer: {last_answer}")
return {
"answers": state["answers"] + [result.data]
}
def should_continue(state: InterviewState):
"""계속 질문할지 결정"""
if state["current_index"] >= 5:
return "end"
last_question = state["questions"][-1]
if last_question.follow_up:
return "generate"
return "end"
# LangGraph 정의
workflow = StateGraph(InterviewState)
workflow.add_node("generate", generate_question)
workflow.add_node("evaluate", evaluate_answer)
workflow.add_conditional_edges(
"evaluate",
should_continue,
{
"generate": "generate",
"end": END
}
)
workflow.set_entry_point("generate")
# 실행
app = workflow.compile()
result = app.invoke({
"resume": "...",
"job_posting": "...",
"questions": [],
"answers": [],
"current_index": 0
})
# 타입 안전한 결과
for q in result["questions"]:
print(f"Q: {q.question} ({q.difficulty})")
for a in result["answers"]:
print(f"A: {a.answer} - {a.evaluation}")
📊 비교표
| 항목 | LangChain만 | Pydantic AI | Pydantic AI + LangGraph |
|---|---|---|---|
| JSON 변환 | ❌ 수동 | ✅ 자동 | ✅ 자동 |
| 타입 검증 | ❌ 없음 | ✅ Pydantic | ✅ Pydantic |
| 상태 관리 | ⚠️ 수동 | ⚠️ 수동 | ✅ LangGraph |
| 복잡한 워크플로우 | ❌ 어려움 | ⚠️ 가능 | ✅ 쉬움 |
| 학습 곡선 | 🔴 | 🔴🔴 | 🔴🔴🔴 |
| V3 적합성 | ⚠️ | ✅ 추천 | ❌ 과함 |
| V4 적합성 | ❌ | ⚠️ | ✅ 추천 |
🎯 최종 추천
V1~V2 (MVP)
# LangChain만 사용
llm = ChatGoogleGenerativeAI(model="gemini-3-flash")
response = llm.invoke(prompt)
V3 (JSON 출력 필요)
# Pydantic AI 도입
from pydantic_ai import Agent
agent = Agent('gemini-3-flash', result_type=AnalysisResult)
result = agent.run_sync(prompt)
print(result.data.resume_summary) # 타입 안전!
V4 (면접 모드 - 상태 관리)
# Pydantic AI + LangGraph
from pydantic_ai import Agent
from langgraph.graph import StateGraph
question_agent = Agent('gemini-3-flash', result_type=Question)
workflow = StateGraph(InterviewState)
# ... 복잡한 워크플로우
💡 핵심 정리
Pydantic AI의 장점:
- ✅ 모든 결과를 JSON으로 자동 변환
- ✅ Pydantic 모델로 타입 검증
- ✅ 간결한 코드
LangGraph와의 조합:
- ✅ Pydantic AI: JSON 출력 구조화
- ✅ LangGraph: 상태 관리, 복잡한 워크플로우
- ✅ 시너지: 타입 안전 + 상태 관리
프로젝트 적용:
- ✅ V3: Pydantic AI (JSON 출력)
- ✅ V4: Pydantic AI + LangGraph (면접 모드)
📅 이력
| 날짜 | 변경 내용 |
|---|---|
| 2026-01-12 | 초기 결정 및 문서 작성 (Pydantic AI + LangGraph 조합) |
ADR-020: (예정)
📋 메타데이터
| 항목 | 내용 |
|---|---|
| 상태 | ✅ 승인됨 (Accepted) |
| 작성일 | 2026-01-12 |
| 결정자 | AI팀 |
| 관련 기능 | RAG, VectorDB 검색, 채팅/면접 모드 |
🎯 컨텍스트 (Context)
RAG 시스템에서 VectorDB 검색 결과의 정확도를 높여야 합니다.
문제점:
- VectorDB의 벡터 유사도 ≠ 실제 관련성
- "이력서 작성"이 "면접 준비"보다 유사도 높을 수 있음
- 관련 없는 문서가 LLM에 전달됨
- RAG 성능 저하
요구사항:
- 검색 결과를 실제 관련성 기준으로 재정렬
- LLM에 정확한 컨텍스트 전달
- RAG 성능 향상 (정확도 20~30% 개선)
🔍 선택지 분석 (Options)
Option 1: Reranker 없이 (기본 RAG)
┌─────────────────────────────────────────────────────────────────┐
│ 기본 RAG (Reranker 없음) │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 사용자 질문: "면접 준비 어떻게 해?" │
│ ↓ │
│ Embedding 변환 │
│ ↓ │
│ VectorDB 검색 (코사인 유사도) │
│ ├─ 결과 1: 이력서 작성 팁 (0.85) ← 유사도 높음 │
│ ├─ 결과 2: 면접 준비 가이드 (0.82) ← 관련성 높음! │
│ └─ 결과 3: 자기소개서 예시 (0.80) ← 관련성 낮음 │
│ ↓ │
│ LLM에 전달 (상위 3개) │
│ → 결과 1, 2, 3 모두 전달 │
│ → 결과 1, 3은 질문과 관련 없음! ❌ │
│ │
└─────────────────────────────────────────────────────────────────┘
| 장점 | 단점 |
|---|---|
| 간단한 구조 | 벡터 유사도 ≠ 실제 관련성 |
| 추가 비용 없음 | 관련 없는 문서 전달 |
| RAG 성능 저하 |
Option 2: Cohere Rerank API ⭐
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CohereRerank
from langchain_community.vectorstores import Chroma
# VectorDB
vectordb = Chroma(...)
# Reranker
reranker = CohereRerank(
model="rerank-multilingual-v3.0",
top_n=3
)
# Retriever + Reranker
retriever = ContextualCompressionRetriever(
base_compressor=reranker,
base_retriever=vectordb.as_retriever(search_kwargs={"k": 10})
)
# 사용
docs = retriever.get_relevant_documents("면접 준비 어떻게 해?")
# → 상위 10개 검색 → Reranker로 재정렬 → 상위 3개 반환
흐름:
VectorDB 검색 (상위 10개)
↓
Cohere Rerank API (재정렬)
↓
상위 3개 반환
| 장점 | 단점 |
|---|---|
| 높은 정확도 | API 비용 발생 |
| 한국어 지원 우수 | 외부 API 의존 |
| LangChain 통합 쉬움 | 네트워크 레이턴시 |
| 관리 불필요 |
비용:
- 무료 티어: 1,000 requests/월
- Pay-as-you-go: $1 / 1,000 requests
Option 3: BGE Reranker (Self-hosted)
from sentence_transformers import CrossEncoder
from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
# Reranker 모델 (Self-hosted)
model = HuggingFaceCrossEncoder(model_name="BAAI/bge-reranker-v2-m3")
reranker = CrossEncoderReranker(model=model, top_n=3)
# Retriever + Reranker
retriever = ContextualCompressionRetriever(
base_compressor=reranker,
base_retriever=vectordb.as_retriever(search_kwargs={"k": 10})
)
흐름:
VectorDB 검색 (상위 10개)
↓
BGE Reranker (Self-hosted)
↓
상위 3개 반환
| 장점 | 단점 |
|---|---|
| 무료 (Self-hosted) | GPU 필요 (추론 시) |
| 데이터 프라이버시 | 인프라 관리 필요 |
| 한국어 지원 | 초기 설정 복잡 |
| 오픈소스 |
리소스:
- GPU: T4 이상 (추론 시)
- 메모리: 4GB+
Option 4: Hybrid (Cohere + Fallback)
from langchain.retrievers.document_compressors import CohereRerank
try:
# Cohere Rerank (1순위)
reranker = CohereRerank(model="rerank-multilingual-v3.0", top_n=3)
docs = retriever.get_relevant_documents(query)
except Exception:
# Fallback: Reranker 없이
docs = vectordb.similarity_search(query, k=3)
| 장점 | 단점 |
|---|---|
| 고가용성 | 복잡도 증가 |
| Fallback 보장 | 일관성 낮음 |
✅ 결정 (Decision)
단계적 도입: Cohere Rerank → BGE Reranker (선택)
| 버전 | 선택 | 이유 |
|---|---|---|
| V1~V2 | ❌ Reranker 없음 | MVP 집중 |
| V3 | Cohere Rerank | RAG 성능 향상 |
| V4 | Cohere Rerank 유지 | 안정적 운영 |
| V4+ | BGE Reranker (선택) | 비용 절감 필요 시 |
📝 근거 (Rationale)
1. Reranker의 필요성
문제: 벡터 유사도 ≠ 실제 관련성
질문: "면접 준비 어떻게 해?"
VectorDB 검색 결과 (코사인 유사도):
1. 이력서 작성 팁 (0.85) ← 유사도 높지만 관련성 낮음
2. 면접 준비 가이드 (0.82) ← 유사도 낮지만 관련성 높음!
3. 자기소개서 예시 (0.80)
Reranker 재정렬 (실제 관련성):
1. 면접 준비 가이드 (0.95) ✅
2. 면접 질문 예시 (0.88) ✅
3. 이력서 작성 팁 (0.65)
성능 향상:
- ✅ RAG 정확도 20~30% 개선
- ✅ LLM에 정확한 컨텍스트 전달
- ✅ 사용자 만족도 향상
2. V3: Cohere Rerank 선택 이유
장점:
- ✅ 높은 정확도: 한국어 지원 우수
- ✅ 간단한 통합: LangChain 네이티브 지원
- ✅ 관리 불필요: API만 호출
- ✅ 무료 티어: 1,000 requests/월 (V3 충분)
비용 분석:
V3 예상 트래픽:
- 채팅: 100 requests/일
- 면접: 50 requests/일
- 분석: 30 requests/일
= 180 requests/일 × 30일 = 5,400 requests/월
Cohere Rerank 비용:
- 무료 티어: 1,000 requests/월
- 초과분: 4,400 requests × $0.001 = $4.4/월
→ 매우 저렴! ✅
3. V4+: BGE Reranker 전환 검토
전환 시점:
- ⚠️ Cohere 비용이 $50/월 초과
- ⚠️ 데이터 프라이버시 요구사항 강화
- ⚠️ GPU 인프라 구축 완료
BGE Reranker 장점:
- ✅ 무료 (Self-hosted)
- ✅ 데이터 프라이버시
- ✅ 한국어 지원
하지만 V3는 Cohere로 충분!
� 구현 예시
V3: Cohere Rerank
from langchain_community.vectorstores import Chroma
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CohereRerank
from langchain.chains import RetrievalQA
# VectorDB
vectordb = Chroma(
collection_name="resumes",
embedding_function=embedding
)
# Reranker
reranker = CohereRerank(
model="rerank-multilingual-v3.0",
top_n=3,
cohere_api_key=os.getenv("COHERE_API_KEY")
)
# Retriever + Reranker
compression_retriever = ContextualCompressionRetriever(
base_compressor=reranker,
base_retriever=vectordb.as_retriever(search_kwargs={"k": 10})
)
# RAG Chain
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
retriever=compression_retriever
)
# 사용
result = qa_chain.invoke("면접 준비 어떻게 해?")
# → VectorDB 상위 10개 → Reranker 재정렬 → 상위 3개 → LLM
V4+: BGE Reranker (Self-hosted)
from sentence_transformers import CrossEncoder
from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
# BGE Reranker (Self-hosted)
model = HuggingFaceCrossEncoder(model_name="BAAI/bge-reranker-v2-m3")
reranker = CrossEncoderReranker(model=model, top_n=3)
# Retriever + Reranker
compression_retriever = ContextualCompressionRetriever(
base_compressor=reranker,
base_retriever=vectordb.as_retriever(search_kwargs={"k": 10})
)
# RAG Chain
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
retriever=compression_retriever
)
📊 비교표
| 항목 | Reranker 없음 | Cohere Rerank | BGE Reranker |
|---|---|---|---|
| 정확도 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
| 한국어 | ⚠️ | ✅ 우수 | ✅ 좋음 |
| 비용 | 무료 | $4~50/월 | 무료 (GPU 필요) |
| 관리 | ✅ 없음 | ✅ 없음 | ❌ 필요 |
| 레이턴시 | 빠름 | 중간 (+100ms) | 중간 (+50ms) |
| V3 적합성 | ❌ | ✅ 추천 | ❌ 과함 |
| V4+ 적합성 | ❌ | ✅ | ✅ 검토 |
🎯 최종 추천
V1~V2 (MVP)
# Reranker 없이
docs = vectordb.similarity_search(query, k=3)
V3 (RAG 성능 향상)
# Cohere Rerank
from langchain.retrievers.document_compressors import CohereRerank
reranker = CohereRerank(model="rerank-multilingual-v3.0", top_n=3)
retriever = ContextualCompressionRetriever(
base_compressor=reranker,
base_retriever=vectordb.as_retriever(search_kwargs={"k": 10})
)
V4+ (비용 절감)
# BGE Reranker (Self-hosted)
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
model = HuggingFaceCrossEncoder(model_name="BAAI/bge-reranker-v2-m3")
reranker = CrossEncoderReranker(model=model, top_n=3)
💡 핵심 정리
Reranker란?
- ✅ VectorDB 검색 결과를 재정렬하는 모델
- ✅ 벡터 유사도 → 실제 관련성 기반 재정렬
- ✅ RAG 성능 대폭 향상 (20~30%)
동작 방식:
1. VectorDB 검색 (상위 10개)
2. Reranker로 재정렬 (관련성 점수)
3. LLM에 전달 (상위 3개)
프로젝트 적용:
- ✅ V3: Cohere Rerank (간단, 정확)
- ⚠️ V4+: BGE Reranker (비용 절감 필요 시)
📅 이력
| 날짜 | 변경 내용 |
|---|---|
| 2026-01-12 | 초기 결정 및 문서 작성 (Cohere Rerank 우선, BGE는 V4+ 검토) |