
Python Concurrency/Async Programming 4. concurrent.futures

 

Previous posts in this series: [Python] Python Concurrency/Async Programming 3. GIL (Global Interpreter Lock) · [Python] Python Concurrency/Async Programming 2. Asyncio (leo-bb.tistory.com)

A Riss Crawler using concurrent.futures.ThreadPoolExecutor and async I/O

I. Overview

  •  Some time ago I published a post called "Saving Riss paper search data to a CSV file".
  • While covering Python's async/concurrent programming, we saw that a thread-pool (multi-threaded) approach can improve network I/O performance; this post applies that idea to a crawler for Riss paper data (a minimal sketch of why the thread pool helps follows right after this list).
  • It is worth comparing how the two versions differ when you run them.
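
To make the claim about network I/O concrete, here is a minimal, self-contained sketch (not part of the crawler) that times the same downloads done sequentially and through a ThreadPoolExecutor; the URL list and worker count are arbitrary example values:

import timeit
import urllib.request
from concurrent.futures import ThreadPoolExecutor

urls = ["http://www.riss.kr"] * 5   # any reachable URLs will do

def fetch(url):
    # urlopen blocks while waiting on the network, and the GIL is released
    # during that wait, so several calls can overlap across threads.
    with urllib.request.urlopen(url) as resp:
        return len(resp.read())

start = timeit.default_timer()
sizes_sequential = [fetch(u) for u in urls]            # one request at a time
print("sequential :", timeit.default_timer() - start)

start = timeit.default_timer()
with ThreadPoolExecutor(max_workers=5) as executor:
    sizes_threaded = list(executor.map(fetch, urls))   # requests overlap
print("threaded   :", timeit.default_timer() - start)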

1. Key changes

1. The search keyword can now be entered by the user
2. Reimplemented as a class
3. Added some error handling
4. Chrome driver options for running in the background (headless) plus a first-order workaround for headless detection
5. Separate handling of domestic and international journals
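
Before the full listing, here is a minimal sketch of the core pattern the crawler relies on: a coroutine hands the blocking urllib.request.urlopen call to a ThreadPoolExecutor via loop.run_in_executor, and the per-page tasks are collected with asyncio.gather. The function names (fetch_page, main) and the example URLs are illustrative only, not part of the crawler below.

import asyncio
import urllib.request
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

async def fetch_page(url):
    # Run the blocking urlopen call on a worker thread and await its result,
    # so the event loop stays free to launch the other requests.
    loop = asyncio.get_event_loop()
    resp = await loop.run_in_executor(executor, urllib.request.urlopen, url)
    return resp.read()

async def main(urls):
    # One task per URL; gather waits until every download has finished.
    tasks = [asyncio.ensure_future(fetch_page(u)) for u in urls]
    return await asyncio.gather(*tasks)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    pages = loop.run_until_complete(main(["http://www.riss.kr"] * 3))
    print([len(body) for body in pages])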

from bs4 import BeautifulSoup as bs
import urllib.request
from selenium import webdriver
import pandas as pd
import datetime
import os
import re
import logging
import timeit
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor




class RissCrawler():
    '''
    Riss Crawler
    Author : London
    Date : 2020.07.25
    Description : Searches Riss for papers matching a keyword and saves the results to a CSV file in a reference-friendly format.
                  If only a search keyword is given, it collects the 10 most recent Korean-journal entries.
    '''
    def __init__(self, searchword : str, country : str = 'kor', pages : int = 1):
        self.searchword = searchword.replace(' ','+')
        self.country = country
        self.pages = pages
        self.executor = ThreadPoolExecutor(max_workers=min(pages+3, os.cpu_count()))
        self.URLS = []

        self.folder_root = os.path.join(os.path.expanduser('~'),'Desktop','risscrawl')
        self.csv_name = f"{self.searchword}.csv"

    def Get_base_url_list(self) :
        '''Build the search-result URL for each requested page of results for the keyword'''
        URLS = []
        assert self.pages >= 1, "pages must be at least 1"
        for page in range(self.pages):
            if self.country == "kor" :
                URL = f"http://www.riss.kr/search/Search.do?isDetailSearch=N&searchGubun=true&viewYn=OP&queryText=&strQuery={self.searchword}&exQuery=&exQueryText=&order=%2FDESC&onHanja=false&strSort=RANK&p_year1=&p_year2=&iStartCount={page * 10}&orderBy=&fsearchMethod=search&sflag=1&isFDetailSearch=N&pageNumber=&resultKeyword=&fsearchSort=&fsearchOrder=&limiterList=&limiterListText=&facetList=&facetListText=&fsearchDB=&icate=re_a_kor&colName=re_a_kor&pageScale=10&isTab=Y&regnm=&dorg_storage=&language=&language_code=&query={self.searchword}"
                URLS.append(URL)

            elif self.country == "eng" :
                URL = f"http://www.riss.kr/search/Search.do?isDetailSearch=N&searchGubun=true&viewYn=OP&queryText=&strQuery={self.searchword}&exQuery=&exQueryText=&order=%2FDESC&onHanja=false&strSort=RANK&p_year1=&p_year2=&iStartCount={page * 10}&orderBy=&fsearchMethod=search&sflag=1&isFDetailSearch=N&pageNumber=&resultKeyword=&fsearchSort=&fsearchOrder=&limiterList=&limiterListText=&facetList=&facetListText=&fsearchDB=&icate=re_a_over&colName=re_a_over&pageScale=10&isTab=Y&regnm=&dorg_storage=&language=&language_code=&query={self.searchword}"
                URLS.append(URL)

            else :
                logging.fatal("Invalid 'country' value")
                print("country must be 'kor' or 'eng'")
                break


        self.URLS = URLS


    def Check_dir(self):
        '''Check whether the output folder exists and create it if necessary'''
        if not os.path.isdir(self.folder_root):
            os.mkdir(self.folder_root)

    def Get_Paper_info(self, url : str) :
        '''Collect the details of a single paper with Selenium and return them as a DataFrame'''
        # Run Chrome headless (in the background); the custom user-agent string is a
        # first-order workaround against simple headless-browser detection.
        options = webdriver.ChromeOptions()
        options.add_argument('--headless')
        options.add_argument("user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36")

        # chromedriver is expected to sit in the current working directory.
        driver = webdriver.Chrome(os.getcwd() + "/chromedriver", options = options)
        driver.get(url)

        html = driver.page_source
        soup = bs(html, 'html.parser')

        # Titles often come as "Korean title = English title"; keep both parts,
        # putting the English title in parentheses.
        title_source = soup.find('h3','title')
        title_txt = title_source.get_text("", strip=True).split('=')
        if len(title_txt) < 2 :
            title = re.sub(r'[\n\b]', '', str(title_txt[0]).strip())

        else :
            title = re.sub(r'[\n\b]', '', str(title_txt[0]).strip()) + ' (' + str(title_txt[1]).strip() + ')'


        # Abstract / summary text of the paper
        text_source = soup.find('div','text')
        text = text_source.get_text("",strip=True)

        # Detail fields appear in a fixed order on the detail page:
        # index 0 = author, 2 and 3 = journal/volume info, -2 = pages, 6 = keywords.
        detail_box = []
        detail_source = soup.select('#soptionview > div > div.thesisInfo > div.infoDetail.on > div.infoDetailL > ul > li > div > p')
        for detail in detail_source :
            detail_content = detail.get_text("", strip=True)
            detail_wrap = []
            detail_wrap.append(detail_content)

            detail_box.append(detail_wrap)

        author = ",".join(detail_box[0])
        book = "".join(detail_box[2] + detail_box[3]).replace('\n','').replace('\t','').replace(" ","") + " p." + "".join(detail_box[-2])
        keyword = ",".join(detail_box[6])

        # Column headers are kept in Korean: author, title, journal, keywords, abstract, link.
        data = pd.DataFrame({'저자': [author], '제목': [title], '수록지': [book], '핵심어': [keyword], '요약': [text], '링크': [url]})

        driver.close()

        return data

    def Save_paper_info_to_csv(self, data):
        '''Append the paper info to the CSV file (write the header only when the file is first created)'''

        self.Check_dir()

        if os.path.isfile(os.path.join(self.folder_root , self.csv_name)):
            data.to_csv(os.path.join(self.folder_root, self.csv_name), mode='a', header = False, index=False)

        else:
            data.to_csv(os.path.join(self.folder_root, self.csv_name), mode='w',header = True, index=False)


    async def fetch(self, url):
        # Page number for logging, parsed from the iStartCount query parameter.
        page_no = int(url.split("iStartCount=")[1].split("&")[0]) // 10 + 1
        print('Thread Name :', threading.current_thread().name, 'Start', page_no, "page")

        # Hand the blocking urlopen call off to the thread pool so the event
        # loop can start fetching the other result pages concurrently.
        loop = asyncio.get_event_loop()
        resp = await loop.run_in_executor(self.executor, urllib.request.urlopen, url)
        soup = bs(resp, 'lxml', from_encoding="utf-8")

        for i in range(10):

            try:
                each_paper_url_source = soup.select('li > div.cont > p.title > a')[i]['href']
                paper_url = "http://riss.or.kr" + each_paper_url_source

                data = self.Get_Paper_info(paper_url)

                self.Save_paper_info_to_csv(data)

            except Exception:
                logging.error("No more results to collect (or a parsing error occurred); if the csv file is empty, please run again")
                break

        print('Thread Name :', threading.current_thread().name, 'Done', page_no, "page")

    async def crawl(self):
        self.Get_base_url_list()

        # One task per search-result page.
        futures = [ asyncio.ensure_future(self.fetch(url)) for url in self.URLS ]

        await asyncio.gather(*futures)

        print('Done, Check your desktop')

if __name__ == "__main__" :
    loop = asyncio.get_event_loop()
    start = timeit.default_timer()
    r = RissCrawler("big data")
    loop.run_until_complete(r.crawl())
    duration = timeit.default_timer() - start
    print("runnung_time : ", duration)