python|多进程设置 - pjpan/DataScience GitHub Wiki

第一部分:python的多进程的调试

使用python的multiprocess模块,场景是根据core来设置进程池,

大概背景:

实践主要是为了解决:现在有700多万的地址数据,我们会调用一个地址标准化的服务,单个请求大概在90ms左右,运算700万需要花费太久了,所以需要进行分布式计算; 代码如下,并行化的主要核心代码是:

pool.map(single_addr_extract, shanghai_addr.loc[0:100,'address_line1'])

利用multiprocessing.Pool.map函数来进行分布式计算,Pool()模式的进程池是os.cpu_count()即cpu内核; 原始代码:

# -*- coding: utf-8 -*-
"""
Created on Wed Nov  1 13:49:14 2017

@author: PANPENGJU716
"""
import pandas as pd
import os
import re
import urllib
import sys
import time
from multiprocessing import Pool
import pickle

# 删除无用的特殊字符
def del_useless_str(addr):
    return re.sub('[-|.|(|)]','',addr,0)

def single_addr_extract(addr):
    """
    addr 是个地址,返回一个解析之后的地址结果;

   """
    loc_parameters = {
    'address': del_useless_str(addr)
    }
    print(loc_parameters)
    # 对URL的汉字进行decode
    #服务API地址;
    api_url = "http://10.137.39.9:8889/api/v1000/single_address"
    single_addres_encode = urllib.parse.urlencode(loc_parameters)
    addr_request_url = api_url+'?'+single_addres_encode
    print(addr_request_url)
    # 请求结果,把结果存下来;

def save_files(x):
	"""
	把文件存储成pkl格式
	"""
    with open('./shanghai_addr.txt','a+') as f:   # a+表示的文件追加模式,用其他模式的话,可能造成文件读写异常;
        f.write(str(eval(x))+"\n")  # strip between terms
#        json.dump(eval(x), f, ensure_ascii=False)
        f.close()

# 多进程版本 
if __name__ == "__main__": 
    
    os.chdir('D:/data/')
    pkl_path = './shanghai_addr_pkl.pkl'
    # read data
    if os.path.exists(pkl_path):
        shanghai_addr = open_pkl()
    else:
        shanghai_addr = pd.read_table(filepath_or_buffer = 'export_addr_shanghai_2017-11-01 10-51-04.csv')
        save_pkls(shanghai_addr)

    print(start, end, sep = '\n')
    e1 = time.time()
    pool = Pool(4)  # os.cpu_count,决定了处理的上限,4核则表示4个处理器;
    resultList = pool.map(single_addr_extract, shanghai_addr.loc[0:100,'address_line1'])
    pool.close()
    pool.join()
    e2 = time.time()
    print(float(e2 - e1))

  • 针对core进行测试,可以看出运行速度并发量小的时候,可以设置2*core-1效果最佳;文件越大,core设置越大越好;

前面是core,后面是运算时间:

  • rows = 21条 1 - 122.62400007247925 2 - 71.88000011444092 3 - 49.44399976730347 4 - 37.59999990463257 5 - 32.06599998474121 6 - 26.594000101089478 7 - 21.628000020980835 8 - 21.52300000190735 9 - 21.49400019645691

  • rows = 100条 1 - 589.7630000114441 2 - 313.47500014305115 3 - 220.40499997138977 4 - 171.0090000629425 5 - 148.7720000743866 6 - 136.88300013542175 7 - 13.871999979019165 8 - 70.09800004959106 9 - 80.44023323059082

  • rows = 1000 1 - 5952.56351017952 2 - 3025.6070001125336 3 - 2044.46599984169 4 - 1546.5250000953674 5 - 1270.691999912262 6 - 1065.5169999599457 7 - 930.6379997730255 8 - 836.364000082016 9 - 742.7260000705719

  • py文件只有一个输入情况下的并行,如果有两个参数,进行并行化操作的代码如下:

第二部分,利用windows的batch方法来并行执行脚本

正常情况下,dos在一个batch文件中是不能直接并行执行同一个脚本的,但是我们可以同时打开多个dos窗口,也就是dos的多进程方法来同时执行同一个脚本的不同分片的数据,具体的做法是:新建一个batch.CMD的文件,详细利用start xxx命令来达到执行多个dos命令的目的;

batch.CMD

@echo
@D:
@cd D:/data
@start C:/ProgramData/Anaconda3/python.exe addressETL.py 0 499999
@start C:/ProgramData/Anaconda3/python.exe addressETL.py 500000 999999
@start C:/ProgramData/Anaconda3/python.exe addressETL.py 1000000 1499999
@start C:/ProgramData/Anaconda3/python.exe addressETL.py 1500000 1999999
@start C:/ProgramData/Anaconda3/python.exe addressETL.py 2000000 2499999
@start C:/ProgramData/Anaconda3/python.exe addressETL.py 2500000 2999999
@start C:/ProgramData/Anaconda3/python.exe addressETL.py 3000000 3499999
@start C:/ProgramData/Anaconda3/python.exe addressETL.py 3500000 3999999
@start C:/ProgramData/Anaconda3/python.exe addressETL.py 4000000 4499999
@start C:/ProgramData/Anaconda3/python.exe addressETL.py 4500000 4999999
@start C:/ProgramData/Anaconda3/python.exe addressETL.py 5000000 5499999
@start C:/ProgramData/Anaconda3/python.exe addressETL.py 5500000 5999999
@start C:/ProgramData/Anaconda3/python.exe addressETL.py 6000000 6499999
@start C:/ProgramData/Anaconda3/python.exe addressETL.py 6500000 6999999
@start C:/ProgramData/Anaconda3/python.exe addressETL.py 7000000 7499999
@start C:/ProgramData/Anaconda3/python.exe addressETL.py 7500000 7999999
exit

第三部分: multiprocess其他函数详解