[python] multi threading or multi processing for fetching url - dsindex/blog GitHub Wiki

reference

  • 특정 WEB API에 대량의 쿼리를 던져서 결과를 받아올 경우, multi-threading이나 multi-processing을 사용해서 구현할수 있다. 아래 코드는 위 링크의 것을 약간 수정한 버전이다. IO연산이 대부분인 경우 multi-threading은 (GIL과 무관하게) 매우 효율적이다. 참고자료 따라서, url을 fetch하는 경우 multi-processing보다 multi-threading이 더 빠른것은 당연하다. 하지만, 처리할 데이터(query)양이 매우 많은 경우 threading 객체의 생성/소멸에서 메모리 사용량이 기하급수적으로 증가하는 문제가 있다. 이것은 아마도 GC의 문제인듯 보인다. 따라서, 이런 경우는 multi-processing with pool 방식을 사용하는 것이 더 효율적이다.

  • communication between processs

  • multi-threading

import os
import sys
from   optparse import OptionParser
import urllib
import urllib2
import threading
import Queue
import time

API = "http://XXX/api/"

def fetch_url(entry, queue) :
    url = entry['url']
    data = entry['data']
    ret = urllib2.urlopen(url).read()
    queue.put(ret)
    
def fetch_parallel(urls_to_load) :
    queue = Queue.Queue()
    threads = [ threading.Thread(target=fetch_url, args=(entry,queue)) for entry in urls_to_load ]

    range  = 100
    iter   = len(threads) / range
    remainder = len(threads) % range
    for idx in xrange(iter) :
        begin = range * idx
        end   = range * (idx + 1)
        for t in threads[begin:end] :
            t.start()
        for t in threads[begin:end] :
            t.join()
    if remainder != 0 :
        for t in threads[end:end+remainder] :
            t.start()
        for t in threads[end:end+remainder] :
            t.join()

    return queue

if __name__ == '__main__' :

    parser = OptionParser()
    parser.add_option("--verbose", action="store_const", const=1, dest="verbose", help="verbose mode")
    (options, args) = parser.parse_args()

    if options.verbose == 1 : VERBOSE = 1

    urls_to_load = []

    for line in sys.stdin :
        line = line.strip()
        if not line : continue
        try : [query,freq] = line.split('\t',1)
        except : continue

        param = { 'q':query }
        param = urllib.urlencode(param)
        url   = API + '?' + param
        entry = { 'url':url, 'data':line }
        urls_to_load.append(entry)

    queue = fetch_parallel(urls_to_load)

    while not queue.empty() :
        entry = queue.get()
        print entry
  • multi-processing
import os
import sys
from   optparse import OptionParser
import urllib
import urllib2
from    multiprocessing import Process, Queue
import time

API = "http://XXX/api/"

def fetch_url(entry, queue) :
    url = entry['url']
    data = entry['data']
    ret = urllib2.urlopen(url).read()
    queue.put(ret)
    
def fetch_parallel(urls_to_load) :
    queue = Queue()
    processes = [ Process(target=fetch_url, args=(entry,queue)) for entry in urls_to_load ]

    range  = 100
    iter   = len(processes) / range
    remainder = len(processes) % range
    for idx in xrange(iter) :
        begin = range * idx
        end   = range * (idx + 1)
        for t in processes[begin:end] :
            t.start()
        for t in processes[begin:end] :
            t.join()
    if remainder != 0 :
        for t in processes[end:end+remainder] :
            t.start()
        for t in processes[end:end+remainder] :
            t.join()

    return queue

if __name__ == '__main__' :

    parser = OptionParser()
    parser.add_option("--verbose", action="store_const", const=1, dest="verbose", help="verbose mode")
    (options, args) = parser.parse_args()

    if options.verbose == 1 : VERBOSE = 1

    urls_to_load = []

    for line in sys.stdin :
        line = line.strip()
        if not line : continue
        try : [query,freq] = line.split('\t',1)
        except : continue

        param = { 'q':query }
        param = urllib.urlencode(param)
        url   = API + '?' + param
        entry = { 'url':url, 'data':line }
        urls_to_load.append(entry)

    queue = fetch_parallel(urls_to_load)

    while not queue.empty() :
        entry = queue.get()
        print entry
  • multi-processing with pool
#!/usr/bin/env python
#-*- coding: utf8 -*-

import os
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import re
from   optparse import OptionParser
import urllib
import urllib2
from multiprocessing import Pool
import time
import json

# --verbose
VERBOSE = 0

API = "http://XXX/api/"

def fetch_url(entry) :
    question = entry['question']
    param = { 'q':question, 'mode':'debug' }
    param = urllib.urlencode(param)
    url   = API + '?' + param

    ret = ""
    try : ret = urllib2.urlopen(url).read()
    except :
        sys.stderr.write("urlopen error : %s\n" % url)
        return None
    try :
        ret_json = json.loads(ret,encoding="utf-8")
    except :
        sys.stderr.write("json load error : %s\n" % ret)
        return None

    if ret_json and ret_json['status'] == 200 :
    ....

def fetch_parallel(urls_to_load) :
    p = Pool(20)
    results = p.map(fetch_url, urls_to_load)
    p.close()
    p.join()
    for result in results :
        if result : print result

if __name__ == '__main__' :

    parser = OptionParser()
    parser.add_option("--verbose", action="store_const", const=1, dest="verbose", help="verbose mode")
    (options, args) = parser.parse_args()

    if options.verbose == 1 : VERBOSE = 1

    urls_to_load = []

    idx = 0
    for line in sys.stdin :
        line = line.strip()
        if not line : continue
        try : [question, freq] = line.split('\t',1)
        except : continue

        entry = { 'question':question }
        urls_to_load.append(entry)

        idx += 1

    fetch_parallel(urls_to_load)