原创. 禁转.
大批量请求发送需要考虑的几个因素:
1. 服务器承载能力(网络带宽/硬件配置);
2. 客户端IO情况, 客户端带宽, 硬件配置;
方案:
1. 方案都是相对的;
2. 因为这里我的情况是客户机只有一台,所以不能考虑使用分布式了, 服务器承载能力也非常有限(经过不断调试得知);
3. 这里没有使用Jmeter, 虽然jmeter也是可以做到的.
注: 如无特殊说明以下代码基于windows7 64位/Centos 6.5 64位, Python3.6+
Python里面支持发送大批量的方案有很多, 这里只介绍我所用过的几种:
1. 使用
grequests可以一次性发送超大批量的请求, 但是底层听说修改了socket通信, 可能不稳定或者不安全? 而且如果你需要校验对比每个请求的发送信息与返回信息, 比较不方便, 因为它是批量发送,然后批量收回, 示例代码 :
import grequestsimport timefrom collections import OrderedDictimport hashlibimport osimport xlrdimport jsonimport datetimeclass voiceSearchInterface: @classmethod def formatUrlAndHeader(self, des, singer, songName): #生成url和header的逻辑 return url, h @classmethod def exeRequests(self): errorLog = open("errorLog.txt","w+") startTime = datetime.datetime.now() rightCount = 0 errCount = 0 descr = ["播放", "搜索", "搜", "听", "我要听", "我想听", "来一首", "来一个", "来一段", "来一曲", "来首", "来个", "来段", "来曲"] orgPath = os.path.join(os.path.dirname(os.getcwd()), "test","SongsAndSingers","singersAndSongs3.txt") f = open(orgPath,"rb") i = 0 urlsAndHs = [] for line in f.readlines(): temp = line.decode().split("\t") orgSinger = temp[0] orgSong = temp[1].replace("\n","") for k in descr: urlAndH = self.formatUrlAndHeader(k, orgSinger,orgSong) urlsAndHs.append(urlAndH) f.close() rs = (grequests.get(u[0], headers = u[1], stream = False) for u in urlsAndHs) rsText = grequests.imap(rs, size=20) for r in rsText: executingLog = open("Log.txt","w+") i+=1 try: searchResult = json.loads(r.text) searchItem = searchResult["data"]["searchitem"] tt = searchItem.split("Split") searchSinger = tt[1] searchSong = tt[-1] resultSinger = searchResult["data"]["sounds"][0]["singer"] resultSong = searchResult["data"]["sounds"][0]["title"] if(searchSinger==resultSinger and searchSong==resultSong): rightCount += 1 else: errCount += 1 print(searchSinger, "\t",resultSinger, "\t",searchSong,"\t", resultSong) except Exception: errCount += 1 errorLog.write((r.text+"\n").encode('latin-1').decode('unicode_escape')) print(i) executingLog.write(str(int(i/14))) errorLog.close() executingLog.close() endTime = datetime.datetime.now() print("耗时: %d秒, 正确数: %d, 异常数: %d, 总数: %d, 通过率: %.2f%%" % ((endTime-startTime).seconds, rightCount, errCount, i, (rightCount)/i*100))voiceSearchInterface.exeRequests()
注意: 使用grequests可能有坑, 因为它修改了底层socket通信, 可能会造成系统有问题,我目前虽然还没遇到,但还是在这里友情提醒下.
2. 使用多进程+requests库:
Python里面的多进程库multiprocessing和requests库都是神器, 下面直接上代码:
#_*_coding=utf-8_*_import multiprocessingimport timefrom collections import OrderedDictimport hashlibimport linecacheimport osimport requestsimport jsondef formatUrlAndHeader(des, singer, songName): #生成url和header的逻辑 return url, h#每个进程都去读各自的文件,然后以写文件的方式保存当前的执行记录,为了预防断电或者其他程序异常终止情况def worker(fileName): descr = ["播放", "搜索", "搜", "听", "我要听", "我想听", "来一首", "来一个", "来一段", "来一曲", "来首", "来个", "来段", "来曲"] Logprefix = os.path.split(fileName)[1].replace(".txt", "") resultLogPath = os.path.join(os.getcwd(), "log", Logprefix+".log") logbreakPoint = os.path.join(os.getcwd(), "log", Logprefix+".txt") with open(logbreakPoint, "r") as b: startLine = int(b.read()) b.close() with open(resultLogPath, "a+", encoding="utf-8") as logF: with open(fileName, "r", encoding="utf-8") as f: lines = f.readlines() f.close() LineNum = startLine for j in range(len(lines)-startLine+1): LineContent = linecache.getline(fileName, LineNum) for i in descr: line = LineContent.split("\t") singer = line[0] song = line[1].replace("\n","") uAndH = formatUrlAndHeader(i, singer, song) try: r = requests.get(url=uAndH[0], headers = uAndH[1]) with open(logbreakPoint, "w") as w: w.write(str(LineNum)) print("searching:%s, line: %d\n" % (fileName, LineNum)) result = json.loads(r.text) resultSinger = result["data"]["sounds"][0]["singer"] resultSong = result["data"]["sounds"][0]["title"] if not (resultSinger==singer and resultSong==song): logF.write("Error: search des: %s, singer:%s, song:%s;return: %s\n" %(i,singer,song, r.text.encode('latin-1').decode('unicode_escape'))) except Exception as e: logF.write("Error: search des: %s, singer:%s, song:%s;return: %s\n" %(i,singer,song,str(e).encode('latin-1').decode('unicode_escape'))) LineNum += 1 logF.close()if __name__=='__main__': orgPath = os.path.join(os.getcwd(), "data") files = os.listdir(orgPath) for i in files: f =os.path.join(orgPath,i) if os.path.isfile(f): p = multiprocessing.Process(target=worker, args=(f,)) p.start()
程序会根据数据源文件数量, 生成相应的进程数. 每个进程各自读各自的数据源文件, 然后调用formatUrlAndHeader方法获取url和heade, 挨个发送请求并保存当前执行记录到指定文件. 这种方式的好处在于针对每个请求, 都能对比发送前的参数和收回的请求相应数据.
3. 使用异步asyncio, aiohttp
asyncio是python3.4+才进入的新东西, 是Python3.4+以上的标准库, 是推荐采用的方式, 而需要单独安装, 代码如下:
#_*_coding=utf-8_*_import aiohttpimport timefrom collections import OrderedDictimport hashlibimport asyncioimport osimport linecacheimport threadingdef formatUrlAndHeader(des, singer, songName): #生成url和header的逻辑 return url, h async def fetch_async(uandh): u, h = uandh[0],uandh[1] with aiohttp.Timeout(301): async with aiohttp.request('GET', url=u, headers=h) as r: data = await r.text() return data loop = asyncio.get_event_loop()descr = ["播放", "搜索", "搜", "听", "我要听", "我想听", "来一首", "来一个", "来一段", "来一曲", "来首", "来个", "来段", "来曲"]orgPath = os.path.join(os.path.dirname(os.getcwd()), "test","SongsAndSingers","singersAndSongs3.txt")def runRequests(startNum): start = time.time() urlsAndHs = [] for i in range(20): line = linecache.getline(orgPath, startNum+i).split("\t") orgSinger = line[0] orgSong = line[1].replace("\n","") for k in descr: urlAndH = formatUrlAndHeader(k, orgSinger,orgSong) urlsAndHs.append(urlAndH) linecache.clearcache() tasks = [fetch_async(uandh) for uandh in urlsAndHs] done, pending = loop.run_until_complete(asyncio.wait(tasks)) for i in done: print(i.result().encode('latin-1').decode('unicode_escape')) end = time.time() print(end-start) for i in range(1,50,20): t = threading.Thread(target=runRequests, args=(i,)) t.start() t.join()
一个源数据文件, 多线程. 每个线程根据传入的起始行号连续读取文件的20行, 然后批量发送20个请求, 下一个线程必须等待上一个线程结束才开始. 这种方式也是批量发, 批量收回,不能单独对比每个请求的请求前参数, 请求后相应.
以上3种方式, 任何一种都能满足我的测试要求. 实际过程中发现:
1. PHP接口对于单个请求, 参数pagesize对相应速度影响甚大, 具体原因未知;
2. 服务器对IO密集型的操作, 非常消耗CPU. 以上3种方式, 基本上都是每次只发20个请求左右, 而服务器的CPU(8核)已经满载了!