博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
用Python做大批量请求发送
阅读量:6358 次
发布时间:2019-06-23

本文共 7404 字,大约阅读时间需要 24 分钟。

原创. 禁转.

大批量请求发送需要考虑的几个因素:

1. 服务器承载能力(网络带宽/硬件配置);

2. 客户端IO情况, 客户端带宽, 硬件配置;

 

方案:

1. 方案都是相对的;

2. 因为这里我的情况是客户机只有一台,所以不能考虑使用分布式了, 服务器承载能力也非常有限(经过不断调试得知);

3. 这里没有使用Jmeter, 虽然jmeter也是可以做到的.  

 

注: 如无特殊说明以下代码基于windows7 64位/Centos 6.5 64位, Python3.6+

Python里面支持发送大批量的方案有很多, 这里只介绍我所用过的几种:

1. 使用

grequests可以一次性发送超大批量的请求, 但是底层听说修改了socket通信, 可能不稳定或者不安全? 而且如果你需要校验对比每个请求的发送信息与返回信息, 比较不方便, 因为它是批量发送,然后批量收回, 示例代码 :

 

import grequestsimport timefrom collections import OrderedDictimport hashlibimport osimport xlrdimport jsonimport datetimeclass voiceSearchInterface:    @classmethod    def formatUrlAndHeader(self, des, singer, songName):        #生成url和header的逻辑        return url, h        @classmethod    def exeRequests(self):        errorLog = open("errorLog.txt","w+")        startTime = datetime.datetime.now()        rightCount = 0        errCount = 0        descr = ["播放", "搜索", "搜", "听", "我要听", "我想听", "来一首", "来一个", "来一段", "来一曲", "来首", "来个", "来段", "来曲"]        orgPath = os.path.join(os.path.dirname(os.getcwd()), "test","SongsAndSingers","singersAndSongs3.txt")        f = open(orgPath,"rb")        i = 0        urlsAndHs = []        for line in f.readlines():            temp = line.decode().split("\t")            orgSinger = temp[0]            orgSong = temp[1].replace("\n","")            for k in descr:                urlAndH = self.formatUrlAndHeader(k, orgSinger,orgSong)                urlsAndHs.append(urlAndH)        f.close()        rs = (grequests.get(u[0], headers = u[1], stream = False) for u in urlsAndHs)        rsText = grequests.imap(rs, size=20)        for r in rsText:            executingLog = open("Log.txt","w+")             i+=1            try:                   searchResult = json.loads(r.text)                searchItem = searchResult["data"]["searchitem"]                tt =  searchItem.split("Split")                searchSinger = tt[1]                searchSong = tt[-1]                resultSinger = searchResult["data"]["sounds"][0]["singer"]                resultSong = searchResult["data"]["sounds"][0]["title"]                if(searchSinger==resultSinger and searchSong==resultSong):                                        rightCount += 1                else:                                        errCount += 1                    print(searchSinger, "\t",resultSinger, "\t",searchSong,"\t", resultSong)            except Exception:                errCount += 1                errorLog.write((r.text+"\n").encode('latin-1').decode('unicode_escape'))            print(i)            executingLog.write(str(int(i/14)))        errorLog.close()        executingLog.close()        endTime = datetime.datetime.now()        print("耗时: %d秒, 正确数: %d, 异常数: %d, 总数: %d, 通过率: %.2f%%" % ((endTime-startTime).seconds, rightCount, errCount,  i, (rightCount)/i*100))voiceSearchInterface.exeRequests()

 注意: 使用grequests可能有坑, 因为它修改了底层socket通信, 可能会造成系统有问题,我目前虽然还没遇到,但还是在这里友情提醒下.

 

2. 使用多进程+requests库:

Python里面的多进程库multiprocessing和requests库都是神器, 下面直接上代码:

#_*_coding=utf-8_*_import multiprocessingimport timefrom collections import OrderedDictimport hashlibimport linecacheimport osimport requestsimport jsondef formatUrlAndHeader(des, singer, songName):    #生成url和header的逻辑    return url, h#每个进程都去读各自的文件,然后以写文件的方式保存当前的执行记录,为了预防断电或者其他程序异常终止情况def worker(fileName):      descr = ["播放", "搜索", "搜", "听", "我要听", "我想听", "来一首", "来一个", "来一段", "来一曲", "来首", "来个", "来段", "来曲"]    Logprefix = os.path.split(fileName)[1].replace(".txt", "")    resultLogPath = os.path.join(os.getcwd(), "log", Logprefix+".log")    logbreakPoint = os.path.join(os.getcwd(), "log", Logprefix+".txt")    with open(logbreakPoint, "r") as b:        startLine = int(b.read())        b.close()    with open(resultLogPath, "a+", encoding="utf-8") as logF:        with open(fileName, "r", encoding="utf-8") as f:            lines = f.readlines()            f.close()            LineNum = startLine            for j in range(len(lines)-startLine+1):                LineContent = linecache.getline(fileName, LineNum)                for i in descr:                    line = LineContent.split("\t")                    singer = line[0]                    song = line[1].replace("\n","")                    uAndH = formatUrlAndHeader(i, singer, song)                    try:                        r = requests.get(url=uAndH[0], headers = uAndH[1])                        with open(logbreakPoint, "w") as w:                            w.write(str(LineNum))                        print("searching:%s, line: %d\n" % (fileName, LineNum))                        result = json.loads(r.text)                        resultSinger = result["data"]["sounds"][0]["singer"]                        resultSong = result["data"]["sounds"][0]["title"]                        if not (resultSinger==singer and resultSong==song):                            logF.write("Error: search des: %s, singer:%s, song:%s;return: %s\n" %(i,singer,song, r.text.encode('latin-1').decode('unicode_escape')))                    except Exception as e:                        logF.write("Error: search des: %s, singer:%s, song:%s;return: %s\n" %(i,singer,song,str(e).encode('latin-1').decode('unicode_escape')))                LineNum += 1        logF.close()if __name__=='__main__':    orgPath = os.path.join(os.getcwd(), "data")    files = os.listdir(orgPath)    for i in files:        f =os.path.join(orgPath,i)        if os.path.isfile(f):            p = multiprocessing.Process(target=worker, args=(f,))            p.start()

