Data Crawling 및 Vector DB 설계 - 100-hours-a-week/12-marong-Wiki GitHub Wiki

📍 Data Crawling - Settings

BeautifulSoup, Selenium을 이용하여 HTML 리소스에 포함된 장소 데이터 수집

class MyWebCrawler:
    def __init__(self, my_url):
        self.my_url = my_url
        options = Options()
        options.add_argument("--headless")  # 백그라운드 실행
        options.add_argument("--disable-gpu")  # GPU 가속 끄기
        options.add_argument("--no-sandbox")   # 리눅스에서 충돌 방지용
        options.add_argument("--window-size=1920,1080")  
        
        self.driver = webdriver.Chrome(
            service=Service(ChromeDriverManager().install()),
            options=options
        )
        self.info = dict()
  • 백그라운드 실행으로 자동 웹 크롤링 편의성 증대

장소 정보, 리뷰를 독립적으로 수집하여 원하는 데이터에 대해 선별적으로 수집 가능

reviews = []

for url in urls:
  crawler = MyWebCrawler(url)
  crawler.collect_place_info()
  crawler.collect_reviews()
  crawler.quit_driver()
  
  crawler.info['url'] = url
  print(crawler.info)
  reviews.append(crawler.info)


📍 Data Preprocessing

수집한 데이터를 기반으로 pandas 기반 데이터프레임 생성

df_reviews = pd.DataFrame(reviews)

데이터 전처리

상호명 전처리

df_reviews['상호명'] = df_reviews['상호명'].str.replace(r'^장소명', '', regex=True)

평균 별점 계산

def extract_avg_rating(review_data):
    if isinstance(review_data, dict):
        ratings = [r.get('별점', 0) for r in review_data.values() if isinstance(r, dict) and '별점' in r]
        return round(sum(ratings) / len(ratings), 2) if ratings else None
    return None

df_reviews['평균별점'] = df_reviews['리뷰'].apply(extract_avg_rating)

📍 USE DATABASE(Chroma DB 연결)

Chroma DB Client 객체 생성

# Chroma 클라이언트 초기화
from chromadb import PersistentClient

chroma_client = PersistentClient(path="./merged_chroma_db")

📍 CREATE

Chroma DB에 신규 Collection 생성(기존에 존재하는 경우 collection 객체로 로드)

# get_or_create_collection은 존재하지 않으면 새로 만들고, 있으면 가져옴
vibelikes_collection = chroma_client.get_or_create_collection(name="vibelikes_collection")

📍 INSERT

Chroma DB에 Collection 내 데이터 업로드

for idx, row in df.iterrows():
    review_emb = row['리뷰_평균임베딩']
    menu_emb = row['메뉴_임베딩']

    if review_emb is not None and menu_emb is not None:
        valid_documents.append(row['상호명'])
        valid_review_embs.append(normalize_vector(review_emb))
        valid_menu_embs.append(normalize_vector(menu_emb))

        metadata = {
            "상호명": row.get('상호명', ''),
            "대표카테고리": row.get('대표_카테고리', ''),
            ...(중략)
            }
            valid_metadatas.append(metadata)
            valid_ids.append(f"store_{idx}")

# 배치 업로드 함수
def add_to_chroma_in_batches(collection, documents, embeddings, metadatas, ids):
    for i in range(0, len(documents), batch_size):
        collection.add(
            documents=documents[i:i+batch_size],
            embeddings=embeddings[i:i+batch_size],
            metadatas=metadatas[i:i+batch_size],
            ids=ids[i:i+batch_size]
        )
        print(f"Batch {i // batch_size + 1} added to '{collection.name}'")

# 실제 업로드 실행
add_to_chroma_in_batches(review_collection, valid_documents, valid_review_embs, valid_metadatas, valid_ids)
add_to_chroma_in_batches(menu_collection, valid_documents, valid_menu_embs, valid_metadatas, valid_ids)

# 완료 출력
print("업로드 완료")
print("리뷰 컬렉션 문서 수:", review_collection.count())
print("메뉴 컬렉션 문서 수:", menu_collection.count())
print("히스토리 컬렉션 문서 수:", history_collection.count())
  • 임베딩 벡터 INSERT 시 컬렉션 내 기존 벡터의 차원과 동일하게 진행해야 함(가급적 같은 임베딩 모델을 사용하는 것을 권장)
  • 배치 사이즈를 500 이하로 설정하여 INSERT 가능 한도 내에서 데이터 삽입을 진행해야 함
  • NoSQL 구조로 필드를 자유롭게 확장할 수 있음

주의점: documents, images, uris 중 어느 하나를 포함하여야 INSERT 가능

# 잘못된 코드
collection.add(
    ids=[...],
    metadatas=[...]
)  # documents, images, uris 중 아무것도 없음

