在实际中遇到这样一个问题,公司软件发布上线自动化。
说简单点,就是需要去登录一个上线的内部网站,然后爬下所有的上线数据。
然后根据爬下来的数据整理好,可以一起上线的,就并发多线程,其实就是去传参数点击一个链接等返回。
不能并发的就单线程点链接。
那这个事情必须更有效率,单线程的没问题,用 python 的 request 就可以实现了。
我们仔细研究一下协程,先讲一下历史:
使用Python的人往往纠结在多线程、多进程,哪个效率更高?到底用哪个好呢?
其实 Python 的多进程和多线程,相对于别家的协程和异步处理机制,都不行,线程之间切换耗费 CPU 和寄存器,OS 的调度不可控,多进程之间通讯也不便。性能根本不行。
后来呢 Python 改进了语法,出现了 yiled from 充当协程调度,有人就根据这个特性开发了第三方的协程框架,Tornado,Gevent等。
官方也不能坐视不理啊,任凭别人出风头,于是 Python 之父深入简出3年,苦心钻研自家的协程,async/await 和 asyncio 库,并放到 Python3.5 后成为官方原生的协程。
对于 http请求、读写文件、读写数据库这种高延时的 IO 操作,协程是个大杀器,优点非常多;它可以在预料到一个阻塞将发生时,挂起当前协程,跑去执行其它协程,同时把事件注册到循环中,实现了多协程并发,其实这玩意是跟 Nodejs 的回调学的。
看下图,详细解释下,左边我们有100个网页请求,并发100个协程请求(其实也是1个1个发),当需要等待长时间回应回应时,挂起当前协程,并注册一个回调函数到事件循环(Event Loop)中,执行下一个协程,当有协程事件完成再通过回调函数唤醒挂起的协程,然后返回结果。

这个跟 nodejs 的回调函数基本一样,我们必须注意主进程和协程的关系,如果我在一个主进程中,触发协程函数,有100个协程,那么必须等待100个协程都结束后,才能回到正常的那个主进程中。当然,主进程也可能也是一个协程。
那么协程的基本用法
async f(n) 声明一个函数是协程的
await f(n) 挂起当前协程,把控制权交回 event loop,并且执行f(n)和注册之后的f(n)回调。
举个例子:如果在 g() 这个函数中执行了 await f(),那么g()函数会被挂起,并等待 f() 函数有结果结束,然后返回 g() 继续执行。
async def get(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
最后一行 await 是挂起命令,挂起当前函数 get() ,并执行 response.text() 和注册回调,等待 response.text() 执行完成后重新激活当前函數get()继续执行,返回。
所以 await 只叫做挂起是不太对的,感觉应该叫做 挂起并注册回调 比较合适。
看以下程序,在 Python 3.7 之前,协程是这么用的:
import time
import asyncio
now = lambda : time.time()
async def do_some_work(x):
print('Waiting: ', x)
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('TIME: ', now() - start)
我们指定了一个协程 coroutine ,然后定义了一个事件循环 loop,loop 是需要 run_until_complete 所有的协程,然后交出控制权,返回正常的主进程。
跟上图完全匹配。
在 Python 3.7 之后,简化了用法,一句 asyncio.run 就可以了:
asyncio.run(do_some_work(2))
上面程序就变了,省了好多,但是副作用是第一次看到的人会不明白它是怎么进化过来的:
import time
import asyncio
now = lambda : time.time()
async def do_some_work(x):
print('Waiting: ', x)
start = now()
asyncio.run(do_some_work(2))
print('TIME: ', now() - start)
如果我们要访问一个网站的100个网页,单线程的做法是:请求一次,回来一次,然后进行下一个
for url in urls:
response=get(url)
results=parse(response)
这样效率很低,协程呢,做法就不同了,一次发起100个请求(准确的说也是一个一个发),不同的是协程不会死等返回,而是发一个请求,挂起,再发一个再挂起,发起100个,就挂起100个,然后注册并等待100个返回,效率提升了100倍。可以理解为同时做了100件事,做到由自己调度而不是交给CPU,程序的并发由自己来控制,而不是交由 OS 去调度,效率极大的提高了。
进化到协程,我们把费 IO 的 get 函数抽出来放到协程里:
async def get(url:str):
my_conn = aiohttp.TCPConnector(limit=10)
async with aiohttp.ClientSession(connector=my_conn) as session:
async with session.get(url) as resp:
return await resp.text()
for url in urls:
response=asyncio.run(get(url))
results=parse(response)
具体到我们的项目,我们首先要登录一个网页拿到 cookie,这个过程其实就一个协程,没人会登录个几百次吧。然后把放了 cookie 的 session 取出来,供后面的协程再复用就可以了,示例代码如下:
import aiohttp
import asyncio
async def login():
my_conn = aiohttp.TCPConnector(limit=10)
async with aiohttp.ClientSession(connector=my_conn) as session:
data = {'loginname':'wangbadan','password':'Fuckyouall'}
async with session.post('http://192.168.1.3/user/login',data=data) as resp:
print(resp.url)
print(resp.status)
print(await resp.text())
return session
session = asyncio.run(login())
print(f"{session}")
再给一个完全版的主函数是进程,下载是协程的例子,注意里面的 aiohttp.TCPConnector(limit=10),限制一下并发是10个,否则会被服务器 Ban 掉:
import asyncio
import time
import aiohttp
from aiohttp.client import ClientSession
async def download_link(url:str,session:ClientSession):
async with session.get(url) as response:
result = await response.text()
print(f'Read {len(result)} from {url}')
async def download_all(urls:list):
my_conn = aiohttp.TCPConnector(limit=10)
async with aiohttp.ClientSession(connector=my_conn) as session:
tasks = []
for url in urls:
task = asyncio.ensure_future(download_link(url=url,session=session))
tasks.append(task)
await asyncio.gather(*tasks,return_exceptions=True) # the await must be nest inside of the session
url_list = ["https://www.google.com","https://www.bing.com"]*50
print(url_list)
start = time.time()
asyncio.run(download_all(url_list))
end = time.time()
print(f'download {len(url_list)} links in {end - start} seconds')
协程里的 session 也有很多种用法,参考下面的链接就好:
https://blog.csdn.net/weixin_39643613/article/details/109171090
我们也给出简单易用的线程池版,说不定以后会用上:

import requests
from requests.sessions import Session
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Thread,local
url_list = ["https://www.google.com/","https://www.bing.com"]*50
thread_local = local()
def get_session() -> Session:
if not hasattr(thread_local,'session'):
thread_local.session = requests.Session()
return thread_local.session
def download_link(url:str):
session = get_session()
with session.get(url) as response:
print(f'Read {len(response.content)} from {url}')
def download_all(urls:list) -> None:
with ThreadPoolExecutor(max_workers=10) as executor:
executor.map(download_link,url_list)
start = time.time()
download_all(url_list)
end = time.time()
print(f'download {len(url_list)} links in {end - start} seconds')