데이터 파이프라인 설계 - prgrms-web-devcourse-final-project/WEB4_5_GAEPPADAK_BE GitHub Wiki
실시간 트렌드 키워드를 추출하여, 해당 키워드와 연관된 네이버 뉴스 및 YouTube 영상을 검색하고, LLM으로 요약된 Post를 생성하는 배치 파이프라인입니다.
실시간성을 요구하지만, MVP 단계에서는 신뢰성과 비용을 우선시해 시간 기반 배치가 적합하다고 판단하여 Spring Scheduler + Spring Batch 조합을 선택했습니다.
- 목표: 매시각 신규 트렌드 키워드를 추출하면 관련 콘텐츠(Post)가 생성되어 사용자에게 빠르게 제공
-
주요 기능:
- Google Trends 기반 키워드 수집 및 시간대별 점수 저장
- Naver News + YouTube API 병렬 호출로 소스 검색
- LLM 요약을 통한 Post 생성 및 매핑
- Redis 캐싱으로 빠른 응답 보장
-
배치 스케줄러: Spring Scheduler (
HourScheduler
) – Cron 표현식0 0 * * * *
-
배치 프레임워크: Spring Batch 5.2.2
- Spring Batch는 (RSS 수집 → 외부 API 호출 → LLM 요약) 각 Step을 독립 분리 가능
- 오류가 발생할 경우 장애를 처리 가능(Retry, Skip 등)
- ExecutionContext 저장소 공유 가능
-
데이터베이스: MySQL
-
키워드 테이블:
keyword
(고유 키워드) -
시간별 키워드 메트릭:
keyword_metric_hourly
(시간대별 점수·신규성 지표) -
소스 테이블:
source
(뉴스·영상 URL 정보) -
관계 테이블:
source_keyword
(source ↔ keyword N:N) -
포스트 테이블:
post
(LLM 요약 결과) -
포스트-소스 매핑:
post_source
(Post ↔ Source N:N) -
포스트-키워드 매핑:
post_keyword
(Post ↔ Keyword N:N)
-
키워드 테이블:
- 캐시: Redis (Post 용 TTL 24h)
- 외부 API: Google Trends RSS, Naver News API, YouTube Data API
- LLM 서비스: 외부 AI 요약 API
-
크론 표현식:
0 0 * * * *
(매시 정각) -
Trigger:
HourScheduler
→ Spring BatchJobLauncher
호출 →trendingKeywordsJob
실행
graph LR
subgraph 배치 스케줄러
A["Spring Scheduler (HourScheduler)"]
B(Cron: 0 0 * * * *)
A -- 실행 --> C{Spring Batch Job 실행}
end
subgraph 배치 프레임워크
C -- Step 1: RSS 수집 --> D[Google Trends RSS]
C -- Step 2: 외부 API 호출 --> E[Naver News API]
C -- Step 3: 외부 API 호출 --> F[YouTube Data API]
D -- 데이터 추출 --> G(데이터 가공)
E -- 데이터 추출 --> G
F -- 데이터 추출 --> G
G -- LLM 요약 요청 --> H[외부 AI 요약 API]
H -- 요약 결과 --> I{Post 테이블 저장}
I -- Post-Source 매핑 --> J[post_source]
I -- Post-Keyword 매핑 --> K[post_keyword]
G -- 키워드 추출 및 분석 --> L{keyword 테이블 업데이트/생성}
L -- 시간별 메트릭 저장 --> M[keyword_metric_hourly]
G -- Source 정보 저장 --> N[source 테이블 업데이트/생성]
N -- Source-Keyword 매핑 --> O[source_keyword]
C -- 오류 발생 --> P{"장애 처리 (Retry, Skip)"}
C -- JobExecutionContext 공유 --> Q(JobExecutionContext 저장소)
end
subgraph 데이터베이스
subgraph MySQL
I -- 저장 --> R[post]
J -- 매핑 --> S[post_source]
K -- 매핑 --> T[post_keyword]
L -- 저장/업데이트 --> U[keyword]
M -- 저장 --> V[keyword_metric_hourly]
N -- 저장/업데이트 --> W[source]
O -- 매핑 --> X[source_keyword]
end
end
subgraph 캐시
Y[Redis]
R -- 캐싱 (TTL 24h) --> Y
Y -- 조회 --> R
end
flowchart LR
S1[Step 1: Google RSS 수집]
S1 --> SPL{{외부 출처 수집}}
SPL --> N1[Step 2-1: Naver News 수집]
SPL --> V1[Step 2-2: YouTube 수집]
N1 --> S3[Step 3: 신규성 평가]
V1 --> S3
S3 --> DEC{{스킵 여부}}
DEC -- SKIP_POST --> S6[END]
DEC -- CONTINUE --> S4[Step 4: LLM 포스트 생성]
S4 --> S5[Step 5: 신규 포스트 Redis Cache]
S5 --> S6
단계 | Task 명칭 | 설명 | 주요 테이블 |
---|---|---|---|
1 | fetchTrendingKeywords | Google Trends RSS 호출(Top10) → DB에 Upsert → 시간대별 점수 계산 → ExecutionContext에 저장 | keyword, keyword_metric_hourly |
2 | searchSources (병렬) |
실시간 트렌드 키워드를 추출하여, 해당 키워드와 연관된 네이버 뉴스 및 YouTube 영상을 검색하고, LLM으로 요약된 Post를 생성하는 배치 파이프라인입니다.
실시간성을 요구하지만, MVP 단계에서는 신뢰성과 비용을 우선시해 시간 기반 배치가 적합하다고 판단하여 Spring Scheduler + Spring Batch 조합을 선택했습니다.
- 목표: 매시각 신규 트렌드 키워드를 추출하면 관련 콘텐츠(Post)가 생성되어 사용자에게 빠르게 제공
-
주요 기능:
- Google Trends 기반 키워드 수집 및 시간대별 점수 저장
- Naver News + YouTube API 병렬 호출로 소스 검색
- LLM 요약을 통한 Post 생성 및 매핑
- Redis 캐싱으로 빠른 응답 보장
-
배치 스케줄러: Spring Scheduler (
HourScheduler
) – Cron 표현식0 0 * * * *
-
배치 프레임워크: Spring Batch 5.2.2
- Spring Batch는 (RSS 수집 → 외부 API 호출 → LLM 요약) 각 Step을 독립 분리 가능
- 오류가 발생할 경우 장애를 처리 가능(Retry, Skip 등)
- ExecutionContext 저장소 공유 가능
-
데이터베이스: MySQL
-
키워드 테이블:
keyword
(고유 키워드) -
시간별 키워드 메트릭:
keyword_metric_hourly
(시간대별 점수·신규성 지표) -
소스 테이블:
source
(뉴스·영상 URL 정보) -
관계 테이블:
source_keyword
(source ↔ keyword N:N) -
포스트 테이블:
post
(LLM 요약 결과) -
포스트-소스 매핑:
post_source
(Post ↔ Source N:N) -
포스트-키워드 매핑:
post_keyword
(Post ↔ Keyword N:N)
-
키워드 테이블:
- 캐시: Redis (Post 용 TTL 24h)
- 외부 API: Google Trends RSS, Naver News API, YouTube Data API
- LLM 서비스: 외부 AI 요약 API
-
크론 표현식:
0 0 * * * *
(매시 정각) -
Trigger:
HourScheduler
→ Spring BatchJobLauncher
호출 →trendingKeywordsJob
실행
graph LR
subgraph 배치 스케줄러
A["Spring Scheduler (HourScheduler)"]
B(Cron: 0 0 * * * *)
A -- 실행 --> C{Spring Batch Job 실행}
end
subgraph 배치 프레임워크
C -- Step 1: RSS 수집 --> D[Google Trends RSS]
C -- Step 2: 외부 API 호출 --> E[Naver News API]
C -- Step 3: 외부 API 호출 --> F[YouTube Data API]
D -- 데이터 추출 --> G(데이터 가공)
E -- 데이터 추출 --> G
F -- 데이터 추출 --> G
G -- LLM 요약 요청 --> H[외부 AI 요약 API]
H -- 요약 결과 --> I{Post 테이블 저장}
I -- Post-Source 매핑 --> J[post_source]
I -- Post-Keyword 매핑 --> K[post_keyword]
G -- 키워드 추출 및 분석 --> L{keyword 테이블 업데이트/생성}
L -- 시간별 메트릭 저장 --> M[keyword_metric_hourly]
G -- Source 정보 저장 --> N[source 테이블 업데이트/생성]
N -- Source-Keyword 매핑 --> O[source_keyword]
C -- 오류 발생 --> P{"장애 처리 (Retry, Skip)"}
C -- JobExecutionContext 공유 --> Q(JobExecutionContext 저장소)
end
subgraph 데이터베이스
subgraph MySQL
I -- 저장 --> R[post]
J -- 매핑 --> S[post_source]
K -- 매핑 --> T[post_keyword]
L -- 저장/업데이트 --> U[keyword]
M -- 저장 --> V[keyword_metric_hourly]
N -- 저장/업데이트 --> W[source]
O -- 매핑 --> X[source_keyword]
end
end
subgraph 캐시
Y[Redis]
R -- 캐싱 (TTL 24h) --> Y
Y -- 조회 --> R
end
flowchart LR
S1[Step 1: Google RSS 수집]
S1 --> SPL{{외부 출처 수집}}
SPL --> N1[Step 2-1: Naver News 수집]
SPL --> V1[Step 2-2: YouTube 수집]
N1 --> S3[Step 3: 신규성 평가]
V1 --> S3
S3 --> DEC{{스킵 여부}}
DEC -- SKIP_POST --> S6[END]
DEC -- CONTINUE --> S4[Step 4: LLM 포스트 생성]
S4 --> S5[Step 5: 신규 포스트 Redis Cache]
S5 --> S6
단계 | Task 명칭 | 설명 | 주요 테이블 |
---|---|---|---|
1 | fetchTrendingKeywords | Google Trends RSS 호출(Top10) → DB에 Upsert → 시간대별 점수 계산 → ExecutionContext에 저장 | keyword, keyword_metric_hourly |
2 | searchSources (병렬) |
- searchNews
- searchVideos | Naver News API(10) + YouTube API(10) 병렬 호출 → URL 정규화 → SHA‑256 fingerprint → 중복 방어 | source, source_keyword | | 3 | evaluateNovelty | 수집된 Source 결과 기반 → Novelty 조건 평가 및 플래그 업데이트 | keyword_metric_hourly | | Decider | noveltyDecider | 모두 Low-Var인 경우 Post 생성 Skip | - | | 4 | generatePost | 신규 Source 기반 LLM 요약 호출 → DB 저장 | post, post_source, post_keyword | | 5 | cachePost | Redis 카드 캐싱 TTL 24 h | - |
- HourScheduler가 Cron에 따라 Job 시작
-
fetchTrendingKeywordsStep
Tasklet에서 Google Trends RSS 호출 - 수신된 JSON에서 키워드 목록 추출
- 제한: RSS 피드는 키워드 텍스트와 대략적 트래픽만 제공하며, 정확한 검색량을 포함하지 않습니다.
- 갱신 주기: 일반적으로 10분 단위로 갱신되는 것으로 보이나, 약 1시간 지연됨을 감안해야 합니다.
- 변화량, 자세한 검색량 등 세부 지표가 필요한 경우, pytrends 같은 비공식 라이브러리 활용을 고려해야 합니다.
- 없는 키워드는 keyword 테이블에 INSERT, 기존 키워드는 무시
- keyword_metric_hourly 테이블에 INSERT (시간대별 점수 저장)
- keyword_metric_hourly에서 INDEX(bucket_at, score)를 활용해 Top‑10 키워드 조회
- 조회된 키워드 리스트를 Execution Context에 저장
-
Job Execution Context
변수명 설명 타입 topKeywordIds 수집한 키워드의 ID 리스트 List topKeywordCount 수집한 키워드 개수 Integer
-
-
searchSourcesFlow
내searchNewsStep
과searchVideosStep
을 병렬 실행 - 각 Step에서 Naver News API(10개) + YouTube API(10개) 호출
- URL 정규화 (https 강제, 쿼리스트링 제거) 후 SHA‑256 fingerprint 생성
- source 테이블에 INSERT IGNORE로 중복 방어
- source_keyword 테이블에 INSERT
- 조회된 소스 관련 통계를 Execution Context에 저장
-
Step Execution Context (메트릭용)
변수명 설명 예시 타입 newsFetched 신규로 저장된 네이버 뉴스 개수 총합 Integer newsApiFailed 뉴스 API 호출 실패 건수 (최대: 키워드 개수) Integer videoFetched 신규로 저장된 유튜브 영상 개수 총합 Integer videoApiFailed 영상 API 호출 실패 건수 (최대: 키워드 개수) Integer
-
LLM 요약 리소스를 절감하고, 불필요한 포스트 생성을 억제 → 비용 절감 효과
- 소스 검색 Context(
*topKeywordIds*
) Load - 아래 조건을 고려하여 score 10 미만일 시, low_variation = true → Post 생성 제외
- 순위 변화량: 트렌드 정체 or 하락세 감지
- rank_delta = prev_rank - curr_rank (양수이면 인기 상승)
- rank_delta가 양수일 경우 score += rank_delta
- 가중치 기반 Novelty Ratio: 이전 메트릭과의 시간 간격
- weightedNovelty = noveltyRatio × 10
- 이후 weightedNovelty를 score에 추가
-
noPostStreak
: 포스트 생성 제외 확인- Post 생성에서 제외될 때마다 상승
- 이후 score에 추가 → 상승될 때마다 신규성이 상승
- 순위 변화량: 트렌드 정체 or 하락세 감지
- keyword_metric_hourly 테이블에 rank_delta, novelty_ratio, weighted_novelty, consecutive_hours, low_variation 속성 UPDATE
- 포스트 생성가능한 키워드 리스트를 Execution Context에 저장
-
Job Execution Context
변수명 설명 타입 postableKeywordIds 포스트 생성 가능한 키워드 리스트 List postableKeywordCount 포스트 생성 가능한 키워드 개수 Integer -
Step Execution Context (메트릭용)
변수명 설명 예시 타입 noveltyLowVarCount Low-Var 키워드 수(스킵된 건수) Integer
-
-
generatePostStep
Tasklet에서low_variation=false
인 키워드만 Post 생성 진행- 제외된 키워드의 경우(
low_variation=true
)- post 생성 스킵
- 기존 post를 해당 keyword_metric_hourly와 FK 연결
- 각 신규 URL에 대응하는 post_source 레코드 연결
- 제외된 키워드의 경우(
- 조회된 신규 Source URL로 AI LLM 요약 API 호출
- 반환된 title, summary로 post INSERT
- 생성된 post를 해당 keyword_metric_hourly와 FK 연결
- post_source, post_keyword, post_metric_hourly 테이블에 정보 INSERT
- 신규 포스트 리스트를 Execution Context에 저장
-
Job Execution Context
변수명 설명 타입 newPostIds 새로 생성된 포스트 리스트 List -
Step Execution Context (메트릭용)
변수명 설명 예시 타입 postCreated 새로 생성된 포스트 개수 Integer
-
- 소스 검색 Context(
*newPostIds*
) Load - newPostIds에 데이터가 있는 경우에만 Redis 캐싱 진행
- 없을 경우 바로 종료
- Redis에 포스트 카드뷰 데이터(id, title, summary, thumbnail 등)를 TTL 24시간으로 캐싱하여 응답 속도 <10ms 보장
- 캐싱 관련 통계를 Execution Context에 저장
-
Step Execution Context (메트릭용)
변수명 설명 예시 타입 cacheEntryCount 캐싱된 포스트 개수 Integer
-
관련 문서: [Spring Batch 도입기](https://www.notion.so/Spring-Batch-1e63550b7b55803cae72c7a68eb10a24?pvs=21)
Metric | 의미 |
---|---|
batch_news_fetched_total |
뉴스 URL 성공 |
batch_news_api_fail_total |
뉴스 API 에러 |
batch_video_fetched_total |
영상 URL 성공 |
batch_video_api_fail_total |
영상 API 에러 |
batch_novelty_lowvar_total |
Low-Var 키워드 |
batch_post_created_total |
Post 생성 |
batch_cache_size_total |
Redis MSET 건수 |
batch_no_post_needed (Gauge)
|
0/1 포스트 스킵 |
-
Idempotent –
preventRestart()
+ PK/UNIQUE+INSERT IGNORE
-
병렬성:
ThreadPoolTaskExecutor
설정으로 API 호출 병렬화 - 트랜잭션 경계: 각 Step 별 커밋 포인트, 실패 시 재시도(.retryLimit), 예외 처리
- 모니터링 & 로깅: 처리 건수, 소요 시간, 오류 발생 비율 로깅
-
테스트:
@SpringBatchTest
활용→Step 개별 테스트,@SpringBootTest
로 스케줄러 검증
Sprint | 내용 |
---|---|
2차 | Kafka 스트림 전환 → Near-Real-Time 5 min 윈도우 |
3차 | SimHash · BM25 기반 내용 중복 제거 |
4차 | 사용자별 관심사 모델 → 개인화 추천 |
5차 | Web-Socket Live Push / 모바일 Push 알림 |