程序会根据数据源文件数量, 生成相应的进程数. 每个进程各自读各自的数据源文件, 然后调用formatUrlAndHeader方法获取url和heade, 挨个发送请求并保存当前执行记录到指定文件. 这种方式的好处在于针对每个请求, 都能对比发送前的参数和收回的请求相应数据.

 

3. 使用异步asyncio, aiohttp

asyncio是python3.4+才进入的新东西, 是Python3.4+以上的标准库, 是推荐采用的方式, 而需要单独安装, 代码如下:

#_*_coding=utf-8_*_import aiohttpimport timefrom collections import OrderedDictimport hashlibimport asyncioimport osimport linecacheimport threadingdef formatUrlAndHeader(des, singer, songName):    #生成url和header的逻辑    return url, h    async def fetch_async(uandh):    u, h = uandh[0],uandh[1]    with aiohttp.Timeout(301):        async with aiohttp.request('GET', url=u, headers=h) as r:            data = await r.text()            return data     loop = asyncio.get_event_loop()descr = ["播放", "搜索", "搜", "听", "我要听", "我想听", "来一首", "来一个", "来一段", "来一曲", "来首", "来个", "来段", "来曲"]orgPath = os.path.join(os.path.dirname(os.getcwd()), "test","SongsAndSingers","singersAndSongs3.txt")def runRequests(startNum):    start = time.time()    urlsAndHs = []    for i in range(20):        line = linecache.getline(orgPath, startNum+i).split("\t")        orgSinger = line[0]        orgSong = line[1].replace("\n","")        for k in descr:            urlAndH = formatUrlAndHeader(k, orgSinger,orgSong)            urlsAndHs.append(urlAndH)    linecache.clearcache()    tasks = [fetch_async(uandh) for uandh in urlsAndHs]    done, pending = loop.run_until_complete(asyncio.wait(tasks))    for i in done:        print(i.result().encode('latin-1').decode('unicode_escape'))    end = time.time()    print(end-start)   for i in range(1,50,20):    t = threading.Thread(target=runRequests, args=(i,))    t.start()    t.join()

一个源数据文件, 多线程. 每个线程根据传入的起始行号连续读取文件的20行, 然后批量发送20个请求, 下一个线程必须等待上一个线程结束才开始. 这种方式也是批量发, 批量收回,不能单独对比每个请求的请求前参数, 请求后相应.

 

以上3种方式, 任何一种都能满足我的测试要求. 实际过程中发现:

1. PHP接口对于单个请求, 参数pagesize对相应速度影响甚大, 具体原因未知; 

2. 服务器对IO密集型的操作, 非常消耗CPU. 以上3种方式, 基本上都是每次只发20个请求左右, 而服务器的CPU(8核)已经满载了!

转载于:https://www.cnblogs.com/onlyfreedom/p/7323708.html

你可能感兴趣的文章
SKF密码设备研究
查看>>
数据对象映射模式(通过工厂模式和注册树模式)v2
查看>>
4939 欧拉函数[一中数论随堂练]
查看>>
MySQL笔记(一)
查看>>
spring boot 包jar运行
查看>>
18年秋季学习总结
查看>>
Effective前端1:能使用html/css解决的问题就不要使用JS
查看>>
网络攻防 实验一
查看>>
由莫名其妙的错误开始---浅谈jquery的dom节点创建
查看>>
磨刀-CodeWarrior11生成的Makefile解析
查看>>
String StringBuffer StringBuilder对比
查看>>
bootstrap随笔点击增加
查看>>
oracle 中proc和oci操作对缓存不同处理
查看>>
[LeetCode] Spiral Matrix 解题报告
查看>>
60906磁悬浮动力系统应用研究与模型搭建
查看>>
指纹获取 Fingerprint2
查看>>
面试题目3:智能指针
查看>>
flask ORM: Flask-SQLAlchemy【单表】增删改查
查看>>
vim 常用指令
查看>>
nodejs 获取自己的ip
查看>>