OkBublewrap

@코스메 한국 브랜드 화장품 데이터베이스 구축(4) 본문

Python/프로젝트

@코스메 한국 브랜드 화장품 데이터베이스 구축(4)

옥뽁뽁 2025. 2. 27. 11:06

1. review table & review_text table 작업

ERD

 

1) review table

추가로 작업해야할 작업

1. product_id가 들어왔을때 review table이 None 이면 모든 데이터를 수집 해야함

2. product_id가 들어왔을때 review table이 있으면 max(review_date) 이후 데이터를 수집해야함
+ 사이트를 들어가면 최신순으로 나옴

 

하나당 600페이지 정도, 

병렬 크롤링을 진행해볼려고 했으나

코드가 복잡하여 그냥 시도중..

review_text는 병렬 크롤링 시도

 

+ 18시간 12분 14초 (병렬로 꼭하자..) random.uniform(5, 10)으로 한것도 이유인것 같다 1, 3으로해도 될 것 같다.

총 104,809개의 리뷰..

2025-02-27 09:39:38,700 - INFO - Sucessfull get product id
2025-02-28 03:51:53,897 - INFO - Database connection closed.

2) review text table

비교적 수집 난이도 낮은 테이블

병렬 크롤링 공부할 겸 시도

 

1. DB 초기화 (init_db())

def init_db():
    conn = sqlite3.connect(DB_NAME)
    cursor = conn.cursor()
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS review_text (
        review_id TEXT PRIMARY KEY,
        review_text TEXT
    );
    """)
    conn.commit()
    conn.close()

 

review_text TABLE이 없으면 생성 있으면 무시

 

2. 크롤링할 review_id 목록 가져오기 (get_review_id())

def get_review_id():
    conn = sqlite3.connect(DB_NAME)
    cursor = conn.cursor()
    cursor.execute("SELECT review_id FROM review")
    review_ids = [row[0] for row in cursor.fetchall()]
    conn.close()
    return review_ids

 

저장된 review TABLE에서 review_id 추출 후 리스트로 반환

 

3. 멀티스레딩으로 크롤링 (fetch())

def fetch(review_id):
    """웹페이지 요청 후 리뷰 텍스트 크롤링"""
    try:
        url = cosme_url['review_text'] + f"{review_id}/"
        response = requests.get(url, headers=headers, timeout=5)
        response.raise_for_status()  
        soup = BeautifulSoup(response.text, "html.parser")
        time.sleep(random.uniform(1, 2))  

        review_text = soup.find("p", class_="read")
        if review_text:
            return (review_id, review_text.get_text(strip=True, separator=" "))
        else:
            return None
    except Exception as e:
        return None

 

1. review_id 인자로 받아서 해당 리뷰 페이지 url 생성

2. 리뷰 데이터 가져오기 앞뒤 공백 제거 및 <br/> : 띄어쓰기 제거

 

4. 데이터 저장을 위한 Queue와 save_to_db_worker()

멀티스레딩으로 작업할 경우 동시에 쓰면 오류 발생

크롤링 병렬, DB 저장 싱글 쓰레드로 처리

def save_to_db_worker(queue):
    """하나의 쓰레드에서 DB 저장을 처리하는 Worker"""
    conn = sqlite3.connect(DB_NAME)
    cursor = conn.cursor()

    while True:
        data_batch = queue.get()
        if data_batch is None:  # 종료 신호 받으면 종료
            break

        try:
            cursor.executemany("""
                INSERT INTO review_text (review_id, review_text) 
                VALUES (?, ?) 
                ON CONFLICT(review_id) DO NOTHING
            """, data_batch)
            conn.commit()
        except Exception as e:
            logging.error(f"DB insert error: {e}")

    conn.close()

 

 

5. main() 함수 실행 흐름

def main():
    """멀티스레딩으로 크롤링 실행 + 싱글 쓰레드 DB 저장"""
    init_db()
    review_ids_list = get_review_id()
    
    queue = Queue()
    db_thread = Thread(target=save_to_db_worker, args=(queue,))
    db_thread.start()  # DB 저장 전용 쓰레드 시작

    start_time = time.time()
    
    with ThreadPoolExecutor(max_workers=10) as executor:
        future_to_id = {executor.submit(fetch, rid): rid for rid in review_ids_list}

        batch = []
        for future in as_completed(future_to_id):
            result = future.result()
            if result:
                batch.append(result)

            if len(batch) >= 10:  # 10개씩 모아서 DB에 저장
                queue.put(batch)
                batch = []

    # 남은 데이터 저장
    if batch:
        queue.put(batch)

    # DB 쓰레드 종료 신호 보내기
    queue.put(None)
    db_thread.join()

    end_time = time.time()
    logging.info(f"크롤링 완료! 실행 시간: {end_time - start_time:.2f}초")

 

코드 흐름 구조

 

이제 GPT가 구조까지 짜주는구나.

4, 5번 코드 풀이

 

def main():
    """멀티스레딩으로 크롤링 실행 + 싱글 쓰레드 DB 저장"""
    init_db()
    review_ids_list = get_review_id()
    
    queue = Queue() # 1
    db_thread = Thread(target=save_to_db_worker, args=(queue,)) # 2
    db_thread.start()  # DB 저장 전용 쓰레드 시작 # 3

    start_time = time.time()
    
    with ThreadPoolExecutor(max_workers=10) as executor: # 4
        future_to_id = {executor.submit(fetch, rid): rid for rid in review_ids_list} # 5

        batch = []
        for future in as_completed(future_to_id): # 6
            result = future.result() # 7
            if result:
                batch.append(result)

            if len(batch) >= 10:  # 10개씩 모아서 DB에 저장
                queue.put(batch)
                batch = []

    # 남은 데이터 저장
    if batch:
        queue.put(batch)

    # DB 쓰레드 종료 신호 보내기
    queue.put(None)
    db_thread.join()

    end_time = time.time()
    logging.info(f"크롤링 완료! 실행 시간: {end_time - start_time:.2f}초")

 

  1. Queue() 생성
  2. Thread() 생성, Target : 스레드에서 실행할 함수 또는 매서드를 지정, 인자로는 튜플로 전달 queue
  3. DB 저장 전용 쓰레드 시작
  4. 멀티스레딩을 관리하는 객체, 10개의 스레드를 사용할 수 있게 설정
  5. 예시
    review_ids_list = [1001, 1002, 1003] 일 때
    {
        <future_object_1>: 1001, 
        <future_object_2>: 1002, 
        <future_object_3>: 1003
    }
    객체 생성

    executor.submit(fetch, rid)가 각 review_id에 대해 비동기적으로 fetch 함수 실행
    {
        <Future at 0x7fc4e9c2a400 state=running>: 1001, 
        <Future at 0x7fc4e9c2a4f0 state=running>: 1002, 
        <Future at 0x7fc4e9c2a5e0 state=running>: 1003
    }
    실행중
  6. 위 객체 중 state=completed 일 경우 해당 객체를 처리
  7. 크롤링 한 결과 batch에 추가, 10개이상일 때 DB에 저장
  8. 마지막 10개 미만일 때 나머지 추가 저장
  9. 종료신호 - 기달리기