# 올바른 코드
collection.add(
    ids=[...],
    metadatas=[...],
    documents=["dummy"]  # 최소한 하나는 필수
)

📍 UPDATE

update_kwargs를 설정하여 .update로 데이터 업데이트 가능

update_kwargs = {"ids": [doc_id]}

if is_valid_embedding:
    update_kwargs["embeddings"] = embedded_vector
if is_valid_score:
    update_kwargs["metadatas"] = [{"평균별점": avg_score}]

review_collection.update(**update_kwargs)
  • 업데이트를 위해 MySQL의 UNIQUE KEY와 동일한 개념의 필드가 설정되어 있어야 함
  • 통상적으로 ids로 설정하며 update_kwargs를 통해 임베딩 벡터, 메타데이터 등의 데이터 업데이트 가능

📍 DELETE

.deleteids, metadatas 등을 설정하여 Chroma DB 내 데이터 삭제 가능

review_collection.delete(ids=[del_id])
menu_collection.delete(where={"링크": url})

📍 UPSERT

RDB와는 다른 쿼리로, 설정한 Query에 대해 데이터가 존재하면 UPDATE, 존재하지 않으면 INSERT 실행

vibelikes_collection.upsert(
        ids=[chroma_user_id],
        embeddings=[vibe_embed_vector.tolist()],
        metadatas=[{"user_id": user_id}]
    )

📍 Chroma DB UPDATE Pipeline(UPDATE + DELETE 혼합 설계)

DB UPDATE

THRESHOLD = 90

not_found = []
updated_url = set()
updated_count = 0

for idx, row in df_reviews.iterrows():
    url = row['url'].strip()
    embedded_vector = row['리뷰_평균임베딩']
    avg_score = row['평균별점']

    # 결측 처리 플래그
    is_valid_embedding = isinstance(embedded_vector, list) and all(isinstance(x, (int, float)) for x in embedded_vector)
    is_valid_score = isinstance(avg_score, (int, float)) and not math.isnan(avg_score)

    if not is_valid_embedding and not is_valid_score:
        print(f"[스킵] 둘 다 없음 → idx {idx}, url: {url}")
        continue

    try:
        results = review_collection.get(
            where={"링크": url}
        )
        print(f"검색 결과 → idx {idx + 1}, url: {url}, results: {results}")

        if results["ids"]:
            doc_id = results["ids"][0]
            metadata = results["metadatas"][0]
            print(f"문서 찾음 → idx {idx + 1}, url: {url}, doc_id: {doc_id}")
            
            place_name_db = metadata.get("상호명", "").strip()
            place_name_df = row['상호명'].strip()
            print(f"DB 상호명: {place_name_db}, DF 상호명: {place_name_df}")

            sim = fuzz.ratio(place_name_db, place_name_df)
            print(f"유사도: {sim} ({place_name_db} vs {place_name_df})")
            
            if sim < THRESHOLD:
                print(f"상호명 유사도 낮음 ({sim}) → idx {idx}, DB: {place_name_db}, DF: {place_name_df}")
                continue

            update_kwargs = {"ids": [doc_id]}

            if is_valid_embedding:
                update_kwargs["embeddings"] = embedded_vector
            if is_valid_score:
                update_kwargs["metadatas"] = [{"평균별점": avg_score}]

            review_collection.update(**update_kwargs)
            updated_url.add(url)
            updated_count += 1
            print(f"Updated {url}")
    except:
        print(f"문서 찾을 수 없음 → idx {idx + 1}, url: {url}")
        not_found.append({"idx": idx + 1, "url": url})
        
print(f"업데이트 문서 수: {updated_count}")           
delete_count = 0

for meta, doc_id in zip(all_docs['metadatas'], all_docs['ids']):
    url = meta.get('링크', '').strip()
    if url in urls_to_delete:
        # doc_id가 리스트로 반환될 때도 있으니
        del_id = doc_id if isinstance(doc_id, str) else doc_id[0]
        review_exists = review_collection.get(ids=[del_id])['ids'] != []
        menu_exists = menu_collection.get(where={"링크": url})['ids'] != []
        
        if review_exists and menu_exists:
            try:
                review_collection.delete(ids=[del_id])
                menu_collection.delete(where={"링크": url})
                delete_count += 1
                print(f"삭제 완료: url={url}, id={del_id}")
            except Exception as e:
                print(f"[삭제 실패] url={url}, error: {e}")
        else:
            print(f"둘 다 있는 경우에만 삭제! (review: {review_exists}, menu: {menu_exists})")
            
print(f"총 삭제된 문서 수: {delete_count}")
⚠️ **GitHub.com Fallback** ⚠️