Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
Tags
- R
- harkerrank
- 스파르타코딩
- 중회귀모형
- 내일배움캠프
- 파이썬 철저 입문
- 미세먼지
- 스파르타 코딩
- 프로그래머스
- 스파르타
- 파이썬
- wil
- Cluster
- 웹 스크랩핑
- 파이썬 머신러닝 완벽가이드
- 실전 데이터 분석 프로젝트
- TiL
- 파이썬 철저입문
- 파이썬 완벽 가이드
- 파이썬 머신러닝 완벽 가이드
- MySQL
- 오블완
- SQL
- 내일배움
- 프로젝트
- 티스토리챌린지
- 텍스트 분석
- hackerrank
- 회귀분석
- 내일배움카드
Archives
- Today
- Total
OkBublewrap
@코스메 한국 브랜드 화장품 데이터베이스 구축(4) 본문
1. review table & review_text table 작업
ERD
1) review table
추가로 작업해야할 작업
1. product_id가 들어왔을때 review table이 None 이면 모든 데이터를 수집 해야함
2. product_id가 들어왔을때 review table이 있으면 max(review_date) 이후 데이터를 수집해야함
+ 사이트를 들어가면 최신순으로 나옴
하나당 600페이지 정도,
병렬 크롤링을 진행해볼려고 했으나
코드가 복잡하여 그냥 시도중..
review_text는 병렬 크롤링 시도
+ 18시간 12분 14초 (병렬로 꼭하자..) random.uniform(5, 10)으로 한것도 이유인것 같다 1, 3으로해도 될 것 같다.
총 104,809개의 리뷰..
2025-02-27 09:39:38,700 - INFO - Sucessfull get product id
2025-02-28 03:51:53,897 - INFO - Database connection closed.
2) review text table
비교적 수집 난이도 낮은 테이블
병렬 크롤링 공부할 겸 시도
1. DB 초기화 (init_db())
def init_db():
conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS review_text (
review_id TEXT PRIMARY KEY,
review_text TEXT
);
""")
conn.commit()
conn.close()
review_text TABLE이 없으면 생성 있으면 무시
2. 크롤링할 review_id 목록 가져오기 (get_review_id())
def get_review_id():
conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()
cursor.execute("SELECT review_id FROM review")
review_ids = [row[0] for row in cursor.fetchall()]
conn.close()
return review_ids
저장된 review TABLE에서 review_id 추출 후 리스트로 반환
3. 멀티스레딩으로 크롤링 (fetch())
def fetch(review_id):
"""웹페이지 요청 후 리뷰 텍스트 크롤링"""
try:
url = cosme_url['review_text'] + f"{review_id}/"
response = requests.get(url, headers=headers, timeout=5)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
time.sleep(random.uniform(1, 2))
review_text = soup.find("p", class_="read")
if review_text:
return (review_id, review_text.get_text(strip=True, separator=" "))
else:
return None
except Exception as e:
return None
1. review_id 인자로 받아서 해당 리뷰 페이지 url 생성
2. 리뷰 데이터 가져오기 앞뒤 공백 제거 및 <br/> : 띄어쓰기 제거
4. 데이터 저장을 위한 Queue와 save_to_db_worker()
멀티스레딩으로 작업할 경우 동시에 쓰면 오류 발생
크롤링 병렬, DB 저장 싱글 쓰레드로 처리
def save_to_db_worker(queue):
"""하나의 쓰레드에서 DB 저장을 처리하는 Worker"""
conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()
while True:
data_batch = queue.get()
if data_batch is None: # 종료 신호 받으면 종료
break
try:
cursor.executemany("""
INSERT INTO review_text (review_id, review_text)
VALUES (?, ?)
ON CONFLICT(review_id) DO NOTHING
""", data_batch)
conn.commit()
except Exception as e:
logging.error(f"DB insert error: {e}")
conn.close()
5. main() 함수 실행 흐름
def main():
"""멀티스레딩으로 크롤링 실행 + 싱글 쓰레드 DB 저장"""
init_db()
review_ids_list = get_review_id()
queue = Queue()
db_thread = Thread(target=save_to_db_worker, args=(queue,))
db_thread.start() # DB 저장 전용 쓰레드 시작
start_time = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
future_to_id = {executor.submit(fetch, rid): rid for rid in review_ids_list}
batch = []
for future in as_completed(future_to_id):
result = future.result()
if result:
batch.append(result)
if len(batch) >= 10: # 10개씩 모아서 DB에 저장
queue.put(batch)
batch = []
# 남은 데이터 저장
if batch:
queue.put(batch)
# DB 쓰레드 종료 신호 보내기
queue.put(None)
db_thread.join()
end_time = time.time()
logging.info(f"크롤링 완료! 실행 시간: {end_time - start_time:.2f}초")
코드 흐름 구조
이제 GPT가 구조까지 짜주는구나.
4, 5번 코드 풀이
def main():
"""멀티스레딩으로 크롤링 실행 + 싱글 쓰레드 DB 저장"""
init_db()
review_ids_list = get_review_id()
queue = Queue() # 1
db_thread = Thread(target=save_to_db_worker, args=(queue,)) # 2
db_thread.start() # DB 저장 전용 쓰레드 시작 # 3
start_time = time.time()
with ThreadPoolExecutor(max_workers=10) as executor: # 4
future_to_id = {executor.submit(fetch, rid): rid for rid in review_ids_list} # 5
batch = []
for future in as_completed(future_to_id): # 6
result = future.result() # 7
if result:
batch.append(result)
if len(batch) >= 10: # 10개씩 모아서 DB에 저장
queue.put(batch)
batch = []
# 남은 데이터 저장
if batch:
queue.put(batch)
# DB 쓰레드 종료 신호 보내기
queue.put(None)
db_thread.join()
end_time = time.time()
logging.info(f"크롤링 완료! 실행 시간: {end_time - start_time:.2f}초")
- Queue() 생성
- Thread() 생성, Target : 스레드에서 실행할 함수 또는 매서드를 지정, 인자로는 튜플로 전달 queue
- DB 저장 전용 쓰레드 시작
- 멀티스레딩을 관리하는 객체, 10개의 스레드를 사용할 수 있게 설정
- 예시
review_ids_list = [1001, 1002, 1003] 일 때
{
<future_object_1>: 1001,
<future_object_2>: 1002,
<future_object_3>: 1003
}
객체 생성
executor.submit(fetch, rid)가 각 review_id에 대해 비동기적으로 fetch 함수 실행
{
<Future at 0x7fc4e9c2a400 state=running>: 1001,
<Future at 0x7fc4e9c2a4f0 state=running>: 1002,
<Future at 0x7fc4e9c2a5e0 state=running>: 1003
}
실행중 - 위 객체 중 state=completed 일 경우 해당 객체를 처리
- 크롤링 한 결과 batch에 추가, 10개이상일 때 DB에 저장
- 마지막 10개 미만일 때 나머지 추가 저장
- 종료신호 - 기달리기
'Python > 프로젝트' 카테고리의 다른 글
@코스메 한국 브랜드 화장품 데이터베이스 구축(5) (0) | 2025.03.01 |
---|---|
@코스메 한국 브랜드 화장품 데이터베이스 구축(3) (0) | 2025.02.25 |
@코스메 한국 브랜드 화장품 데이터베이스 구축(2) (0) | 2025.02.24 |
@코스메 한국 브랜드 화장품 데이터베이스 구축(1) (3) | 2025.02.23 |
Discord 미세먼지 Msg (0) | 2023.04.21 |